From 2112c97b1ed3ee99bb155ce9d92d128f30ebbd03 Mon Sep 17 00:00:00 2001 From: Phil Taylor Date: Tue, 1 Jun 2021 23:32:39 +0100 Subject: [PATCH] Change udpserver packet handling to be similar to udphandler --- udpserver.cpp | 311 +++++++++++++++++++++++++++----------------------- udpserver.h | 9 +- 2 files changed, 175 insertions(+), 145 deletions(-) diff --git a/udpserver.cpp b/udpserver.cpp index dd2abcc..b80a72b 100644 --- a/udpserver.cpp +++ b/udpserver.cpp @@ -748,10 +748,7 @@ void udpServer::commonReceived(QList* l, CLIENT* current, QByteArray r) { // Single packet request qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'retransmit' request for " << in->seq; - - auto match = std::find_if(current->txSeqBuf.begin(), current->txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) { - return s.seqNum == cs; - }); + QMap::iterator match = current->txSeqBuf.find(in->seq); if (match != current->txSeqBuf.end() && match->retransmitCount < 5) { // Found matching entry? @@ -811,28 +808,39 @@ void udpServer::commonReceived(QList* l, CLIENT* current, QByteArray r) current->rxMutex.lock(); if (current->rxSeqBuf.isEmpty()) { - current->rxSeqBuf.append(in->seq); + if (current->rxSeqBuf.size() > 400) + { + current->rxSeqBuf.remove(0); + } + current->rxSeqBuf.insert(in->seq, QTime::currentTime()); } else { - std::sort(current->rxSeqBuf.begin(), current->rxSeqBuf.end()); - if (in->seq < current->rxSeqBuf.front()) + + if (in->seq < current->rxSeqBuf.firstKey()) { - qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.back() << " current: " << hex << in->seq; + qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.lastKey() << " current: " << hex << in->seq; // Looks like it has rolled over so clear buffer and start again. current->rxSeqBuf.clear(); current->rxMutex.unlock(); // Must unlock the Mutex! + current->missMutex.lock(); + current->rxMissing.clear(); + current->missMutex.unlock(); return; } if (!current->rxSeqBuf.contains(in->seq)) { // Add incoming packet to the received buffer and if it is in the missing buffer, remove it. - - current->rxSeqBuf.append(in->seq); + if (current->rxSeqBuf.size() > 400) + { + current->rxSeqBuf.remove(0); + } + current->rxSeqBuf.insert(in->seq, QTime::currentTime()); + } else{ // Check whether this is one of our missing ones! current->missMutex.lock(); - auto s = std::find_if(current->rxMissing.begin(), current->rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; }); + QMap::iterator s = current->rxMissing.find(in->seq); if (s != current->rxMissing.end()) { qInfo(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Missing SEQ has been received! " << hex << in->seq; @@ -862,12 +870,14 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq) if (seq == 0x00) { p.seq = c->txSeq; + SEQBUFENTRY s; + s.seqNum = seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = seq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + c->txSeqBuf.insert(seq, s); + c->txMutex.unlock(); udpMutex.lock(); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); @@ -875,7 +885,6 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq) //if (c->idleTimer != Q_NULLPTR) { // c->idleTimer->start(100); //} - c->txMutex.unlock(); } else { p.seq = seq; @@ -966,21 +975,22 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed) strcpy(p.connection, "WFVIEW"); } + SEQBUFENTRY s; + s.seqNum = c->txSeq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = c->txSeq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); - + c->txSeqBuf.insert(c->txSeq, s); + c->txMutex.unlock(); udpMutex.lock(); - c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port); + c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); + c->txSeq++; if (c->idleTimer != Q_NULLPTR) c->idleTimer->start(100); - c->txMutex.unlock(); return; } @@ -1067,16 +1077,22 @@ void udpServer::sendCapabilities(CLIENT* c) p.capf = 0x5001; p.capg = 0x0190; + + + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = p.seq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); - - + if (c->txSeqBuf.size() > 400) + { + c->txSeqBuf.remove(0); + } + c->txSeqBuf.insert(p.seq, s); + c->txMutex.unlock(); udpMutex.lock(); - c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port); + c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); if (c->idleTimer != Q_NULLPTR) @@ -1121,13 +1137,19 @@ void udpServer::sendConnectionInfo(CLIENT* c) p.identb = c->identb; } - c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = p.seq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + c->txMutex.lock(); + if (c->txSeqBuf.size() > 400) + { + c->txSeqBuf.remove(0); + } + c->txSeqBuf.insert(p.seq, s); + c->txMutex.unlock(); udpMutex.lock(); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); @@ -1160,17 +1182,24 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type) p.commoncap = c->commonCap; p.res = type; - c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = p.seq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + c->txMutex.lock(); + if (c->txSeqBuf.size() > 400) + { + c->txSeqBuf.remove(0); + } + c->txSeqBuf.insert(p.seq, s); + c->txMutex.unlock(); udpMutex.lock(); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); + if (c->idleTimer != Q_NULLPTR) c->idleTimer->start(100); @@ -1183,35 +1212,9 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type) void udpServer::watchdog(CLIENT* c) { - if (c->txMutex.tryLock()) { - //qInfo(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length(); - // Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these? - if (!c->txSeqBuf.isEmpty()) - { - c->txSeqBuf.erase(std::remove_if(c->txSeqBuf.begin(), c->txSeqBuf.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end()); - } - c->txMutex.unlock(); - } - // Erase old entries from the missing packets buffer - if (c->missMutex.tryLock()) { - if (!c->rxMissing.isEmpty()) { - c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v) - { return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end()); - } - c->missMutex.unlock(); - } - if (c->rxMutex.tryLock()) { - if (!c->rxSeqBuf.isEmpty()) { - std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end()); + Q_UNUSED(c); + // Do something! - if (c->rxSeqBuf.length() > 400) - { - c->rxSeqBuf.remove(0, 200); - } - } - c->rxMutex.unlock(); - } } void udpServer::sendStatus(CLIENT* c) @@ -1242,20 +1245,23 @@ void udpServer::sendStatus(CLIENT* c) // Send this to reject the request to tx/rx audio/civ //memcpy(p + 0x30, QByteArrayLiteral("\xff\xff\xff\xfe").constData(), 4); - - c->txSeq++; + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); c->txMutex.lock(); - c->txSeqBuf.append(SEQBUFENTRY()); - c->txSeqBuf.last().seqNum = p.seq; - c->txSeqBuf.last().timeSent = QTime::currentTime(); - c->txSeqBuf.last().retransmitCount = 0; - c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); + if (c->txSeqBuf.size() > 400) + { + c->txSeqBuf.remove(0); + } + c->txSeqBuf.insert(p.seq, s); c->txMutex.unlock(); - udpMutex.lock(); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); + c->txSeq++; } @@ -1278,15 +1284,19 @@ void udpServer::dataForServer(QByteArray d) p.sendseq = client->innerSeq; QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); t.append(d); - client->txMutex.lock(); - client->txSeqBuf.append(SEQBUFENTRY()); - client->txSeqBuf.last().seqNum = p.seq; - client->txSeqBuf.last().timeSent = QTime::currentTime(); - client->txSeqBuf.last().retransmitCount = 0; - client->txSeqBuf.last().data = t; - client->txMutex.unlock(); - //qInfo(logUdpServer()) << "Sending:" << d; + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = t; + client->txMutex.lock(); + if (client->txSeqBuf.size() > 400) + { + client->txSeqBuf.remove(0); + } + client->txSeqBuf.insert(p.seq, s); + client->txMutex.unlock(); udpMutex.lock(); client->socket->writeDatagram(t, client->ipAddress, client->port); udpMutex.unlock(); @@ -1333,21 +1343,22 @@ void udpServer::receiveAudioData(const audioPacket& d) p.seq = client->txSeq; QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p)); t.append(d.data); - client->txMutex.lock(); - client->txSeqBuf.append(SEQBUFENTRY()); - client->txSeqBuf.last().seqNum = p.seq; - client->txSeqBuf.last().timeSent = QTime::currentTime(); - client->txSeqBuf.last().retransmitCount = 0; - client->txSeqBuf.last().data = t; - client->txMutex.unlock(); - if (udpMutex.tryLock()) { - client->socket->writeDatagram(t, client->ipAddress, client->port); - udpMutex.unlock(); - } - else { - qDebug(logUdpServer()) << "Failed to lock udpMutex()"; + SEQBUFENTRY s; + s.seqNum = p.seq; + s.timeSent = QTime::currentTime(); + s.retransmitCount = 0; + s.data = t; + client->txMutex.lock(); + if (client->txSeqBuf.size() > 400) + { + client->txSeqBuf.remove(0); } + client->txSeqBuf.insert(p.seq, s); + client->txMutex.unlock(); + udpMutex.lock(); + client->socket->writeDatagram(t, client->ipAddress, client->port); + udpMutex.unlock(); client->txSeq++; client->sendAudioSeq++; } @@ -1363,69 +1374,71 @@ void udpServer::receiveAudioData(const audioPacket& d) /// void udpServer::sendRetransmitRequest(CLIENT* c) { - - c->missMutex.lock(); + // Find all gaps in received packets and then send requests for them. + // This will run every 100ms so out-of-sequence packets will not trigger a retransmit request. QByteArray missingSeqs; - auto i = std::adjacent_find(c->rxSeqBuf.begin(), c->rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; }); - while (i != c->rxSeqBuf.end()) + c->rxMutex.lock(); + + if (!c->rxSeqBuf.empty() && c->rxSeqBuf.size() <= c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey()) { - if (i + 1 != c->rxSeqBuf.end()) + if ((c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size()) > 20) { - if (*(i + 1) - *i < 30) - { - for (quint16 j = *i + 1; j < *(i + 1); j++) - { - qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")"; - auto s = std::find_if(c->rxMissing.begin(), c->rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; }); + // Too many packets to process, flush buffers and start again! + qDebug(logUdp()) << "Too many missing packets, flushing buffer: " << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1; + c->missMutex.lock(); + c->rxMissing.clear(); + c->rxSeqBuf.clear(); + c->missMutex.unlock(); + } + else { + // We have at least 1 missing packet! + qDebug(logUdp()) << "Missing Seq: size=" << c->rxSeqBuf.size() << "firstKey=" << c->rxSeqBuf.firstKey() << "lastKey=" << c->rxSeqBuf.lastKey() << "missing=" << c->rxSeqBuf.lastKey() - c->rxSeqBuf.firstKey() - c->rxSeqBuf.size() + 1; + // We are missing packets so iterate through the buffer and add the missing ones to missing packet list + for (int i = 0; i < c->rxSeqBuf.keys().length() - 1; i++) { + c->missMutex.lock(); + for (quint16 j = c->rxSeqBuf.keys()[i] + 1; j < c->rxSeqBuf.keys()[i + 1]; j++) { + auto s = c->rxMissing.find(j); if (s == c->rxMissing.end()) { // We haven't seen this missing packet before - qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len=" << c->rxMissing.length() << "): " << j; - c->rxMissing.append(SEQBUFENTRY()); - c->rxMissing.last().seqNum = j; - c->rxMissing.last().retransmitCount = 0; - c->rxMissing.last().timeSent = QTime::currentTime(); - //packetsLost++; + qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len=" << c->rxMissing.size() << "): " << j; + c->rxMissing.insert(j, 0); + if (c->rxSeqBuf.size() > 400) + { + c->rxSeqBuf.remove(0); + } + c->rxSeqBuf.insert(j, QTime::currentTime()); // Add this missing packet to the rxbuffer as we now long about it. } else { - if (s->retransmitCount == 4 && j != 0) + if (s.value() == 4) { // We have tried 4 times to request this packet, time to give up! s = c->rxMissing.erase(s); - c->rxMutex.lock(); - c->rxSeqBuf.append(j); // Final thing is to add to received buffer! - c->rxMutex.unlock(); } } } - } - else { - qInfo(logUdpServer()) << c->ipAddress.toString() << ": Too many missing, flushing buffers"; - c->rxMutex.lock(); - c->rxSeqBuf.clear(); - c->rxMutex.unlock(); - missingSeqs.clear(); - break; + c->missMutex.unlock(); } } - i++; } + c->rxMutex.unlock(); - + c->missMutex.lock(); for (auto it = c->rxMissing.begin(); it != c->rxMissing.end(); ++it) { - if (it->retransmitCount < 10) + if (it.value() < 10) { - missingSeqs.append(it->seqNum & 0xff); - missingSeqs.append(it->seqNum >> 8 & 0xff); - missingSeqs.append(it->seqNum & 0xff); - missingSeqs.append(it->seqNum >> 8 & 0xff); - it->retransmitCount++; + missingSeqs.append(it.key() & 0xff); + missingSeqs.append(it.key() >> 8 & 0xff); + missingSeqs.append(it.key() & 0xff); + missingSeqs.append(it.key() >> 8 & 0xff); + it.value()++; } } + if (missingSeqs.length() != 0) { control_packet p; @@ -1437,20 +1450,21 @@ void udpServer::sendRetransmitRequest(CLIENT* c) if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control. { p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8); - qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for missing packet : " << hex << p.seq; + qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq; udpMutex.lock(); c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port); udpMutex.unlock(); } else { - qInfo(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for multiple missing packets : " << missingSeqs.toHex(); - missingSeqs.insert(0, p.packet, sizeof(p.packet)); + qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex(); udpMutex.lock(); + missingSeqs.insert(0, p.packet, sizeof(p.packet)); c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port); udpMutex.unlock(); } } + c->missMutex.unlock(); } @@ -1486,6 +1500,19 @@ void udpServer::deleteConnection(QList* l, CLIENT* c) connMutex.lock(); + c->rxMutex.lock(); + c->rxSeqBuf.clear(); + c->rxMutex.unlock(); + + c->txMutex.lock(); + c->txSeqBuf.clear(); + c->txMutex.unlock(); + + c->missMutex.lock(); + c->rxMissing.clear(); + c->missMutex.unlock(); + + QList::iterator it = l->begin(); while (it != l->end()) { CLIENT* client = *it; diff --git a/udpserver.h b/udpserver.h index 25083d8..78916c1 100644 --- a/udpserver.h +++ b/udpserver.h @@ -12,6 +12,7 @@ #include #include #include +#include // Allow easy endian-ness conversions #include @@ -102,9 +103,11 @@ private: quint16 txSampleRate; SERVERUSER user; - QVector txSeqBuf; - QVector rxSeqBuf; - QVector rxMissing; + + QMap rxSeqBuf; + QMap txSeqBuf; + QMap rxMissing; + QMutex txMutex; QMutex rxMutex; QMutex missMutex;