Fixes to UDP server

merge-requests/4/head
Phil Taylor 2021-05-14 09:55:02 +01:00
rodzic 08f9b6e2da
commit 7d50e433f5
2 zmienionych plików z 161 dodań i 53 usunięć

Wyświetl plik

@ -56,13 +56,13 @@ void udpServer::init()
udpAudio = new QUdpSocket(this);
udpAudio->bind(config.audioPort);
QUdpSocket::connect(udpAudio, &QUdpSocket::readyRead, this, &udpServer::audioReceived);
}
udpServer::~udpServer()
{
qDebug(logUdpServer()) << "Closing udpServer";
connMutex.lock();
foreach(CLIENT * client, controlClients)
{
@ -153,6 +153,9 @@ udpServer::~udpServer()
udpAudio->close();
delete udpAudio;
}
connMutex.unlock();
}
@ -187,6 +190,7 @@ void udpServer::controlReceived()
if (current == Q_NULLPTR)
{
current = new CLIENT();
current->type = "Control";
current->connected = true;
current->isStreaming = false;
current->timeConnected = QDateTime::currentDateTime();
@ -217,8 +221,11 @@ void udpServer::controlReceived()
current->commonCap = 0x8010;
qDebug(logUdpServer()) << "New Control connection created from :" << current->ipAddress.toString() << ":" << QString::number(current->port);
qDebug(logUdpServer()) << current->ipAddress.toString() << ": New Control connection created";
connMutex.lock();
controlClients.append(current);
connMutex.unlock();
}
current->lastHeard = QDateTime::currentDateTime();
@ -352,7 +359,7 @@ void udpServer::controlReceived()
sendStatus(current);
current->authInnerSeq = 0x00;
sendConnectionInfo(current);
qDebug(logUdpServer()) << "rxCodec:" << current->rxCodec << " txCodec:" << current->txCodec <<
qDebug(logUdpServer()) << current->ipAddress.toString() << ": rxCodec:" << current->rxCodec << " txCodec:" << current->txCodec <<
" rxSampleRate" << current->rxSampleRate <<
" txSampleRate" << current->rxSampleRate <<
" txBufferLen" << current->txBufferLen;
@ -470,6 +477,8 @@ void udpServer::civReceived()
if (current == Q_NULLPTR)
{
current = new CLIENT();
current->type = "CIV";
current->civId = 0;
current->connected = true;
current->timeConnected = QDateTime::currentDateTime();
current->ipAddress = datagram.senderAddress();
@ -485,7 +494,7 @@ void udpServer::civReceived()
current->idleTimer = new QTimer();
connect(current->idleTimer, &QTimer::timeout, this, std::bind(&udpServer::sendControl, this, current, 0x00, (quint16)0x00));
current->idleTimer->start(100);
//current->idleTimer->start(100); // Start idleTimer after receiving iamready.
current->wdTimer = new QTimer();
connect(current->wdTimer, &QTimer::timeout, this, std::bind(&udpServer::watchdog, this, current));
@ -495,15 +504,19 @@ void udpServer::civReceived()
connect(current->retransmitTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRetransmitRequest, this, current));
current->retransmitTimer->start(RETRANSMIT_PERIOD);
qDebug(logUdpServer()) << "New CIV connection created from :" << current->ipAddress.toString() << ":" << QString::number(current->port);
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): New connection created";
connMutex.lock();
civClients.append(current);
connMutex.unlock();
}
switch (r.length())
{
case (CONTROL_SIZE):
/* case (CONTROL_SIZE):
{
}
*/
case (PING_SIZE):
{
ping_packet_t in = (ping_packet_t)r.constData();
@ -534,20 +547,40 @@ void udpServer::civReceived()
default:
{
if (r.length() > 0x15) {
if (r.length() > 0x18) {
data_packet_t in = (data_packet_t)r.constData();
if (in->type != 0x01)
{
if (quint16(in->datalen + 0x15) == (quint16)in->len)
{
// Strip all '0xFE' command preambles first:
int lastFE = r.lastIndexOf((char)0xfe);
//qDebug(logUdpServer()) << "Got:" << r.mid(lastFE);
if (current->civId == 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] > (quint8)0xdf && (quint8)r[lastFE + 2] < (quint8)0xef) {
// This is (should be) the remotes CIV id.
current->civId = (quint8)r[lastFE + 2];
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Detected remote CI-V:" << hex << current->civId;
}
else if (current->civId != 0 && r.length() > lastFE + 2 && (quint8)r[lastFE + 2] != current->civId)
{
current->civId = (quint8)r[lastFE + 2];
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Detected different remote CI-V:" << hex << current->civId;
}
emit haveDataFromServer(r.mid(0x15));
}
else {
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Datalen mismatch " << quint16(in->datalen + 0x15) << ":" << (quint16)in->len;
}
}
}
break;
//break;
}
}
commonReceived(&civClients, current, r);
if (current != Q_NULLPTR) {
udpServer::commonReceived(&civClients, current, r);
}
}
}
@ -577,6 +610,7 @@ void udpServer::audioReceived()
if (current == Q_NULLPTR)
{
current = new CLIENT();
current->type = "Audio";
current->connected = true;
current->timeConnected = QDateTime::currentDateTime();
current->ipAddress = datagram.senderAddress();
@ -598,10 +632,14 @@ void udpServer::audioReceived()
connect(current->retransmitTimer, &QTimer::timeout, this, std::bind(&udpServer::sendRetransmitRequest, this, current));
current->retransmitTimer->start(RETRANSMIT_PERIOD);
current->seqPrefix = 0;
qDebug(logUdpServer()) << "New Audio connection created from :" << current->ipAddress.toString() << ":" << QString::number(current->port);
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): New connection created";
connMutex.lock();
audioClients.append(current);
connMutex.unlock();
}
switch (r.length())
{
case (PING_SIZE):
@ -666,7 +704,7 @@ void udpServer::audioReceived()
}
if (current != Q_NULLPTR) {
commonReceived(&audioClients, current, r);
udpServer::commonReceived(&audioClients, current, r);
}
}
}
@ -680,7 +718,7 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
if (r.length() < 0x10)
{
// Invalid packet
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Invalid packet received, len: " << r.length();
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Invalid packet received, len: " << r.length();
return;
}
@ -690,20 +728,23 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
{
control_packet_t in = (control_packet_t)r.constData();
if (in->type == 0x03) {
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Received 'are you there'";
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'are you there'";
current->remoteId = in->sentid;
sendControl(current, 0x04, in->seq);
} // This is This is "Are you ready" in response to "I am here".
else if (in->type == 0x06)
{
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Received 'Are you ready'";
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'Are you ready'";
current->remoteId = in->sentid;
sendControl(current, 0x06, in->seq);
if (current->idleTimer != Q_NULLPTR && !current->idleTimer->isActive()) {
current->idleTimer->start(100);
}
} // This is a retransmit request
else if (in->type == 0x01)
{
// Single packet request
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Received 'retransmit' request for " << in->seq;
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Received 'retransmit' request for " << in->seq;
auto match = std::find_if(current->txSeqBuf.begin(), current->txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) {
return s.seqNum == cs;
@ -712,10 +753,11 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
if (match != current->txSeqBuf.end() && match->retransmitCount < 5) {
// Found matching entry?
// Don't constantly retransmit the same packet, give-up eventually
QMutexLocker locker(&mutex);
qDebug(logUdpServer()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum;
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum;
match->retransmitCount++;
udpMutex.lock();
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
} else {
// Just send an idle!
sendControl(current, 0x00, in->seq);
@ -725,7 +767,7 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
}
default:
{
break;
//break;
}
}
@ -740,7 +782,7 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
return s.seqNum == cs;
});
if (match == current->txSeqBuf.end()) {
qDebug(logUdpServer()) << current->ipAddress.toString() << ":" << current->port << ": Requested packet " << hex << in->seq << " not found";
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Requested packet " << hex << in->seq << " not found";
// Just send idle packet.
sendControl(current, 0, in->seq);
}
@ -748,18 +790,22 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
{
// Found matching entry?
// Send "untracked" as it has already been sent once.
QMutexLocker locker(&mutex);
qDebug(logUdpServer()) << current->ipAddress.toString() << ":" << current->port << ": Sending retransmit of " << hex << match->seqNum;
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Sending retransmit of " << hex << match->seqNum;
match->retransmitCount++;
udpMutex.lock();
current->socket->writeDatagram(match->data, current->ipAddress, current->port);
udpMutex.unlock();
match++;
}
}
}
else if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00)
else if (in->type == 0x00 && in->seq != 0x00)
{
QMutexLocker locker(&mutex);
if (current->rxSeqBuf.isEmpty())
//if (current->type == "CIV") {
// qDebug(logUdpServer()) << "Got:" << in->seq;
//}
current->rxMutex.lock();
if (current->rxSeqBuf.isEmpty())
{
current->rxSeqBuf.append(in->seq);
}
@ -768,25 +814,30 @@ void udpServer::commonReceived(QList<CLIENT*>* l,CLIENT* current, QByteArray r)
std::sort(current->rxSeqBuf.begin(), current->rxSeqBuf.end());
if (in->seq < current->rxSeqBuf.front())
{
qDebug(logUdpServer()) << current->ipAddress.toString() << ": ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.back() << " current: " << hex << in->seq;
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): ******* seq number may have rolled over ****** previous highest: " << hex << current->rxSeqBuf.back() << " current: " << hex << in->seq;
// Looks like it has rolled over so clear buffer and start again.
current->rxSeqBuf.clear();
current->rxMutex.unlock(); // Must unlock the Mutex!
return;
}
if (!current->rxSeqBuf.contains(in->seq))
{
// Add incoming packet to the received buffer and if it is in the missing buffer, remove it.
current->rxSeqBuf.append(in->seq);
// Check whether this is one of our missing ones!
current->missMutex.lock();
auto s = std::find_if(current->rxMissing.begin(), current->rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; });
if (s != current->rxMissing.end())
{
qDebug(logUdpServer()) << current->ipAddress.toString() << ": Missing SEQ has been received! " << hex << in->seq;
qDebug(logUdpServer()) << current->ipAddress.toString() << "(" << current->type << "): Missing SEQ has been received! " << hex << in->seq;
s = current->rxMissing.erase(s);
}
current->missMutex.unlock();
}
}
current->rxMutex.unlock();
}
}
@ -796,7 +847,6 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
{
//qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending control packet: " << type;
QMutexLocker locker(&mutex);
control_packet p;
memset(p.packet, 0x0, CONTROL_SIZE); // We can't be sure it is initialized with 0x00!
@ -808,19 +858,26 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
if (seq == 0x00)
{
p.seq = c->txSeq;
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
c->txSeq++;
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
//if (c->idleTimer != Q_NULLPTR) {
// c->idleTimer->start(100);
//}
c->txMutex.unlock();
}
else {
p.seq = seq;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
return;
@ -830,13 +887,12 @@ void udpServer::sendControl(CLIENT* c, quint8 type, quint16 seq)
void udpServer::sendPing(QList<CLIENT*> *l,CLIENT* c, quint16 seq, bool reply)
{
QMutexLocker locker(&mutex);
// Also use to detect "stale" connections
QDateTime now = QDateTime::currentDateTime();
if (c->lastHeard.secsTo(now) > STALE_CONNECTION)
{
qDebug(logUdpServer()) << "Deleting stale connection " << c->ipAddress.toString();
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Deleting stale connection ";
deleteConnection(l, c);
return;
}
@ -867,14 +923,16 @@ void udpServer::sendPing(QList<CLIENT*> *l,CLIENT* c, quint16 seq, bool reply)
p.time = pingTime;
p.reply = (char)reply;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
return;
}
void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending Login response: " << c->txSeq;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Login response: " << c->txSeq;
login_response_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -904,24 +962,27 @@ void udpServer::sendLoginResponse(CLIENT* c, bool allowed)
strcpy(p.connection,"WFVIEW");
}
QMutexLocker locker(&mutex);
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = c->txSeq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
udpMutex.lock();
c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port);
udpMutex.unlock();
c->txSeq++;
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
c->txMutex.unlock();
return;
}
void udpServer::sendCapabilities(CLIENT* c)
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending Capabilities :" << c->txSeq;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Capabilities :" << c->txSeq;
capabilities_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -1016,19 +1077,23 @@ void udpServer::sendCapabilities(CLIENT* c)
p.capf = 0x5001;
p.capg = 0x0190;
QMutexLocker locker(&mutex);
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
udpMutex.lock();
c->socket->writeDatagram(c->txSeqBuf.last().data, c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return;
}
@ -1036,7 +1101,7 @@ void udpServer::sendCapabilities(CLIENT* c)
// Also used to display currently connected used information.
void udpServer::sendConnectionInfo(CLIENT* c)
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending ConnectionInfo :" << c->txSeq;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending ConnectionInfo :" << c->txSeq;
conninfo_packet p;
memset(p.packet, 0x0, sizeof(p));
p.len = sizeof(p);
@ -1066,25 +1131,28 @@ void udpServer::sendConnectionInfo(CLIENT* c)
p.identb = c->identb;
}
QMutexLocker locker(&mutex);
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return;
}
void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending Token response for type: " << type;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Token response for type: " << type;
token_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -1102,19 +1170,22 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
p.commoncap = c->commonCap;
p.res = type;
QMutexLocker locker(&mutex);
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
if (c->idleTimer != Q_NULLPTR)
c->idleTimer->start(100);
c->txSeq++;
c->txMutex.unlock();
return;
}
@ -1122,22 +1193,25 @@ void udpServer::sendTokenResponse(CLIENT* c, quint8 type)
void udpServer::watchdog(CLIENT* c)
{
c->txMutex.lock();
//qDebug(logUdpServer()) << c->ipAddress.toString() << ":" << c->port << ":Buffers tx:"<< c->txSeqBuf.length() << " rx:" << c->rxSeqBuf.length();
QMutexLocker locker(&mutex);
// Erase old entries from the tx packet buffer. Keep the first 100 sent packets as we seem to get asked for these?
if (!c->txSeqBuf.isEmpty())
{
c->txSeqBuf.erase(std::remove_if(c->txSeqBuf.begin(), c->txSeqBuf.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->txSeqBuf.end());
}
c->txMutex.unlock();
// Erase old entries from the missing packets buffer
c->missMutex.lock();
if (!c->rxMissing.isEmpty()) {
c->rxMissing.erase(std::remove_if(c->rxMissing.begin(), c->rxMissing.end(), [](const SEQBUFENTRY& v)
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), c->rxMissing.end());
}
c->missMutex.unlock();
c->rxMutex.lock();
if (!c->rxSeqBuf.isEmpty()) {
std::sort(c->rxSeqBuf.begin(), c->rxSeqBuf.end());
@ -1146,12 +1220,13 @@ void udpServer::watchdog(CLIENT* c)
c->rxSeqBuf.remove(0, 200);
}
}
c->rxMutex.unlock();
}
void udpServer::sendStatus(CLIENT* c)
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Sending Status";
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Sending Status";
status_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
@ -1178,14 +1253,17 @@ void udpServer::sendStatus(CLIENT* c)
c->txSeq++;
QMutexLocker locker(&mutex);
c->txMutex.lock();
c->txSeqBuf.append(SEQBUFENTRY());
c->txSeqBuf.last().seqNum = p.seq;
c->txSeqBuf.last().timeSent = QTime::currentTime();
c->txSeqBuf.last().retransmitCount = 0;
c->txSeqBuf.last().data = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
c->txMutex.unlock();
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
@ -1196,7 +1274,8 @@ void udpServer::dataForServer(QByteArray d)
//qDebug(logUdpServer()) << "Server got:" << d;
foreach(CLIENT * client, civClients)
{
if (client != Q_NULLPTR && client->connected) {
int lastFE = d.lastIndexOf((quint8)0xfe);
if (client != Q_NULLPTR && client->connected && d.length() > lastFE+2 && ((quint8)d[lastFE + 1] == client->civId || (quint8)d[lastFE + 2] == client->civId)) {
data_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = (quint16)d.length() + sizeof(p);
@ -1208,15 +1287,18 @@ void udpServer::dataForServer(QByteArray d)
p.sendseq = client->innerSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d);
QMutexLocker locker(&mutex);
client->txMutex.lock();
client->txSeqBuf.append(SEQBUFENTRY());
client->txSeqBuf.last().seqNum = p.seq;
client->txSeqBuf.last().timeSent = QTime::currentTime();
client->txSeqBuf.last().retransmitCount = 0;
client->txSeqBuf.last().data = t;
client->txMutex.unlock();
//qDebug(logUdpServer()) << "Sending:" << d;
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
client->txSeq++;
client->innerSeq++;
}
@ -1260,13 +1342,17 @@ void udpServer::receiveAudioData(const audioPacket &d)
p.seq = client->txSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d.datain);
QMutexLocker locker(&mutex);
client->txMutex.lock();
client->txSeqBuf.append(SEQBUFENTRY());
client->txSeqBuf.last().seqNum = p.seq;
client->txSeqBuf.last().timeSent = QTime::currentTime();
client->txSeqBuf.last().retransmitCount = 0;
client->txSeqBuf.last().data = t;
client->txMutex.unlock();
udpMutex.lock();
client->socket->writeDatagram(t, client->ipAddress, client->port);
udpMutex.unlock();
client->txSeq++;
client->sendAudioSeq++;
@ -1284,7 +1370,7 @@ void udpServer::receiveAudioData(const audioPacket &d)
void udpServer::sendRetransmitRequest(CLIENT *c)
{
QMutexLocker locker(&mutex);
c->missMutex.lock();
QByteArray missingSeqs;
@ -1297,12 +1383,12 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
{
for (quint16 j = *i + 1; j < *(i + 1); j++)
{
qDebug(logUdpServer()) << this->metaObject()->className() << ": Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")";
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")";
auto s = std::find_if(c->rxMissing.begin(), c->rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; });
if (s == c->rxMissing.end())
{
// We haven't seen this missing packet before
qDebug(logUdpServer()) << this->metaObject()->className() << ": Adding to missing buffer (len="<< c->rxMissing.length() << "): " << j;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): Adding to missing buffer (len="<< c->rxMissing.length() << "): " << j;
c->rxMissing.append(SEQBUFENTRY());
c->rxMissing.last().seqNum = j;
c->rxMissing.last().retransmitCount = 0;
@ -1314,7 +1400,9 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
{
// We have tried 4 times to request this packet, time to give up!
s = c->rxMissing.erase(s);
c->rxMutex.lock();
c->rxSeqBuf.append(j); // Final thing is to add to received buffer!
c->rxMutex.unlock();
}
}
@ -1322,7 +1410,9 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
}
else {
qDebug(logUdpServer()) << c->ipAddress.toString() << ": Too many missing, flushing buffers";
c->rxMutex.lock();
c->rxSeqBuf.clear();
c->rxMutex.unlock();
missingSeqs.clear();
break;
}
@ -1353,16 +1443,22 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qDebug(logUdpServer()) << c->ipAddress.toString() << ": sending request for missing packet : " << hex << p.seq;
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for missing packet : " << hex << p.seq;
udpMutex.lock();
c->socket->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), c->ipAddress, c->port);
udpMutex.unlock();
}
else
{
qDebug(logUdpServer()) << c->ipAddress.toString() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
qDebug(logUdpServer()) << c->ipAddress.toString() << "(" << c->type << "): sending request for multiple missing packets : " << missingSeqs.toHex();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
udpMutex.lock();
c->socket->writeDatagram(missingSeqs, c->ipAddress, c->port);
udpMutex.unlock();
}
}
c->missMutex.unlock();
}
@ -1374,6 +1470,8 @@ void udpServer::sendRetransmitRequest(CLIENT *c)
/// <param name="c"></param>
void udpServer::deleteConnection(QList<CLIENT*> *l, CLIENT* c)
{
connMutex.lock();
qDebug(logUdpServer()) << "Deleting connection to: " << c->ipAddress.toString() << ":" << QString::number(c->port);
if (c->idleTimer != Q_NULLPTR) {
c->idleTimer->stop();
@ -1430,4 +1528,6 @@ void udpServer::deleteConnection(QList<CLIENT*> *l, CLIENT* c)
}
}
connMutex.unlock();
}

Wyświetl plik

@ -58,6 +58,7 @@ private:
struct CLIENT {
bool connected = false;
QString type;
QHostAddress ipAddress;
quint16 port;
QByteArray clientName;
@ -102,7 +103,13 @@ private:
QVector <SEQBUFENTRY> txSeqBuf;
QVector <quint16> rxSeqBuf;
QVector <SEQBUFENTRY> rxMissing;
QMutex txMutex;
QMutex rxMutex;
QMutex missMutex;
quint16 seqPrefix;
quint8 civId;
};
void controlReceived();
@ -136,7 +143,8 @@ private:
quint8 rigciv = 0xa2;
QMutex mutex; // Used for critical operations.
QMutex udpMutex; // Used for critical operations.
QMutex connMutex;
QList <CLIENT*> controlClients = QList<CLIENT*>();
QList <CLIENT*> civClients = QList<CLIENT*>();