Change udpserver packet handling to be similar to udphandler

merge-requests/5/head
Phil Taylor 2021-06-01 23:32:39 +01:00
rodzic 27ae15af33
commit 2112c97b1e
2 zmienionych plików z 175 dodań i 145 usunięć

Wyświetl plik

@ -748,10 +748,7 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
{
// Single packet request
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'retransmit' request for " << in->seq;
auto match = std::find_if(current->txSeqBuf.begin(), current->txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) {
return s.seqNum == cs;
});
QMap<quint16, SEQBUFENTRY>::iterator match = current->txSeqBuf.find(in->seq);
if (match != current->txSeqBuf.end() && match->retransmitCount < 5) {
// Found matching entry?
@ -811,28 +808,39 @@ void udpServer::commonReceived(QList<CLIENT*>* l, CLIENT* current, QByteArray r)
current->rxMutex.lock();
if (current->rxSeqBuf.isEmpty())
{
current->rxSeqBuf.append(in->seq);
if (current->rxSeqBuf.size() > 400)
{
current->rxSeqBuf.remove(0);
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
}
else
{
std::sort(current->rxSeqBuf.begin(), current->rxSeqBuf.end());
if (in->seq < current->rxSeqBuf.front())
if (in->seq < current->rxSeqBuf.firstKey())
{
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.back() << " current: " << hex << in->seq;
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.lastKey() << " current: " << hex << in->seq;
// Looks like it has rolled over so clear buffer and start again.
current->rxSeqBuf.clear();
current->rxMutex.unlock(); // Must unlock the Mutex!
current->missMutex.lock();
current->rxMissing.clear();
current->missMutex.unlock();
return;
}
if (!current->rxSeqBuf.contains(in->seq))
{
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
current->rxSeqBuf.append(in->seq);
if (current->rxSeqBuf.size() > 400)
{
current->rxSeqBuf.remove(0);
}
current->rxSeqBuf.insert(in->seq, QTime::currentTime());
} else{
// Check whether this is one of our missing ones!
current->missMutex.lock();
auto s = std::find_if(current->rxMissing.begin(), current->rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; });
QMap<quint16, int>::iterator s = current->rxMissing.find(in->seq);
if (s != current->rxMissing.end())
{
qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Missing SEQ has been received! " << hex << in->seq;
@ -862,12 +870,14 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
if (seq == 0x00)
{
p.seq = c->txSeq;
SEQBUFENTRY s;
s.seqNum = seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txSeqBuf.insert(seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
@ -875,7 +885,6 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
//if (c->idleTimer != Q_NULLPTR) {
// c->idleTimer->start(100);
//}
c->txMutex.unlock();
}
else {
p.seq = seq;
@ -966,21 +975,22 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
strcpy(p.connection, "WFVIEW");
}
SEQBUFENTRY s;
s.seqNum = c->txSeq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = c->txSeq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txSeqBuf.insert(c->txSeq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port);
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
c->txSeq++;
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
c->txMutex.unlock();
return;
}
@ -1067,16 +1077,22 @@ void udpServer::sendCapabilities(CLIENT* c)
p.capf = 0x5001;
p.capg = 0x0190;
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port);
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
@ -1121,13 +1137,19 @@ void udpServer::sendConnectionInfo(CLIENT* c)
p.identb = c->identb;
}
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
@ -1160,17 +1182,24 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
p.commoncap = c->commonCap;
p.res = type;
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
@ -1183,35 +1212,9 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
void udpServer::watchdog(CLIENT* c)
{
if (c->txMutex.tryLock()) {
//qInfo(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length();
// Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these?
if (!c->txSeqBuf.isEmpty())
{
c->txSeqBuf.erase(std::remove_if(c->txSeqBuf.begin(), c->txSeqBuf.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end());
}
c->txMutex.unlock();
}
// Erase old entries from the missing packets buffer
if (c->missMutex.tryLock()) {
if (!c->rxMissing.isEmpty()) {
c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end());
}
c->missMutex.unlock();
}
if (c->rxMutex.tryLock()) {
if (!c->rxSeqBuf.isEmpty()) {
std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end());
Q_UNUSED(c);
// Do something!
if (c->rxSeqBuf.length() > 400)
{
c->rxSeqBuf.remove(0, 200);
}
}
c->rxMutex.unlock();
}
}
void udpServer::sendStatus(CLIENT* c)
@ -1242,20 +1245,23 @@ void udpServer::sendStatus(CLIENT* c)
// Send this to reject the request to tx/rx audio/civ
//memcpy(p + 0x30, QByteArrayLiteral("\xff\xff\xff\xfe").constData(), 4);
c->txSeq++;
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
if (c->txSeqBuf.size() > 400)
{
c->txSeqBuf.remove(0);
}
c->txSeqBuf.insert(p.seq, s);
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
c->txSeq++;
}
@ -1278,15 +1284,19 @@ void udpServer::dataForServer(QByteArray d)
p.sendseq = client->innerSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d);
client->txMutex.lock();
client->txSeqBuf.append(SEQBUFENTRY());
client->txSeqBuf.last().seqNum = p.seq;
client->txSeqBuf.last().timeSent = QTime::currentTime();
client->txSeqBuf.last().retransmitCount = 0;
client->txSeqBuf.last().data = t;
client->txMutex.unlock();
//qInfo(logUdpServer()) << "Sending:" << d;
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = t;
client->txMutex.lock();
if (client->txSeqBuf.size() > 400)
{
client->txSeqBuf.remove(0);
}
client->txSeqBuf.insert(p.seq, s);
client->txMutex.unlock();
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
@ -1333,21 +1343,22 @@ void udpServer::receiveAudioData(const audioPacket& d)
p.seq = client->txSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d.data);
client->txMutex.lock();
client->txSeqBuf.append(SEQBUFENTRY());
client->txSeqBuf.last().seqNum = p.seq;
client->txSeqBuf.last().timeSent = QTime::currentTime();
client->txSeqBuf.last().retransmitCount = 0;
client->txSeqBuf.last().data = t;
client->txMutex.unlock();
if (udpMutex.tryLock()) {
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
}
else {
qDebug(logUdpServer()) << "Failed to lock udpMutex()";
SEQBUFENTRY s;
s.seqNum = p.seq;
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = t;
client->txMutex.lock();
if (client->txSeqBuf.size() > 400)
{
client->txSeqBuf.remove(0);
}
client->txSeqBuf.insert(p.seq, s);
client->txMutex.unlock();
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
client->txSeq++;
client->sendAudioSeq++;
}
@ -1363,69 +1374,71 @@ void udpServer::receiveAudioData(const audioPacket& d)
/// <param name="c"></param>
void udpServer::sendRetransmitRequest(CLIENT* c)
{
c->missMutex.lock();
// Find all gaps in received packets and then send requests for them.
// This will run every 100ms so out-of-sequence packets will not trigger a retransmit request.
QByteArray missingSeqs;
auto i = std::adjacent_find(c->rxSeqBuf.begin(), c->rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; });
while (i != c->rxSeqBuf.end())
c->rxMutex.lock();
if (!c->rxSeqBuf.empty() && c->rxSeqBuf.size() <= c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey())
{
if (i + 1 != c->rxSeqBuf.end())
if ((c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size()) > 20)
{
if (*(i + 1) - *i < 30)
{
for (quint16 j = *i + 1; j < *(i + 1); j++)
{
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")";
auto s = std::find_if(c->rxMissing.begin(), c->rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; });
// Too many packets to process, flush buffers and start again!
qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
c->missMutex.lock();
c->rxMissing.clear();
c->rxSeqBuf.clear();
c->missMutex.unlock();
}
else {
// We have at least 1 missing packet!
qDebug(logUdp()) << "Missing Seq: size=" << c->rxSeqBuf.size() << "firstKey=" << c->rxSeqBuf.firstKey() << "lastKey=" << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1;
// We are missing packets so iterate through the buffer and add the missing ones to missing packet list
for (int i = 0; i < c->rxSeqBuf.keys().length() - 1; i++) {
c->missMutex.lock();
for (quint16 j = c->rxSeqBuf.keys()[i] + 1; j < c->rxSeqBuf.keys()[i + 1]; j++) {
auto s = c->rxMissing.find(j);
if (s == c->rxMissing.end())
{
// We haven't seen this missing packet before
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len=" << c->rxMissing.length() << "): " << j;
c->rxMissing.append(SEQBUFENTRY());
c->rxMissing.last().seqNum = j;
c->rxMissing.last().retransmitCount = 0;
c->rxMissing.last().timeSent = QTime::currentTime();
//packetsLost++;
qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j;
c->rxMissing.insert(j, 0);
if (c->rxSeqBuf.size() > 400)
{
c->rxSeqBuf.remove(0);
}
c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it.
}
else {
if (s->retransmitCount == 4 && j != 0)
if (s.value() == 4)
{
// We have tried 4 times to request this packet, time to give up!
s = c->rxMissing.erase(s);
c->rxMutex.lock();
c->rxSeqBuf.append(j); // Final thing is to add to received buffer!
c->rxMutex.unlock();
}
}
}
}
else {
qInfo(logUdpServer()) << c->ipAddress.toString() << ": Too many missing, flushing buffers";
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
missingSeqs.clear();
break;
c->missMutex.unlock();
}
}
i++;
}
c->rxMutex.unlock();
c->missMutex.lock();
for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it)
{
if (it->retransmitCount < 10)
if (it.value() < 10)
{
missingSeqs.append(it->seqNum & 0xff);
missingSeqs.append(it->seqNum >> 8 & 0xff);
missingSeqs.append(it->seqNum & 0xff);
missingSeqs.append(it->seqNum >> 8 & 0xff);
it->retransmitCount++;
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
missingSeqs.append(it.key() & 0xff);
missingSeqs.append(it.key() >> 8 & 0xff);
it.value()++;
}
}
if (missingSeqs.length() != 0)
{
control_packet p;
@ -1437,20 +1450,21 @@ void udpServer::sendRetransmitRequest(CLIENT* c)
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for missing packet : " << hex << p.seq;
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else
{
qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for multiple missing packets : " << missingSeqs.toHex();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
udpMutex.lock();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port);
udpMutex.unlock();
}
}
c->missMutex.unlock();
}
@ -1486,6 +1500,19 @@ void udpServer::deleteConnection(QList<CLIENT*>* l, CLIENT* c)
connMutex.lock();
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
c->txMutex.lock();
c->txSeqBuf.clear();
c->txMutex.unlock();
c->missMutex.lock();
c->rxMissing.clear();
c->missMutex.unlock();
QList<CLIENT*>::iterator it = l->begin();
while (it != l->end()) {
CLIENT* client = *it;

Wyświetl plik

@ -12,6 +12,7 @@
#include <QByteArray>
#include <QList>
#include <QVector>
#include <QMap>
// Allow easy endian-ness conversions
#include <QtEndian>
@ -102,9 +103,11 @@ private:
quint16 txSampleRate;
SERVERUSER user;
QVector <SEQBUFENTRY> txSeqBuf;
QVector <quint16> rxSeqBuf;
QVector <SEQBUFENTRY> rxMissing;
QMap<quint16, QTime> rxSeqBuf;
QMap<quint16, SEQBUFENTRY> txSeqBuf;
QMap<quint16, int> rxMissing;
QMutex txMutex;
QMutex rxMutex;
QMutex missMutex;