kopia lustrzana https://gitlab.com/eliggett/wfview
Many changes to mitigate network instability
rodzic
30b9c72208
commit
6f6145f726
|
@ -307,7 +307,7 @@ qint64 audioHandler::readData(char* data, qint64 maxlen)
|
|||
qint64 audioHandler::writeData(const char* data, qint64 len)
|
||||
{
|
||||
|
||||
// QMutexLocker locker(&mutex);
|
||||
QMutexLocker locker(&mutex);
|
||||
if (buffer.length() > bufferSize * 4)
|
||||
{
|
||||
qWarning() << "writeData() Buffer overflow";
|
||||
|
@ -359,9 +359,9 @@ void audioHandler::stateChanged(QAudio::State state)
|
|||
{
|
||||
if (state == QAudio::IdleState && audioOutput->error() == QAudio::UnderrunError) {
|
||||
qDebug(logAudio()) << this->metaObject()->className() << "RX:Buffer underrun";
|
||||
//if (buffer.length() < bufferSize) {
|
||||
// audioOutput->suspend();
|
||||
//}
|
||||
QMutexLocker locker(&mutex);
|
||||
audioOutput->suspend();
|
||||
buffer.clear();
|
||||
}
|
||||
//qDebug(logAudio()) << this->metaObject()->className() << ": state = " << state;
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ void audioHandler::incomingAudio(const QByteArray& data)
|
|||
{
|
||||
//qDebug(logAudio()) << "Got " << data.length() << " samples";
|
||||
if (audioOutput != Q_NULLPTR && audioOutput->state() != QAudio::StoppedState) {
|
||||
//QMutexLocker locker(&mutex);
|
||||
QMutexLocker locker(&mutex);
|
||||
buffer.append(data);
|
||||
|
||||
if (audioOutput->state() == QAudio::SuspendedState) {
|
||||
|
|
539
udphandler.cpp
539
udphandler.cpp
|
@ -154,24 +154,24 @@ void udpHandler::dataReceived()
|
|||
latency += lastPingSentTime.msecsTo(QDateTime::currentDateTime());
|
||||
latency /= 2;
|
||||
quint32 totalsent = packetsSent;
|
||||
quint32 totallost = packetsLost / 2;
|
||||
quint32 totallost = packetsLost;
|
||||
if (audio != Q_NULLPTR) {
|
||||
totalsent = totalsent + audio->packetsSent;
|
||||
totallost = totallost + audio->packetsLost / 2;
|
||||
totallost = totallost + audio->packetsLost;
|
||||
}
|
||||
if (civ != Q_NULLPTR) {
|
||||
totalsent = totalsent + civ->packetsSent;
|
||||
totallost = totallost + civ->packetsLost / 2;
|
||||
totallost = totallost + civ->packetsLost;
|
||||
}
|
||||
//double perclost = 1.0 * totallost / totalsent * 100.0 ;
|
||||
emit haveNetworkStatus(" rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(packetsLost) + "/" + QString::number(packetsSent) + ")");
|
||||
|
||||
emit haveNetworkStatus(" rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(totallost) + "/" + QString::number(totalsent) + ")");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case (TOKEN_SIZE): // Response to Token request
|
||||
{
|
||||
token_packet_t in = (token_packet_t)r.constData();
|
||||
if (in->res == 0x05)
|
||||
if (in->res == 0x05 && in->type != 0x01)
|
||||
{
|
||||
if (in->response == 0x0000)
|
||||
{
|
||||
|
@ -203,25 +203,27 @@ void udpHandler::dataReceived()
|
|||
case (STATUS_SIZE): // Status packet
|
||||
{
|
||||
status_packet_t in = (status_packet_t)r.constData();
|
||||
if (in->error == 0x00ffffff && !streamOpened)
|
||||
{
|
||||
emit haveNetworkError(radioIP.toString(), "Auth failed, try rebooting the radio.");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Auth failed, try rebooting the radio.";
|
||||
}
|
||||
else if (in->error == 0x00000000 && in->disc == 0x01)
|
||||
{
|
||||
emit haveNetworkError(radioIP.toString(), "Got radio disconnected.");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Got radio disconnected.";
|
||||
if (streamOpened) {
|
||||
// Close stream connections but keep connection open to the radio.
|
||||
if (audio != Q_NULLPTR) {
|
||||
delete audio;
|
||||
}
|
||||
if (in->type != 0x01) {
|
||||
if (in->error == 0x00ffffff && !streamOpened)
|
||||
{
|
||||
emit haveNetworkError(radioIP.toString(), "Auth failed, try rebooting the radio.");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Auth failed, try rebooting the radio.";
|
||||
}
|
||||
else if (in->error == 0x00000000 && in->disc == 0x01)
|
||||
{
|
||||
emit haveNetworkError(radioIP.toString(), "Got radio disconnected.");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Got radio disconnected.";
|
||||
if (streamOpened) {
|
||||
// Close stream connections but keep connection open to the radio.
|
||||
if (audio != Q_NULLPTR) {
|
||||
delete audio;
|
||||
}
|
||||
|
||||
if (civ != Q_NULLPTR) {
|
||||
delete civ;
|
||||
if (civ != Q_NULLPTR) {
|
||||
delete civ;
|
||||
}
|
||||
streamOpened = false;
|
||||
}
|
||||
streamOpened = false;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -229,93 +231,98 @@ void udpHandler::dataReceived()
|
|||
case(LOGIN_RESPONSE_SIZE): // Response to Login packet.
|
||||
{
|
||||
login_response_packet_t in = (login_response_packet_t)r.constData();
|
||||
if (in->error == 0xfeffffff)
|
||||
{
|
||||
emit haveNetworkStatus("Invalid Username/Password");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Invalid Username/Password";
|
||||
}
|
||||
else if (!isAuthenticated)
|
||||
{
|
||||
|
||||
if (in->tokrequest == tokRequest)
|
||||
if (in->type != 0x01) {
|
||||
if (in->error == 0xfeffffff)
|
||||
{
|
||||
emit haveNetworkStatus("Radio Login OK!");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Received matching token response to our request";
|
||||
token = in->token;
|
||||
sendToken(0x02);
|
||||
tokenTimer->start(TOKEN_RENEWAL); // Start token request timer
|
||||
isAuthenticated = true;
|
||||
emit haveNetworkStatus("Invalid Username/Password");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Invalid Username/Password";
|
||||
}
|
||||
else
|
||||
else if (!isAuthenticated)
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Token response did not match, sent:" << tokRequest << " got " << in->tokrequest;
|
||||
|
||||
if (in->tokrequest == tokRequest)
|
||||
{
|
||||
emit haveNetworkStatus("Radio Login OK!");
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Received matching token response to our request";
|
||||
token = in->token;
|
||||
sendToken(0x02);
|
||||
tokenTimer->start(TOKEN_RENEWAL); // Start token request timer
|
||||
isAuthenticated = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Token response did not match, sent:" << tokRequest << " got " << in->tokrequest;
|
||||
}
|
||||
}
|
||||
|
||||
if (!strcmp(in->connection, "FTTH"))
|
||||
{
|
||||
highBandwidthConnection = true;
|
||||
}
|
||||
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Detected connection speed " << in->connection;
|
||||
}
|
||||
|
||||
if (!strcmp(in->connection, "FTTH"))
|
||||
{
|
||||
highBandwidthConnection = true;
|
||||
}
|
||||
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Detected connection speed " << in->connection;
|
||||
break;
|
||||
}
|
||||
case (CONNINFO_SIZE):
|
||||
{
|
||||
conninfo_packet_t in = (conninfo_packet_t)r.constData();
|
||||
|
||||
devName = in->name;
|
||||
QHostAddress ip = QHostAddress(qToBigEndian(in->ipaddress));
|
||||
if (!streamOpened && in->busy)
|
||||
{
|
||||
if (in->ipaddress != 0x00 && strcmp(in->computer,compName.toLocal8Bit()))
|
||||
if (in->type != 0x01) {
|
||||
devName = in->name;
|
||||
QHostAddress ip = QHostAddress(qToBigEndian(in->ipaddress));
|
||||
if (!streamOpened && in->busy)
|
||||
{
|
||||
emit haveNetworkStatus(devName + " in use by: " + in->computer + " (" + ip.toString() + ")");
|
||||
sendControl(false, 0x00, in->seq); // Respond with an idle
|
||||
if (in->ipaddress != 0x00 && strcmp(in->computer, compName.toLocal8Bit()))
|
||||
{
|
||||
emit haveNetworkStatus(devName + " in use by: " + in->computer + " (" + ip.toString() + ")");
|
||||
sendControl(false, 0x00, in->seq); // Respond with an idle
|
||||
}
|
||||
else {
|
||||
civ = new udpCivData(localIP, radioIP, civPort);
|
||||
audio = new udpAudio(localIP, radioIP, audioPort, rxBufferSize, rxSampleRate, rxCodec, txSampleRate, txCodec);
|
||||
|
||||
QObject::connect(civ, SIGNAL(receive(QByteArray)), this, SLOT(receiveFromCivStream(QByteArray)));
|
||||
QObject::connect(this, SIGNAL(haveChangeBufferSize(quint16)), audio, SLOT(changeBufferSize(quint16)));
|
||||
|
||||
|
||||
streamOpened = true;
|
||||
|
||||
emit haveNetworkStatus(devName);
|
||||
|
||||
qDebug(logUdp()) << this->metaObject()->className() << "Got serial and audio request success, device name: " << devName;
|
||||
|
||||
// Stuff can change in the meantime because of a previous login...
|
||||
remoteId = in->sentid;
|
||||
myId = in->rcvdid;
|
||||
tokRequest = in->tokrequest;
|
||||
token = in->token;
|
||||
}
|
||||
}
|
||||
else {
|
||||
civ = new udpCivData(localIP, radioIP, civPort);
|
||||
audio = new udpAudio(localIP, radioIP, audioPort, rxBufferSize, rxSampleRate, rxCodec, txSampleRate, txCodec);
|
||||
else if (!streamOpened && !in->busy)
|
||||
{
|
||||
emit haveNetworkStatus(devName + " available");
|
||||
|
||||
QObject::connect(civ, SIGNAL(receive(QByteArray)), this, SLOT(receiveFromCivStream(QByteArray)));
|
||||
QObject::connect(this, SIGNAL(haveChangeBufferSize(quint16)), audio, SLOT(changeBufferSize(quint16)));
|
||||
|
||||
identa = in->identa;
|
||||
identb = in->identb;
|
||||
|
||||
streamOpened = true;
|
||||
|
||||
emit haveNetworkStatus(devName);
|
||||
|
||||
qDebug(logUdp()) << this->metaObject()->className() << "Got serial and audio request success, device name: " << devName;
|
||||
|
||||
// Stuff can change in the meantime because of a previous login...
|
||||
remoteId = in->sentid;
|
||||
myId = in->rcvdid;
|
||||
tokRequest = in->tokrequest;
|
||||
token = in->token;
|
||||
sendRequestStream();
|
||||
}
|
||||
}
|
||||
else if (!streamOpened && !in->busy)
|
||||
{
|
||||
emit haveNetworkStatus(devName + " available");
|
||||
|
||||
identa = in->identa;
|
||||
identb = in->identb;
|
||||
|
||||
sendRequestStream();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case (CAPABILITIES_SIZE):
|
||||
{
|
||||
capabilities_packet_t in = (capabilities_packet_t)r.constData();
|
||||
audioType = in->audio;
|
||||
devName = in->name;
|
||||
//replyId = r.mid(0x42, 16);
|
||||
qDebug(logUdp()) << this->metaObject()->className() << "Received radio capabilities, Name:" <<
|
||||
devName << " Audio:" <<
|
||||
audioType;
|
||||
|
||||
if (in->type != 0x01)
|
||||
{
|
||||
audioType = in->audio;
|
||||
devName = in->name;
|
||||
//replyId = r.mid(0x42, 16);
|
||||
qDebug(logUdp()) << this->metaObject()->className() << "Received radio capabilities, Name:" <<
|
||||
devName << " Audio:" <<
|
||||
audioType;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -426,7 +433,8 @@ void udpHandler::sendToken(uint8_t magic)
|
|||
|
||||
authInnerSendSeq++;
|
||||
sendTrackedPacket(QByteArray::fromRawData((const char *)p.packet, sizeof(p)));
|
||||
tokenTimer->start(100); // Set 100ms timer for retry (this will be cancelled if a response is received)
|
||||
// The radio should request a repeat of the token renewal!
|
||||
//tokenTimer->start(100); // Set 100ms timer for retry (this will be cancelled if a response is received)
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -533,7 +541,8 @@ void udpCivData::dataReceived()
|
|||
}
|
||||
default:
|
||||
{
|
||||
if (r.length() > 21) {
|
||||
control_packet_t in = (control_packet_t)r.constData();
|
||||
if (in->type != 0x01 && r.length() > 21) {
|
||||
// Process this packet, any re-transmit requests will happen later.
|
||||
//uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
|
||||
|
||||
|
@ -704,29 +713,33 @@ void udpAudio::dataReceived()
|
|||
|
||||
switch (r.length())
|
||||
{
|
||||
case (16): // Response to control packet handled in udpBase
|
||||
break;
|
||||
|
||||
default:
|
||||
{
|
||||
/* Audio packets start as follows:
|
||||
PCM 16bit and PCM8/uLAW stereo: 0x44,0x02 for first packet and 0x6c,0x05 for second.
|
||||
uLAW 8bit/PCM 8bit 0xd8,0x03 for all packets
|
||||
PCM 16bit stereo 0x6c,0x05 first & second 0x70,0x04 third
|
||||
|
||||
|
||||
*/
|
||||
if (r.mid(0, 2) == QByteArrayLiteral("\x6c\x05") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\x44\x02") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\xd8\x03") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\x70\x04"))
|
||||
case (16): // Response to control packet handled in udpBase
|
||||
{
|
||||
// First check if we are missing any packets as seq should be sequential.
|
||||
|
||||
rxaudio->incomingAudio(r.mid(24));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
/* Audio packets start as follows:
|
||||
PCM 16bit and PCM8/uLAW stereo: 0x44,0x02 for first packet and 0x6c,0x05 for second.
|
||||
uLAW 8bit/PCM 8bit 0xd8,0x03 for all packets
|
||||
PCM 16bit stereo 0x6c,0x05 first & second 0x70,0x04 third
|
||||
|
||||
|
||||
*/
|
||||
control_packet_t in = (control_packet_t)r.constData();
|
||||
if (in->type != 0x01) {
|
||||
if (r.mid(0, 2) == QByteArrayLiteral("\x6c\x05") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\x44\x02") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\xd8\x03") ||
|
||||
r.mid(0, 2) == QByteArrayLiteral("\x70\x04"))
|
||||
{
|
||||
// First check if we are missing any packets as seq should be sequential.
|
||||
|
||||
rxaudio->incomingAudio(r.mid(24));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
udpBase::dataReceived(r); // Call parent function to process the rest.
|
||||
|
@ -785,22 +798,9 @@ void udpBase::dataReceived(QByteArray r)
|
|||
case (CONTROL_SIZE): // Empty response used for simple comms and retransmit requests.
|
||||
{
|
||||
control_packet_t in = (control_packet_t)r.constData();
|
||||
|
||||
if (in->type == 0x04) {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Received I am here";
|
||||
areYouThereCounter = 0;
|
||||
// I don't think that we will ever receive an "I am here" other than in response to "Are you there?"
|
||||
remoteId = in->sentid;
|
||||
sendControl(false, 0x06, 0x01); // Send Are you ready - untracked.
|
||||
}
|
||||
else if (in->type == 0x06)
|
||||
if (in->type == 0x01)
|
||||
{
|
||||
// Just get the seqnum and ignore the rest.
|
||||
}
|
||||
else if (in->type == 0x01) // retransmit request
|
||||
{
|
||||
// retransmit request
|
||||
// Send an idle with the requested seqnum if not found.
|
||||
// Single packet request
|
||||
packetsLost++;
|
||||
|
||||
auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = in->seq](SEQBUFENTRY& s) {
|
||||
|
@ -811,24 +811,23 @@ void udpBase::dataReceived(QByteArray r)
|
|||
// Found matching entry?
|
||||
// Send "untracked" as it has already been sent once.
|
||||
// Don't constantly retransmit the same packet, give-up eventually
|
||||
if (match->retransmitCount < 4) {
|
||||
QMutexLocker locker(&mutex);
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << match->seqNum;
|
||||
match->retransmitCount++;
|
||||
udp->writeDatagram(match->data, radioIP, port);
|
||||
}
|
||||
//else {
|
||||
// qDebug(logUdp()) << this->metaObject()->className() << ": Sending idle as retransmit count exceeded " << match->seqNum;
|
||||
// sendControl(false, 0, in->seq);
|
||||
//}
|
||||
break;
|
||||
}
|
||||
else {
|
||||
// Packet was not found in buffer
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle.";
|
||||
sendControl(false, 0, in->seq);
|
||||
QMutexLocker locker(&mutex);
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit of " << hex << match->seqNum;
|
||||
match->retransmitCount++;
|
||||
udp->writeDatagram(match->data, radioIP, port);
|
||||
}
|
||||
}
|
||||
if (in->type == 0x04) {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Received I am here";
|
||||
areYouThereCounter = 0;
|
||||
// I don't think that we will ever receive an "I am here" other than in response to "Are you there?"
|
||||
remoteId = in->sentid;
|
||||
sendControl(false, 0x06, 0x01); // Send Are you ready - untracked.
|
||||
}
|
||||
else if (in->type == 0x06)
|
||||
{
|
||||
// Just get the seqnum and ignore the rest.
|
||||
}
|
||||
break;
|
||||
}
|
||||
case (PING_SIZE): // ping packet
|
||||
|
@ -859,55 +858,16 @@ void udpBase::dataReceived(QByteArray r)
|
|||
}
|
||||
else {
|
||||
// Not sure what to do here, need to spend more time with the protocol but will try sending ping with same seq next time.
|
||||
qDebug(logUdp()) << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << in->seq;
|
||||
//qDebug(logUdp()) << this->metaObject()->className() << "Received out-of-sequence ping response. Sent:" << pingSendSeq << " received " << in->seq;
|
||||
}
|
||||
}
|
||||
else {
|
||||
qDebug(logUdp()) << "Unhandled response to ping. I have never seen this! 0x10=" << r[16];
|
||||
qDebug(logUdp()) << this->metaObject()->className() << "Unhandled response to ping. I have never seen this! 0x10=" << r[16];
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
}
|
||||
case (RETRANSMIT_RANGE_SIZE):
|
||||
{
|
||||
retransmit_range_packet_t in = (retransmit_range_packet_t)r.constData();
|
||||
|
||||
if (in->type==0x01)
|
||||
{ // retransmit range request
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Retransmit range request for:" << in->first << ", " << in->second << ", " << in->third << ", " << in->fourth;
|
||||
|
||||
auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&ca = in->first, &cb = in->second, &cc = in->third, &cd = in->fourth](SEQBUFENTRY& s) {
|
||||
return s.seqNum == ca || s.seqNum == cb || s.seqNum == cc || s.seqNum == cd;
|
||||
});
|
||||
|
||||
if (match == txSeqBuf.end()) {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Could not find requested packet " << in->seq << ", sending idle.";
|
||||
sendControl(false, 0, in->seq);
|
||||
}
|
||||
else {
|
||||
while (match != txSeqBuf.end()) {
|
||||
// Found matching entry?
|
||||
// Send "untracked" as it has already been sent once.
|
||||
if (match->retransmitCount <4 && (match->seqNum == in->first || match->seqNum == in->second || match->seqNum == in->third || match->seqNum == in->fourth)) {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << match->seqNum;
|
||||
match->retransmitCount++;
|
||||
QMutexLocker locker(&mutex);
|
||||
udp->writeDatagram(match->data, radioIP, port);
|
||||
udp->writeDatagram(match->data, radioIP, port);
|
||||
}
|
||||
//else if (match->retransmitCount == 4)
|
||||
//{
|
||||
// // Just send idle packet.
|
||||
// sendControl(false, 0, match->seqNum);
|
||||
//}
|
||||
match++;
|
||||
packetsLost++;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
|
||||
|
@ -923,76 +883,141 @@ void udpBase::dataReceived(QByteArray r)
|
|||
|
||||
// All packets except ping and retransmit requests should trigger this.
|
||||
control_packet_t in = (control_packet_t)r.constData();
|
||||
|
||||
// This is a variable length retransmit request!
|
||||
if (in->type == 0x01 && in->len != 0x10)
|
||||
{
|
||||
for (quint16 i = 0x10; i < r.length(); i = i + 2)
|
||||
{
|
||||
quint16 seq = (quint8)r[i] | (quint8)r[i + 1] << 8;
|
||||
auto match = std::find_if(txSeqBuf.begin(), txSeqBuf.end(), [&cs = seq](SEQBUFENTRY& s) {
|
||||
return s.seqNum == cs;
|
||||
});
|
||||
if (match == txSeqBuf.end()) {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Requested packet " << hex << seq << " not found";
|
||||
// Just send idle packet.
|
||||
sendControl(false, 0, match->seqNum);
|
||||
}
|
||||
else {
|
||||
// Found matching entry?
|
||||
// Send "untracked" as it has already been sent once.
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Sending retransmit (range) of " << hex << match->seqNum;
|
||||
match->retransmitCount++;
|
||||
QMutexLocker locker(&mutex);
|
||||
udp->writeDatagram(match->data, radioIP, port);
|
||||
match++;
|
||||
packetsLost++;
|
||||
}
|
||||
}
|
||||
} else
|
||||
if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00)
|
||||
{
|
||||
|
||||
if (in->seq < lastReceivedSeq)
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": ******* seq number may have rolled over ****** previous highest: " << rxSeqBuf.back() << " current: " << in->seq;
|
||||
|
||||
// Looks like it has rolled over so clear buffer and start again.
|
||||
rxSeqBuf.clear();
|
||||
if (rxSeqBuf.isEmpty()) {
|
||||
rxSeqBuf.append(in->seq);
|
||||
lastReceivedSeq = in->seq;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!rxSeqBuf.contains(in->seq))
|
||||
}
|
||||
else
|
||||
{
|
||||
rxSeqBuf.append(in->seq);
|
||||
}
|
||||
qSort(rxSeqBuf);
|
||||
if (in->seq < rxSeqBuf.front())
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": ******* seq number may have rolled over ****** previous highest: " << hex << rxSeqBuf.back() << " current: " << hex << in->seq;
|
||||
|
||||
// Looks like it has rolled over so clear buffer and start again.
|
||||
rxSeqBuf.clear();
|
||||
}
|
||||
|
||||
if (!rxSeqBuf.contains(in->seq))
|
||||
{
|
||||
// Add incoming packet to the received buffer and if it is in the mising buffer, remove it.
|
||||
rxSeqBuf.append(in->seq);
|
||||
// Check whether this is one of our missing ones!
|
||||
auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = in->seq](SEQBUFENTRY& s) { return s.seqNum == cs; });
|
||||
if (s != rxMissing.end())
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq;
|
||||
s = rxMissing.erase(s);
|
||||
}
|
||||
qSort(rxSeqBuf); // Re-sort the buffer
|
||||
}
|
||||
|
||||
// Find all gaps in received packets
|
||||
|
||||
QByteArray missingSeqs;
|
||||
|
||||
if (!rxSeqBuf.isEmpty())
|
||||
{
|
||||
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
|
||||
// Find all gaps in received packets (in reverse order)
|
||||
quint16 first=0, second=0, third=0, fourth=0, count=0;
|
||||
auto i = std::adjacent_find(rxSeqBuf.begin(), rxSeqBuf.end(), [](quint16 l, quint16 r) {return l + 1 < r; });
|
||||
while (i != rxSeqBuf.end())
|
||||
{
|
||||
if (count == 0) {
|
||||
first = *i;
|
||||
second = *i;
|
||||
third = *i;
|
||||
fourth = *i;
|
||||
if (i + 1 != rxSeqBuf.end())
|
||||
{
|
||||
if (*(i + 1) - *i < 30)
|
||||
{
|
||||
for (quint16 j = *i + 1; j < *(i + 1); j++)
|
||||
{
|
||||
//qDebug(logUdp()) << this->metaObject()->className() << ": Found missing seq between " << *i << " : " << *(i + 1) << " (" << j << ")";
|
||||
auto s = std::find_if(rxMissing.begin(), rxMissing.end(), [&cs = j](SEQBUFENTRY& s) { return s.seqNum == cs; });
|
||||
if (s == rxMissing.end())
|
||||
{
|
||||
// We haven't seen this missing packet before
|
||||
//qDebug(logUdp()) << this->metaObject()->className() << ": Adding to missing buffer (len="<< rxMissing.length() << "): " << j;
|
||||
SEQBUFENTRY b;
|
||||
b.seqNum = j;
|
||||
b.retransmitCount = 0;
|
||||
b.timeSent = QTime::currentTime();
|
||||
rxMissing.append(b);
|
||||
packetsLost++;
|
||||
}
|
||||
else {
|
||||
if (s->retransmitCount == 10)
|
||||
{
|
||||
s = rxMissing.erase(s);
|
||||
rxSeqBuf.append(j); // Final thing is to add to received buffer!
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Too many missing, flushing buffers";
|
||||
rxSeqBuf.clear();
|
||||
missingSeqs.clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (count == 1) {
|
||||
second = *i;
|
||||
third = *i;
|
||||
fourth = *i;
|
||||
}
|
||||
else if (count == 2) {
|
||||
third = *i;
|
||||
fourth = *i;
|
||||
}
|
||||
else if (count == 3) {
|
||||
fourth = *i;
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
count++;
|
||||
i++;
|
||||
i++;
|
||||
}
|
||||
|
||||
if (abs(second-first) > 100 || abs(third-second) > 100 || abs(fourth-third) > 100) // Something bad happened, clear the buffer.
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Excessive packet lost difference, clearing buffer: " << count << " packets lost!";
|
||||
|
||||
rxSeqBuf.clear();
|
||||
rxSeqBuf.append(in->seq);
|
||||
lastReceivedSeq = in->seq;
|
||||
for (auto it = rxMissing.begin(); it != rxMissing.end(); ++it)
|
||||
{
|
||||
if (it->retransmitCount < 10)
|
||||
{
|
||||
missingSeqs.append(it->seqNum & 0xff);
|
||||
missingSeqs.append(it->seqNum >> 8 & 0xff);
|
||||
it->retransmitCount++;
|
||||
}
|
||||
}
|
||||
else if (count == 1)
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Requesting retransmit of: " << first;
|
||||
rxSeqBuf.append(first);
|
||||
sendControl(false, 0x01, first);
|
||||
}
|
||||
else if(count != 0)
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Requesting retransmit of: " << first << ", " << second << ", " << third <<", " << fourth;
|
||||
sendRetransmitRange(first, second, third,fourth);
|
||||
if (missingSeqs.length() != 0)
|
||||
{
|
||||
control_packet p;
|
||||
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
|
||||
p.type = 0x01;
|
||||
p.seq = 0x0000;
|
||||
p.sentid = myId;
|
||||
p.rcvdid = remoteId;
|
||||
if (missingSeqs.length() == 2)
|
||||
{
|
||||
p.seq = (missingSeqs[0] &0xff) |(quint16)(missingSeqs[1] << 8) ;
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing " << hex << p.seq;
|
||||
QMutexLocker locker(&mutex);
|
||||
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
|
||||
}
|
||||
else
|
||||
{
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": sending multi request for missing: " <<missingSeqs.toHex();
|
||||
missingSeqs.insert(0, p.packet, sizeof(p.packet));
|
||||
QMutexLocker locker(&mutex);
|
||||
udp->writeDatagram(missingSeqs, radioIP, port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1061,9 +1086,9 @@ void udpBase::sendTrackedPacket(QByteArray d)
|
|||
d[7] = (sendSeq >> 8) & 0xff;
|
||||
SEQBUFENTRY s;
|
||||
s.seqNum = sendSeq;
|
||||
s.timeSent = time(NULL);
|
||||
s.timeSent = QTime::currentTime();
|
||||
s.retransmitCount = 0;
|
||||
s.data = (d);
|
||||
s.data = d;
|
||||
txSeqBuf.append(s);
|
||||
purgeOldEntries(); // Delete entries older than PURGE_SECONDS seconds (currently 5)
|
||||
sendSeq++;
|
||||
|
@ -1077,23 +1102,25 @@ void udpBase::sendTrackedPacket(QByteArray d)
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Once a packet has reached PURGE_SECONDS old (currently 10) then it is not likely to be any use.
|
||||
/// </summary>
|
||||
void udpBase::purgeOldEntries()
|
||||
{
|
||||
for (int f = txSeqBuf.length() - 1; f >= 0; f--)
|
||||
{
|
||||
if (difftime(time(NULL), txSeqBuf[f].timeSent) > PURGE_SECONDS)
|
||||
{
|
||||
txSeqBuf.removeAt(f);
|
||||
}
|
||||
}
|
||||
|
||||
if (rxSeqBuf.length() > 2048) {
|
||||
// Erase old entries from the tx packet buffer
|
||||
txSeqBuf.erase(std::remove_if(txSeqBuf.begin(), txSeqBuf.end(), [](const SEQBUFENTRY& v)
|
||||
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), txSeqBuf.end());
|
||||
|
||||
// If the buffer is over 2K, remove the first 1K.
|
||||
std::sort(rxSeqBuf.begin(), rxSeqBuf.end());
|
||||
rxSeqBuf.remove(0,1024);
|
||||
lastReceivedSeq = *rxSeqBuf.begin();
|
||||
qDebug(logUdp()) << this->metaObject()->className() << ": Purged buffer of old rx packets, new buffer: " << rxSeqBuf.first() << " - " << rxSeqBuf.last();
|
||||
|
||||
// Erase old entries from the missing packets buffer
|
||||
rxMissing.erase(std::remove_if(rxMissing.begin(), rxMissing.end(), [](const SEQBUFENTRY& v)
|
||||
{ return v.timeSent.secsTo(QTime::currentTime()) > PURGE_SECONDS; }), rxMissing.end());
|
||||
|
||||
qSort(rxSeqBuf);
|
||||
if (rxSeqBuf.length() > 400)
|
||||
{
|
||||
rxSeqBuf.remove(0, 200);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
12
udphandler.h
12
udphandler.h
|
@ -24,11 +24,11 @@
|
|||
#include "audiohandler.h"
|
||||
#include "packettypes.h"
|
||||
|
||||
#define PURGE_SECONDS 5
|
||||
#define PURGE_SECONDS 10
|
||||
#define TOKEN_RENEWAL 60000
|
||||
#define PING_PERIOD 100
|
||||
#define IDLE_PERIOD 100
|
||||
#define TXAUDIO_PERIOD 5
|
||||
#define TXAUDIO_PERIOD 20
|
||||
#define AREYOUTHERE_PERIOD 500
|
||||
|
||||
|
||||
|
@ -77,15 +77,17 @@ public:
|
|||
QMutex mutex;
|
||||
|
||||
struct SEQBUFENTRY {
|
||||
time_t timeSent;
|
||||
QTime timeSent;
|
||||
uint16_t seqNum;
|
||||
QByteArray data;
|
||||
quint8 retransmitCount;
|
||||
};
|
||||
|
||||
QVector<SEQBUFENTRY> txSeqBuf; //= QVector<SEQBUFENTRY>();
|
||||
QVector<SEQBUFENTRY> txSeqBuf;
|
||||
|
||||
QVector<quint16> rxSeqBuf; // = QVector<quint16>();
|
||||
QVector<quint16> rxSeqBuf;
|
||||
|
||||
QVector<SEQBUFENTRY> rxMissing;
|
||||
|
||||
void sendTrackedPacket(QByteArray d);
|
||||
void purgeOldEntries();
|
||||
|
|
Ładowanie…
Reference in New Issue