Add audio handling to UDP server

merge-requests/2/head
Phil Taylor 2021-02-28 20:10:07 +00:00
rodzic 6748a5130c
commit aef03a6b7e
10 zmienionych plików z 657 dodań i 301 usunięć

Wyświetl plik

@ -864,11 +864,19 @@ void audioHandler::start()
}
if (isInput) {
this->open(QIODevice::WriteOnly | QIODevice::Unbuffered);
#ifdef Q_OS_WIN
this->open(QIODevice::WriteOnly | QIODevice::Unbuffered);
#else
this->open(QIODevice::WriteOnly);
#endif
audioInput->start(this);
}
else {
#ifdef Q_OS_WIN
this->open(QIODevice::ReadOnly | QIODevice::Unbuffered);
#else
this->open(QIODevice::ReadOnly);
#endif
audioOutput->start(this);
}
}
@ -921,13 +929,6 @@ qint64 audioHandler::readData(char* data, qint64 maxlen)
// We must lock the mutex for the entire time that the buffer may be modified.
QMutexLocker locker(&mutex);
// Sort the buffer by seq number. This is important and audio packets may have arrived out-of-order
std::sort(audioBuffer.begin(), audioBuffer.end(),
[](const AUDIOPACKET& a, const AUDIOPACKET& b) -> bool
{
return a.seq < b.seq;
});
// Output buffer is ALWAYS 16 bit.
int divisor = 16 / radioSampleBits;
@ -1102,6 +1103,14 @@ void audioHandler::incomingAudio(const AUDIOPACKET data)
qDebug(logAudio()) << "RX Audio Suspended, Resuming...";
audioOutput->resume();
}
// Sort the buffer by seq number. This is important and audio packets may have arrived out-of-order
std::sort(audioBuffer.begin(), audioBuffer.end(),
[](const AUDIOPACKET& a, const AUDIOPACKET& b) -> bool
{
return a.seq < b.seq;
});
}
}

Wyświetl plik

@ -19,7 +19,7 @@
// Variable size packets + payload
#define CIV_SIZE 0x15
#define TXAUDIO_SIZE 0x18
#define AUDIO_SIZE 0x18
#define DATA_SIZE 0x15
// 0x10 length control packet (connect/disconnect/idle.)
@ -94,8 +94,8 @@ typedef union openclose_packet {
} *startstop_packet_t;
// 0x18 length txaudio packet
typedef union txaudio_packet {
// 0x18 length audio packet
typedef union audio_packet {
struct
{
quint32 len; // 0x00
@ -108,8 +108,8 @@ typedef union txaudio_packet {
quint16 unused; // 0x14
quint16 datalen; // 0x16
};
char packet[TXAUDIO_SIZE];
} *txaudio_packet_t;
char packet[AUDIO_SIZE];
} *audio_packet_t;
// 0x18 length retransmit_range packet
typedef union retransmit_range_packet {
@ -181,7 +181,8 @@ typedef union status_packet {
quint16 unknown; // 0x26
char unusede; // 0x28
char unusedf[2]; // 0x29
char value[5]; // 0x2b
char identa; // 0x2b
quint32 identb; // 0x2c
quint32 error; // 0x30
char unusedg[12]; // 0x34
char disc; // 0x40
@ -299,33 +300,37 @@ typedef union conninfo_packet {
typedef union capabilities_packet {
struct
{
quint32 len; // 0x00
quint16 type; // 0x04
quint16 seq; // 0x06
quint32 sentid; // 0x08
quint32 rcvdid; // 0x0c
quint32 len; // 0x00
quint16 type; // 0x04
quint16 seq; // 0x06
quint32 sentid; // 0x08
quint32 rcvdid; // 0x0c
char unuseda[3]; // 0x10
quint16 code; // 0x13
quint16 res; // 0x15
quint16 innerseq; // 0x17
quint16 code; // 0x13
quint16 res; // 0x15
quint16 innerseq; // 0x17
char unusedc; // 0x19
quint16 tokrequest; // 0x1a
quint32 token; // 0x1c
quint16 tokrequest; // 0x1a
quint32 token; // 0x1c
char unusedd[33]; // 0x20
char capa; // 0x41
char unusede[7]; // 0x42
quint16 commoncap; // 0x49
char unusedf[2]; // 0x4b
char capc; // 0x4d
quint32 capd; // 0x4e
quint16 commoncap; // 0x49
char unused; // 0x4b
char macaddress[6]; // 0x4c
char name[32]; // 0x52
char audio[32]; // 0x72
char caparray[10]; // 0x92
char unusedh[2]; // 0x9c
quint16 cape; // 0x9e
quint16 capf; // 0xa0
quint16 conntype; // 0x92
char civ; // 0x94
quint16 lena; // 0x95
quint16 lenb; // 0x97
quint8 enablea; // 0x99
quint8 enableb; // 0x9a
quint8 enablec; // 0x9b
quint32 baudrate; // 0x9c
quint16 capf; // 0xa0
char unusedi; // 0xa2
quint16 capg; // 0xa3
quint16 capg; // 0xa3
char unusedj[3]; // 0xa5
};
char packet[CAPABILITIES_SIZE];

Wyświetl plik

@ -104,6 +104,7 @@ void rigCommander::commSetup(unsigned char rigCivAddr, QString ip, quint16 cport
connect(udp, SIGNAL(haveDataFromPort(QByteArray)), this, SLOT(handleNewData(QByteArray)));
connect(udp, SIGNAL(haveAudioData(AUDIOPACKET)), this, SLOT(receiveAudioData(AUDIOPACKET)));
// data from the program to the comm port:
connect(this, SIGNAL(dataForComm(QByteArray)), udp, SLOT(receiveDataFromUserToRig(QByteArray)));
@ -631,12 +632,17 @@ void rigCommander::setCIVAddr(unsigned char civAddr)
this->civAddr = civAddr;
}
void rigCommander::handleNewData(const QByteArray &data)
{
void rigCommander::handleNewData(const QByteArray& data)
{
emit haveDataForServer(data);
parseData(data);
}
void rigCommander::receiveAudioData(const AUDIOPACKET& data)
{
emit haveAudioData(data);
}
void rigCommander::parseData(QByteArray dataInput)
{
// TODO: Clean this up.

Wyświetl plik

@ -136,7 +136,8 @@ public slots:
void getRefAdjustFine();
void setRefAdjustCourse(unsigned char level);
void setRefAdjustFine(unsigned char level);
void handleNewData(const QByteArray &data);
void handleNewData(const QByteArray& data);
void receiveAudioData(const AUDIOPACKET& data);
void handleSerialPortError(const QString port, const QString errorText);
void handleStatusUpdate(const QString text);
void changeLatency(const quint16 value);
@ -199,6 +200,7 @@ signals:
void haveATUStatus(unsigned char status);
void haveChangeLatency(quint16 value);
void haveDataForServer(QByteArray outData);
void haveAudioData(AUDIOPACKET data);
void initUdpHandler();
private:

Wyświetl plik

@ -121,6 +121,11 @@ void udpHandler::receiveFromCivStream(QByteArray data)
emit haveDataFromPort(data);
}
void udpHandler::receiveAudioData(const AUDIOPACKET &data)
{
emit haveAudioData(data);
}
void udpHandler::receiveDataFromUserToRig(QByteArray data)
{
if (civ != Q_NULLPTR)
@ -295,6 +300,7 @@ void udpHandler::dataReceived()
audio = new udpAudio(localIP, radioIP, audioPort, rxLatency, txLatency, rxSampleRate, rxCodec, txSampleRate, txCodec);
QObject::connect(civ, SIGNAL(receive(QByteArray)), this, SLOT(receiveFromCivStream(QByteArray)));
QObject::connect(audio, SIGNAL(haveAudioData(AUDIOPACKET)), this, SLOT(receiveAudioData(AUDIOPACKET)));
QObject::connect(this, SIGNAL(haveChangeLatency(quint16)), audio, SLOT(changeLatency(quint16)));
@ -604,19 +610,21 @@ void udpCivData::dataReceived()
}
default:
{
control_packet_t in = (control_packet_t)r.constData();
if (in->type != 0x01 && r.length() > 21) {
// Process this packet, any re-transmit requests will happen later.
//uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
// We have received some Civ data so stop sending Start packets!
if (startCivDataTimer != Q_NULLPTR) {
startCivDataTimer->stop();
}
lastReceived = QTime::currentTime();
quint8 temp = r[0] - 0x15;
if ((quint8)r[16] == 0xc1 && (quint8)r[17] == temp)
{
emit receive(r.mid(0x15));
if (r.length() > 21) {
data_packet_t in = (data_packet_t)r.constData();
if (in->type != 0x01) {
// Process this packet, any re-transmit requests will happen later.
//uint16_t gotSeq = qFromLittleEndian<quint16>(r.mid(6, 2));
// We have received some Civ data so stop sending Start packets!
if (startCivDataTimer != Q_NULLPTR) {
startCivDataTimer->stop();
}
lastReceived = QTime::currentTime();
if (in->datalen + 0x15 == in->len)
{
emit receive(r.mid(0x15));
}
}
}
break;
@ -772,9 +780,9 @@ void udpAudio::sendTxAudio()
while (len < audio.length()) {
QByteArray partial = audio.mid(len, 1364);
txaudio_packet p;
audio_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = sizeof(p)+partial.length();
p.len = sizeof(p) + partial.length();
p.sentid = myId;
p.rcvdid = remoteId;
p.ident = 0x0080; // TX audio is always this?
@ -838,7 +846,7 @@ void udpAudio::dataReceived()
tempAudio.seq = in->seq;
tempAudio.time = lastReceived;
tempAudio.sent = 0;
tempAudio.data = r.mid(24);
tempAudio.data = r.mid(0x18);
// Prefer signal/slot to forward audio as it is thread/safe
// Need to do more testing but latency appears fine.
emit haveAudioData(tempAudio);
@ -1043,6 +1051,7 @@ void udpBase::dataReceived(QByteArray r)
else if (in->len != PING_SIZE && in->type == 0x00 && in->seq != 0x00)
{
if (rxSeqBuf.isEmpty()) {
QMutexLocker locker(&mutex);
rxSeqBuf.append(in->seq);
}
else
@ -1053,6 +1062,7 @@ void udpBase::dataReceived(QByteArray r)
qDebug(logUdp()) << this->metaObject()->className() << ": ******* seq number may have rolled over ****** previous highest: " << hex << rxSeqBuf.back() << " current: " << hex << in->seq;
// Looks like it has rolled over so clear buffer and start again.
QMutexLocker locker(&mutex);
rxSeqBuf.clear();
return;
}
@ -1066,6 +1076,7 @@ void udpBase::dataReceived(QByteArray r)
if (s != rxMissing.end())
{
qDebug(logUdp()) << this->metaObject()->className() << ": Missing SEQ has been received! " << hex << in->seq;
QMutexLocker locker(&mutex);
s = rxMissing.erase(s);
}
}
@ -1102,6 +1113,7 @@ void udpBase::sendRetransmitRequest()
b.seqNum = j;
b.retransmitCount = 0;
b.timeSent = QTime::currentTime();
QMutexLocker locker(&mutex);
rxMissing.append(b);
packetsLost++;
}
@ -1109,6 +1121,7 @@ void udpBase::sendRetransmitRequest()
if (s->retransmitCount == 4)
{
// We have tried 4 times to request this packet, time to give up!
QMutexLocker locker(&mutex);
s = rxMissing.erase(s);
rxSeqBuf.append(j); // Final thing is to add to received buffer!
}
@ -1118,6 +1131,7 @@ void udpBase::sendRetransmitRequest()
}
else {
qDebug(logUdp()) << this->metaObject()->className() << ": Too many missing, flushing buffers";
QMutexLocker locker(&mutex);
rxSeqBuf.clear();
missingSeqs.clear();
break;
@ -1148,16 +1162,16 @@ void udpBase::sendRetransmitRequest()
p.rcvdid = remoteId;
if (missingSeqs.length() == 4) // This is just a single missing packet so send using a control.
{
QMutexLocker locker(&mutex);
p.seq = (missingSeqs[0] & 0xff) | (quint16)(missingSeqs[1] << 8);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for missing packet : " << hex << p.seq;
QMutexLocker locker(&mutex);
udp->writeDatagram(QByteArray::fromRawData((const char*)p.packet, sizeof(p)), radioIP, port);
}
else
{
QMutexLocker locker(&mutex);
qDebug(logUdp()) << this->metaObject()->className() << ": sending request for multiple missing packets : " << missingSeqs.toHex();
missingSeqs.insert(0, p.packet, sizeof(p.packet));
QMutexLocker locker(&mutex);
udp->writeDatagram(missingSeqs, radioIP, port);
}
}
@ -1232,11 +1246,11 @@ void udpBase::sendTrackedPacket(QByteArray d)
s.timeSent = QTime::currentTime();
s.retransmitCount = 0;
s.data = d;
QMutexLocker locker(&mutex);
txSeqBuf.append(s);
purgeOldEntries(); // Delete entries older than PURGE_SECONDS seconds (currently 5)
sendSeq++;
QMutexLocker locker(&mutex);
udp->writeDatagram(d, radioIP, port);
if (idleTimer != Q_NULLPTR && idleTimer->isActive()) {
idleTimer->start(IDLE_PERIOD); // Reset idle counter if it's running

Wyświetl plik

@ -217,11 +217,13 @@ public:
public slots:
void receiveDataFromUserToRig(QByteArray); // This slot will send data on to
void receiveFromCivStream(QByteArray);
void receiveAudioData(const AUDIOPACKET &data);
void changeLatency(quint16 value);
void init();
signals:
void haveDataFromPort(QByteArray data); // emit this when we have data, connect to rigcommander
void haveAudioData(AUDIOPACKET data); // emit this when we have data, connect to rigcommander
void haveNetworkError(QString, QString);
void haveNetworkStatus(QString);
void haveChangeLatency(quint16 value);

Plik diff jest za duży Load Diff

Wyświetl plik

@ -4,12 +4,14 @@
#include <QObject>
#include <QUdpSocket>
#include <QNetworkDatagram>
#include <QNetworkInterface>
#include <QHostInfo>
#include <QTimer>
#include <QMutex>
#include <QDateTime>
#include <QByteArray>
#include <QList>
#include <QVector>
// Allow easy endian-ness conversions
#include <QtEndian>
@ -18,10 +20,19 @@
#include <udpserversetup.h>
#include "packettypes.h"
#include "rigidentities.h"
#include "audiohandler.h"
extern void passcode(QString in,QByteArray& out);
extern QByteArray parseNullTerminatedString(QByteArray c, int s);
struct SEQBUFENTRY {
QTime timeSent;
uint16_t seqNum;
QByteArray data;
quint8 retransmitCount;
};
class udpServer : public QObject
{
Q_OBJECT
@ -33,6 +44,8 @@ public:
public slots:
void init();
void dataForServer(QByteArray);
void receiveAudioData(const AUDIOPACKET &data);
void receiveRigCaps(rigCapabilities caps);
signals:
void haveDataFromServer(QByteArray);
@ -52,15 +65,16 @@ private:
quint16 txBufferLen;
quint32 myId;
quint32 remoteId;
quint32 txSeq=0;
quint32 rxSeq;
quint32 connSeq;
quint16 txSeq=0;
quint16 rxSeq;
quint16 connSeq;
quint16 pingSeq;
quint32 rxPingTime; // 32bit as has other info
quint32 authInnerSeq;
quint16 authSeq;
quint16 innerPingSeq;
quint16 innerSeq;
quint16 sendAudioSeq;
quint32 ident;
quint16 tokenRx;
quint32 tokenTx;
quint32 commonCap;
@ -71,6 +85,7 @@ private:
QTimer* pingTimer;
QTimer* idleTimer;
QTimer* wdTimer;
QTimer* retransmitTimer;
// Only used for audio.
quint8 rxCodec;
@ -78,30 +93,36 @@ private:
quint16 rxSampleRate;
quint16 txSampleRate;
SERVERUSER user;
QVector <SEQBUFENTRY> txSeqBuf;
QVector <quint16> rxSeqBuf;
QVector <SEQBUFENTRY> rxMissing;
};
void controlReceived();
void civReceived();
void audioReceived();
void commonReceived(QList<CLIENT*>* l,CLIENT* c, QByteArray r);
void sendPing(QList<CLIENT*> *l,CLIENT* c, quint16 seq, bool reply);
void sendControl(CLIENT* c, quint8 type, quint16 seq);
void sendLoginResponse(CLIENT* c, quint16 seq, bool allowed);
void sendLoginResponse(CLIENT* c, bool allowed);
void sendCapabilities(CLIENT* c);
void sendConnectionInfo(CLIENT* c);
void sendTokenResponse(CLIENT* c,quint8 type);
void sendWatchdog(CLIENT* c);
void sendStatus(CLIENT* c);
void sendRetransmitRequest(CLIENT* c);
void watchdog(CLIENT* c);
void deleteConnection(QList<CLIENT*> *l, CLIENT* c);
SERVERCONFIG config;
QUdpSocket* udpControl = Q_NULLPTR;
QUdpSocket* udpCiv = Q_NULLPTR;
QUdpSocket* udpAudio = Q_NULLPTR;
QHostAddress localIP;
QString macAddress;
quint32 controlId = 0;
quint32 civId = 0;
@ -110,19 +131,13 @@ private:
QString rigname = "IC-9700";
quint8 rigciv = 0xa2;
struct SEQBUFENTRY {
time_t timeSent;
uint16_t seqNum;
QByteArray data;
};
QMutex mutex; // Used for critical operations.
QList <CLIENT*> controlClients = QList<CLIENT*>();
QList <CLIENT*> civClients = QList<CLIENT*>();
QList <CLIENT*> audioClients = QList<CLIENT*>();
QTime timeStarted;
rigCapabilities rigCaps;
};

Wyświetl plik

@ -30,8 +30,7 @@ wfmain::wfmain(const QString serialPortCL, const QString hostCL, QWidget *parent
connect(this, SIGNAL(sendServerConfig(SERVERCONFIG)), srv, SLOT(receiveServerConfig(SERVERCONFIG)));
connect(srv, SIGNAL(serverConfig(SERVERCONFIG, bool)), this, SLOT(serverConfigRequested(SERVERCONFIG, bool)));
haveRigCaps = false;
@ -208,6 +207,9 @@ wfmain::wfmain(const QString serialPortCL, const QString hostCL, QWidget *parent
serverThread->start();
emit initServer();
connect(this, SIGNAL(sendRigCaps(rigCapabilities)), udp, SLOT(receiveRigCaps(rigCapabilities)));
}
plot = ui->plot; // rename it waterfall.
@ -397,6 +399,7 @@ wfmain::wfmain(const QString serialPortCL, const QString hostCL, QWidget *parent
if (serverConfig.enabled && udp != Q_NULLPTR) {
// Server
connect(rig, SIGNAL(haveAudioData(AUDIOPACKET)), udp, SLOT(receiveAudioData(AUDIOPACKET)));
connect(rig, SIGNAL(haveDataForServer(QByteArray)), udp, SLOT(dataForServer(QByteArray)));
connect(udp, SIGNAL(haveDataFromServer(QByteArray)), rig, SLOT(dataFromServer(QByteArray)));
}
@ -1761,6 +1764,8 @@ void wfmain::receiveRigID(rigCapabilities rigCaps)
this->rigCaps = rigCaps;
this->spectWidth = rigCaps.spectLenMax; // used once haveRigCaps is true.
haveRigCaps = true;
// Added so that server receives rig capabilities.
emit sendRigCaps(rigCaps);
if(rigCaps.model==model7850)
{
ui->modeSelectCombo->addItem("PSK", 0x12);

Wyświetl plik

@ -113,6 +113,7 @@ signals:
void sendChangeLatency(quint16 latency);
void initServer();
void sendServerConfig(SERVERCONFIG conf);
void sendRigCaps(rigCapabilities caps);
private slots:
void shortcutF1();