Use ring buffer with rtaudio to eliminate mutexes

merge-requests/5/head
Phil Taylor 2021-05-27 11:41:08 +01:00
rodzic 2cd316bee6
commit 596f2739b9
11 zmienionych plików z 923 dodań i 112 usunięć

Wyświetl plik

@ -28,6 +28,8 @@ audioHandler::~audioHandler()
audio.stopStream();
audio.closeStream();
}
if (ringBuf != Q_NULLPTR)
delete ringBuf;
}
bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16 samplerate, const quint16 latency, const bool ulaw, const bool isinput, int port, quint8 resampleQuality)
@ -46,6 +48,9 @@ bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16
// chunk size is always relative to Internal Sample Rate.
this->chunkSize = (INTERNAL_SAMPLE_RATE / 25) * radioChannels;
ringBuf = new wilt::Ring<audioPacket>(100); // Should be customizable.
tempBuf.sent = 0;
if (port > 0) {
aParams.deviceId = port;
}
@ -55,7 +60,7 @@ bool audioHandler::init(const quint8 bits, const quint8 channels, const quint16
else {
aParams.deviceId = audio.getDefaultOutputDevice();
}
aParams.nChannels = channels;
aParams.nChannels = 2; // Internally this is always 2 channels
aParams.firstChannel = 0;
try {
@ -140,67 +145,70 @@ int audioHandler::readData(void* outputBuffer, void* inputBuffer, unsigned int n
{
// Calculate output length, always full samples
int sentlen = 0;
qint16* buffer = (qint16*)outputBuffer;
//qDebug(logAudio()) << "looking for: " << nFrames << this->audioBuffer.size();
quint8* buffer = (quint8*)outputBuffer;
if (status == RTAUDIO_OUTPUT_UNDERFLOW)
qDebug(logAudio()) << "Underflow detected";
unsigned int nBytes = nFrames * 2 * 2; // This is ALWAYS 2 bytes per sample and 2 channels
if (!audioBuffer.isEmpty())
if (ringBuf->size()>0)
{
mutex.lock();
// Output buffer is ALWAYS 16 bit.
auto packet = audioBuffer.begin();
while (packet != audioBuffer.end() && sentlen < nFrames/2)
while (sentlen < nBytes)
{
int timediff = packet->time.msecsTo(QTime::currentTime());
if (timediff > (int)audioLatency * 2) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Packet " << hex << packet->seq <<
" arrived too late (increase output latency!) " <<
dec << packet->time.msecsTo(QTime::currentTime()) << "ms";
while (packet != audioBuffer.end() && timediff > (int)audioLatency) {
timediff = packet->time.msecsTo(QTime::currentTime());
lastSeq = packet->seq;
packet = audioBuffer.erase(packet); // returns next packet
}
if (packet == audioBuffer.end()) {
break;
}
}
// If we got here then packet time must be within latency threshold
if (packet->seq == lastSeq + 1 || packet->seq <= lastSeq)
audioPacket packet;
if (!ringBuf->try_read(packet))
{
int send = qMin((int)nFrames*2 - sentlen, packet->dataout.length() - packet->sent);
lastSeq = packet->seq;
//qInfo(logAudio()) << "Packet " << hex << packet->seq << " arrived on time " << Qt::dec << packet->time.msecsTo(QTime::currentTime()) << "ms";
memcpy(buffer + sentlen, packet->dataout.constData() + packet->sent, send);
qDebug() << "No more data available but buffer is not full! sentlen:" << sentlen << " nBytes:" << nBytes ;
return 0;
}
currentLatency = packet.time.msecsTo(QTime::currentTime());
// This shouldn't be required but if we did output a partial packet
// This will add the remaining packet data to the output buffer.
if (tempBuf.sent != tempBuf.data.length())
{
int send = qMin((int)nBytes - sentlen, tempBuf.data.length() - tempBuf.sent);
memcpy(buffer + sentlen, tempBuf.data.constData() + tempBuf.sent, send);
tempBuf.sent = tempBuf.sent + send;
sentlen = sentlen + send;
qDebug(logAudio()) << "Adding partial:" << send;
}
if (send == packet->dataout.length() - packet->sent)
{
//qInfo(logAudio()) << "Get next packet";
packet = audioBuffer.erase(packet); // returns next packet
}
else
{
// Store sent amount (could be zero if audioOutput buffer full) then break.
packet->sent = send;
break;
}
while (currentLatency > (int)audioLatency) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Packet " << hex << packet.seq <<
" arrived too late (increase output latency!) " <<
dec << packet.time.msecsTo(QTime::currentTime()) << "ms";
lastSeq = packet.seq;
if (!ringBuf->try_read(packet))
return sentlen;
currentLatency = packet.time.msecsTo(QTime::currentTime());
}
else {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Missing audio packet(s) from: " << hex << lastSeq + 1 << " to " << hex << packet->seq - 1;
lastSeq = packet->seq;
int send = qMin((int)nBytes - sentlen, packet.data.length());
memcpy(buffer + sentlen, packet.data.constData(), send);
sentlen = sentlen + send;
if (send < packet.data.length())
{
qDebug(logAudio()) << "Asking for partial, sent:" << send << "packet length" << packet.data.length();
tempBuf = packet;
tempBuf.sent = tempBuf.sent + send;
//lastSeq = packet.seq;
//break;
}
if (packet.seq <= lastSeq) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Duplicate/early audio packet: " << hex << lastSeq << " got " << hex << packet.seq;
}
else if (packet.seq != lastSeq + 1) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Missing audio packet(s) from: " << hex << lastSeq + 1 << " to " << hex << packet.seq - 1;
}
lastSeq = packet.seq;
}
mutex.unlock();
}
//qDebug(logAudio()) << "looking for: " << nBytes << " got: " << sentlen;
return 0;
}
@ -231,13 +239,13 @@ int audioHandler::writeData(void* outputBuffer, void* inputBuffer, unsigned int
int send = qMin((int)(len - sentlen), (int)chunkSize - current->sent);
current->datain.append(QByteArray::fromRawData(data + sentlen, send));
current->data.append(QByteArray::fromRawData(data + sentlen, send));
sentlen = sentlen + send;
current->seq = 0; // Not used in TX
current->time = QTime::currentTime();
current->sent = current->datain.length();
current->sent = current->data.length();
if (current->sent == chunkSize)
{
@ -274,64 +282,82 @@ void audioHandler::stateChanged(QAudio::State state)
void audioHandler::incomingAudio(audioPacket data)
int audioHandler::incomingAudio(audioPacket data)
{
// No point buffering audio until stream is actually running.
if (!audio.isStreamRunning())
{
return;
qDebug(logAudio()) << "Packet received before stream was started";
return currentLatency;
}
// Incoming data is 8bits?
if (radioSampleBits == 8)
// Incoming data is 8bits?
if (radioSampleBits == 8)
{
QByteArray outPacket((int)data.data.length() * 2, (char)0xff);
qint16* out = (qint16*)outPacket.data();
for (int f = 0; f < data.data.length(); f++)
{
QByteArray inPacket((int)data.datain.length() * 2, (char)0xff);
qint16* in = (qint16*)inPacket.data();
for (int f = 0; f < data.datain.length(); f++)
if (isUlaw)
{
if (isUlaw)
{
in[f] = ulaw_decode[(quint8)data.datain[f]];
}
else
{
// Convert 8-bit sample to 16-bit
in[f] = (qint16)(((quint8)data.datain[f] << 8) - 32640);
}
out[f] = ulaw_decode[(quint8)data.data[f]];
}
else
{
// Convert 8-bit sample to 16-bit
out[f] = (qint16)(((quint8)data.data[f] << 8) - 32640);
}
data.datain = inPacket; // Replace incoming data with converted.
}
data.data.clear();
data.data = outPacket; // Replace incoming data with converted.
}
//qInfo(logAudio()) << "Adding packet to buffer:" << data.seq << ": " << data.datain.length();
//qInfo(logAudio()) << "Adding packet to buffer:" << data.seq << ": " << data.data.length();
/* We now have an array of 16bit samples in the NATIVE samplerate of the radio
If the radio sample rate is below 48000, we need to resample.
*/
/* We now have an array of 16bit samples in the NATIVE samplerate of the radio
If the radio sample rate is below 48000, we need to resample.
*/
if (ratioDen != 1) {
if (ratioDen != 1) {
// We need to resample
quint32 outFrames = ((data.datain.length() / 2) * ratioDen) / radioChannels;
quint32 inFrames = (data.datain.length() / 2) / radioChannels;
data.dataout.resize(outFrames * 2 * radioChannels); // Preset the output buffer size.
// We need to resample
quint32 outFrames = ((data.data.length() / 2) * ratioDen) / radioChannels;
quint32 inFrames = (data.data.length() / 2) / radioChannels;
QByteArray outPacket(outFrames * 2 * radioChannels, 0xff); // Preset the output buffer size.
int err = 0;
if (this->radioChannels == 1) {
err = wf_resampler_process_int(resampler, 0, (const qint16*)data.datain.constData(), &inFrames, (qint16*)data.dataout.data(), &outFrames);
}
else {
err = wf_resampler_process_interleaved_int(resampler, (const qint16*)data.datain.constData(), &inFrames, (qint16*)data.dataout.data(), &outFrames);
}
if (err) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames;
}
int err = 0;
if (this->radioChannels == 1) {
err = wf_resampler_process_int(resampler, 0, (const qint16*)data.data.constData(), &inFrames, (qint16*)outPacket.data(), &outFrames);
}
else {
data.dataout = data.datain;
err = wf_resampler_process_interleaved_int(resampler, (const qint16*)data.data.constData(), &inFrames, (qint16*)outPacket.data(), &outFrames);
}
if (err) {
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames;
}
data.data.clear();
data.data = outPacket; // Replace incoming data with converted.
}
mutex.lock();
audioBuffer.insert( data.seq, data );
mutex.unlock();
if (radioChannels == 1)
{
// Convert to stereo
QByteArray outPacket(data.data.length()*2, 0xff); // Preset the output buffer size.
qint16* in = (qint16*)data.data.data();
qint16* out = (qint16*)outPacket.data();
for (int f = 0; f < data.data.length()/2; f++)
{
*out++ = *in;
*out++ = *in++;
}
data.data.clear();
data.data = outPacket; // Replace incoming data with converted.
}
if (!ringBuf->try_write(data))
{
qDebug(logAudio()) << "Buffer full! capacity:" << ringBuf->capacity() << "length" << ringBuf->size();
}
return currentLatency;
}
void audioHandler::changeLatency(const quint16 newSize)
@ -367,7 +393,7 @@ void audioHandler::getNextAudioChunk(QByteArray& ret)
packet = audioBuffer.erase(packet); // returns next packet
}
else {
if (packet->datain.length() == chunkSize && ret.length() == 0)
if (packet->data.length() == chunkSize && ret.length() == 0)
{
// We now have an array of samples in the computer native format (48000)
// If the radio sample rate is below 48000, we need to resample.
@ -376,11 +402,11 @@ void audioHandler::getNextAudioChunk(QByteArray& ret)
if (ratioNum != 1)
{
// We need to resample (we are STILL 16 bit!)
quint32 outFrames = ((packet->datain.length() / 2) / ratioNum) / radioChannels;
quint32 inFrames = (packet->datain.length() / 2) / radioChannels;
quint32 outFrames = ((packet->data.length() / 2) / ratioNum) / radioChannels;
quint32 inFrames = (packet->data.length() / 2) / radioChannels;
packet->dataout.resize(outFrames * 2 * radioChannels); // Preset the output buffer size.
const qint16* in = (qint16*)packet->datain.constData();
const qint16* in = (qint16*)packet->data.constData();
qint16* out = (qint16*)packet->dataout.data();
int err = 0;
if (this->radioChannels == 1) {
@ -393,22 +419,22 @@ void audioHandler::getNextAudioChunk(QByteArray& ret)
qInfo(logAudio()) << (isInput ? "Input" : "Output") << "Resampler error " << err << " inFrames:" << inFrames << " outFrames:" << outFrames;
}
//qInfo(logAudio()) << "Resampler run " << err << " inFrames:" << inFrames << " outFrames:" << outFrames;
//qInfo(logAudio()) << "Resampler run inLen:" << packet->datain.length() << " outLen:" << packet->dataout.length();
//qInfo(logAudio()) << "Resampler run inLen:" << packet->data.length() << " outLen:" << packet->dataout.length();
if (radioSampleBits == 8)
{
packet->datain = packet->dataout; // Copy output packet back to input buffer.
packet->data = packet->dataout; // Copy output packet back to input buffer.
packet->dataout.clear(); // Buffer MUST be cleared ready to be re-filled by the upsampling below.
}
}
else if (radioSampleBits == 16) {
// Only copy buffer if radioSampleBits is 16, as it will be handled below otherwise.
packet->dataout = packet->datain;
packet->dataout = packet->data;
}
// Do we need to convert 16-bit to 8-bit?
if (radioSampleBits == 8) {
packet->dataout.resize(packet->datain.length() / 2);
qint16* in = (qint16*)packet->datain.data();
packet->dataout.resize(packet->data.length() / 2);
qint16* in = (qint16*)packet->data.data();
for (int f = 0; f < packet->dataout.length(); f++)
{
quint8 outdata = 0;

Wyświetl plik

@ -18,6 +18,7 @@ typedef signed short MY_TYPE;
#include <QTime>
#include <QMap>
#include "resampler/speex_resampler.h"
#include "ring/ring.h"
#include <QDebug>
@ -33,8 +34,7 @@ struct audioPacket {
quint32 seq;
QTime time;
quint16 sent;
QByteArray datain;
QByteArray dataout;
QByteArray data;
};
class audioHandler : public QObject
@ -57,10 +57,10 @@ public:
bool isSequential() const;
void getNextAudioChunk(QByteArray &data);
bool isChunkAvailable();
int incomingAudio(const audioPacket data);
private slots:
bool init(const quint8 bits, const quint8 channels, const quint16 samplerate, const quint16 latency, const bool isulaw, const bool isinput, int port, quint8 resampleQuality);
void incomingAudio(const audioPacket data);
void changeLatency(const quint16 newSize);
void notified();
void stateChanged(QAudio::State state);
@ -111,8 +111,12 @@ private:
unsigned int ratioNum;
unsigned int ratioDen;
QMutex mutex;
volatile bool lock=false;
wilt::Ring<audioPacket> *ringBuf=Q_NULLPTR;
volatile bool ready = false;
audioPacket tempBuf;
quint16 currentLatency;
};
#endif // AUDIOHANDLER_H

21
ring/LICENSE 100644
Wyświetl plik

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 Trevor Wilson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

10
ring/README.md 100644
Wyświetl plik

@ -0,0 +1,10 @@
# Ring Library
## Overview
This library provides source for a multi-producer multi-consumer lock-free ring buffer. It provides a very simple interface for writing and reading from the buffer. The source includes a `Ring_` class, that provides the raw implementation and C-like facilities, as well as a templated `Ring<T>` class for typed reads and writes.
## Contact
If you have any questions, concerns, or recommendations please feel free to e-mail me at kmdreko@gmail.com. If you notice a bug or defect, create an issue to report it.

290
ring/ring.cpp 100644
Wyświetl plik

@ -0,0 +1,290 @@
////////////////////////////////////////////////////////////////////////////////
// FILE: ring.cpp
// DATE: 2016-02-25
// AUTH: Trevor Wilson
// DESC: Implements a lock-free, multi-consumer, multi-producer ring buffer
// class
////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2016 Trevor Wilson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files(the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions :
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#include "ring.h"
using namespace wilt;
#include <cstring>
// - std::memcpy
Ring_::Ring_()
: beg_(nullptr)
, end_(nullptr)
{
std::atomic_init(&used_, static_cast<std::ptrdiff_t>(0));
std::atomic_init(&free_, static_cast<std::ptrdiff_t>(0));
std::atomic_init(&rbuf_, static_cast<char*>(0));
std::atomic_init(&rptr_, static_cast<char*>(0));
std::atomic_init(&wptr_, static_cast<char*>(0));
std::atomic_init(&wbuf_, static_cast<char*>(0));
}
Ring_::Ring_(std::size_t size)
: beg_(new char[size])
, end_(beg_ + size)
{
std::atomic_init(&used_, static_cast<std::ptrdiff_t>(0));
std::atomic_init(&free_, static_cast<std::ptrdiff_t>(size));
std::atomic_init(&rbuf_, beg_);
std::atomic_init(&rptr_, beg_);
std::atomic_init(&wptr_, beg_);
std::atomic_init(&wbuf_, beg_);
}
Ring_::Ring_(Ring_&& ring)
: beg_(ring.beg_)
, end_(ring.end_)
{
std::atomic_init(&used_, ring.used_.load());
std::atomic_init(&free_, ring.free_.load());
std::atomic_init(&rbuf_, ring.rbuf_.load());
std::atomic_init(&rptr_, ring.rptr_.load());
std::atomic_init(&wptr_, ring.wptr_.load());
std::atomic_init(&wbuf_, ring.wbuf_.load());
ring.beg_ = nullptr;
ring.end_ = nullptr;
ring.used_.store(0);
ring.free_.store(0);
ring.rbuf_.store(nullptr);
ring.rptr_.store(nullptr);
ring.wptr_.store(nullptr);
ring.wbuf_.store(nullptr);
}
Ring_& Ring_::operator= (Ring_&& ring)
{
delete[] beg_;
beg_ = ring.beg_;
end_ = ring.end_;
used_.store(ring.used_.load());
free_.store(ring.free_.load());
rbuf_.store(ring.rbuf_.load());
rptr_.store(ring.rptr_.load());
wptr_.store(ring.wptr_.load());
wbuf_.store(ring.wbuf_.load());
ring.beg_ = nullptr;
ring.end_ = nullptr;
ring.used_.store(0);
ring.free_.store(0);
ring.rbuf_.store(nullptr);
ring.rptr_.store(nullptr);
ring.wptr_.store(nullptr);
ring.wbuf_.store(nullptr);
return *this;
}
Ring_::~Ring_()
{
delete[] beg_;
}
std::size_t Ring_::size() const
{
// The 'used' space can be negative in an over-reserved case, but it can be
// clamped to 0 for simplicity.
auto s = used_.load();
return s < 0 ? 0 : static_cast<std::size_t>(s);
}
std::size_t Ring_::capacity() const
{
return static_cast<std::size_t>(end_ - beg_);
}
void Ring_::read(void* data, std::size_t length) noexcept
{
auto block = acquire_read_block_(length);
copy_read_block_(block, (char*)data, length);
release_read_block_(block, length);
}
void Ring_::write(const void* data, std::size_t length) noexcept
{
auto block = acquire_write_block_(length);
copy_write_block_(block, (const char*)data, length);
release_write_block_(block, length);
}
bool Ring_::try_read(void* data, std::size_t length) noexcept
{
auto block = try_acquire_read_block_(length);
if (block == nullptr)
return false;
copy_read_block_(block, (char*)data, length);
release_read_block_(block, length);
return true;
}
bool Ring_::try_write(const void* data, std::size_t length) noexcept
{
auto block = try_acquire_write_block_(length);
if (block == nullptr)
return false;
copy_write_block_(block, (const char*)data, length);
release_write_block_(block, length);
return true;
}
char* Ring_::normalize_(char* ptr)
{
return ptr < end_ ? ptr : ptr - capacity();
}
char* Ring_::acquire_read_block_(std::size_t length)
{
auto size = static_cast<std::ptrdiff_t>(length);
while (true) // loop while conflict
{
auto old_rptr = rptr_.load(std::memory_order_consume); // read rptr
while (used_.load(std::memory_order_consume) < size) // check for data
; // spin until success
auto new_rptr = normalize_(old_rptr + size); // get block end
used_.fetch_sub(size); // reserve
if (rptr_.compare_exchange_strong(old_rptr, new_rptr)) // try commit
return old_rptr; // committed
used_.fetch_add(size, std::memory_order_relaxed); // un-reserve
}
}
char* Ring_::try_acquire_read_block_(std::size_t length)
{
auto size = static_cast<std::ptrdiff_t>(length);
while (true) // loop while conflict
{
auto old_rptr = rptr_.load(std::memory_order_consume); // read rptr
if (used_.load(std::memory_order_consume) < size) // check for data
return nullptr; // return failure
auto new_rptr = normalize_(old_rptr + size); // get block end
used_.fetch_sub(size); // reserve
if (rptr_.compare_exchange_strong(old_rptr, new_rptr)) // try commit
return old_rptr; // committed
used_.fetch_add(size, std::memory_order_relaxed); // un-reserve
}
}
void Ring_::copy_read_block_(const char* block, char* data, std::size_t length)
{
if (block + length < end_)
{
std::memcpy(data, block, length);
}
else
{
auto first = end_ - block;
std::memcpy(data, block, first);
std::memcpy(data + first, beg_, length - first);
}
}
void Ring_::release_read_block_(char* old_rptr, std::size_t length)
{
auto new_rptr = normalize_(old_rptr + length); // get block end
while (rbuf_.load() != old_rptr) // check for earlier reads
; // spin until reads complete
rbuf_.store(new_rptr); // finish commit
free_.fetch_add(length, std::memory_order_relaxed); // add to free space
}
char* Ring_::acquire_write_block_(std::size_t length)
{
auto size = static_cast<std::ptrdiff_t>(length);
while (true) // loop while conflict
{
auto old_wbuf = wbuf_.load(std::memory_order_consume); // read wbuf
while (free_.load(std::memory_order_consume) < size) // check for space
; // spin until success
auto new_wbuf = normalize_(old_wbuf + size); // get block end
free_.fetch_sub(size); // reserve
if (wbuf_.compare_exchange_strong(old_wbuf, new_wbuf)) // try commit
return old_wbuf; // committed
free_.fetch_add(size, std::memory_order_relaxed); // un-reserve
}
}
char* Ring_::try_acquire_write_block_(std::size_t length)
{
auto size = static_cast<std::ptrdiff_t>(length);
while (true) // loop while conflict
{
auto old_wbuf = wbuf_.load(std::memory_order_consume); // read wbuf
if (free_.load(std::memory_order_consume) < size) // check for space
return nullptr; // return failure
auto new_wbuf = normalize_(old_wbuf + size); // get block end
free_.fetch_sub(size); // reserve
if (wbuf_.compare_exchange_strong(old_wbuf, new_wbuf)) // try commit
return old_wbuf; // committed
free_.fetch_add(size, std::memory_order_relaxed); // un-reserve
}
}
void Ring_::copy_write_block_(char* block, const char* data, std::size_t length)
{
if (block + length < end_)
{
std::memcpy(block, data, length);
}
else
{
auto first = end_ - block;
std::memcpy(block, data, first);
std::memcpy(beg_, data + first, length - first);
}
}
void Ring_::release_write_block_(char* old_wbuf, std::size_t length)
{
auto new_wbuf = normalize_(old_wbuf + length); // get block end
while (wptr_.load() != old_wbuf) // wait for earlier writes
; // spin until writes complete
wptr_.store(new_wbuf); // finish commit
used_.fetch_add(length, std::memory_order_relaxed); // add to used space
}

440
ring/ring.h 100644
Wyświetl plik

@ -0,0 +1,440 @@
////////////////////////////////////////////////////////////////////////////////
// FILE: ring.h
// DATE: 2016-02-25
// AUTH: Trevor Wilson
// DESC: Defines a lock-free, multi-consumer, multi-producer ring buffer class
////////////////////////////////////////////////////////////////////////////////
// Copyright (c) 2016 Trevor Wilson
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files(the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions :
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#ifndef WILT_RING_H
#define WILT_RING_H
#include <atomic>
// - std::atomic
#include <cstddef>
// - std::size_t
// - std::ptrdiff_t
#include <new>
// - ::new(ptr)
#include <type_traits>
// - std::is_nothrow_copy_constructible
// - std::is_nothrow_move_constructible
// - std::is_nothrow_move_assignable
// - std::is_nothrow_destructible
#include <utility>
// - std::move
namespace wilt
{
//////////////////////////////////////////////////////////////////////////////
// This structure aims to access elements in a ring buffer from multiple
// concurrent readers and writers in a lock-free manner.
//
// The class works by allocating the array and storing two pointers (for the
// beginning and end of the allocated space). Two atomic pointers are used to
// track the beginning and end of the currently used storage space. To
// facilitate concurrent reads and writes, theres a read buffer pointer before
// the read pointer for data currently being read, and a corresponding write
// buffer pointer beyond the write pointer for data currently being written.
// These buffer pointers cannot overlap. Just using these pointers suffer from
// some minute inefficiencies and a few ABA problems. Therfore, atomic
// integers are used to store the currently used and currently free sizes.
//
// It allows multiple readers and multiple writers by implementing a reserve-
// commit system. A thread wanting to read will check the used size to see if
// there's enough data. If there is, it subtracts from the used size to
// 'reserve' the read. It then does a compare-exchange to 'commit' by
// increasing the read pointer. If that fails, then it backs out ('un-
// reserves') by adding back to the used size and tries again. If it
// succeeds, then it proceeds to read the data. In order to complete, the
// reader must update the read buffer pointer to where it just finished
// reading from. However, because other readers that started before may not be
// done yet, the reader must wait until the read buffer pointer points to
// where the read started. Only, then is the read buffer pointer updated, and
// the free size increased. So while this implementation is lock-free, it is
// not wait-free. This same principle works the same when writing (ammended
// for the appropriate pointers).
//
// If two readers try to read at the same time and there is only enough data
// for one of them. The used size MAY be negative because they both 'reserve'
// the data. This is an over-reserved state. But the compare-exchange will
// only allow one reader to 'commit' to the read and the other will 'un-
// reserve' the read.
//
// |beg |rptr used=5 |wbuf - unused
// |----|----|++++|====|====|====|====|====|++++|----| + modifying
// free=3 |rbuf |wptr |end = used
//
// The diagram above shows a buffer of size 10 storing 5 bytes with a reader
// reading one byte and one writer reading one byte.
//
// Out of the box, the class works by reading and writing raw bytes from POD
// data types and arrays. A wrapper could allow for a nicer interface for
// pushing and popping elements. As it stands, this structure cannot be easily
// modified to store types of variable size.
class Ring_
{
private:
////////////////////////////////////////////////////////////////////////////
// TYPE DEFINITIONS
////////////////////////////////////////////////////////////////////////////
typedef char* data_ptr;
typedef std::atomic<std::ptrdiff_t> size_type;
typedef std::atomic<char*> atom_ptr;
private:
////////////////////////////////////////////////////////////////////////////
// PRIVATE MEMBERS
////////////////////////////////////////////////////////////////////////////
// Beginning and end pointers don't need to be atomic because they don't
// change. used_ and free_ can be negative in certain cases (and that's ok).
data_ptr beg_; // pointer to beginning of data block
data_ptr end_; // pointer to end of data block
alignas(64)
size_type used_; // size of unreserved used space
alignas(64)
size_type free_; // size of unreserved free space
alignas(64)
atom_ptr rbuf_; // pointer to beginning of data being read
atom_ptr rptr_; // pointer to beginning of data
alignas(64)
atom_ptr wptr_; // pointer to end of data
atom_ptr wbuf_; // pointer to end of data being written
public:
////////////////////////////////////////////////////////////////////////////
// CONSTRUCTORS AND DESTRUCTORS
////////////////////////////////////////////////////////////////////////////
// Constructs a ring without a buffer (capacity() == 0)
Ring_();
// Constructs a ring with a buffer with a size
Ring_(std::size_t size);
// Moves the buffer between rings, assumes no concurrent operations
Ring_(Ring_&& ring);
// Moves the buffer between rings, assumes no concurrent operations on
// either ring. Deallocates the buffer
Ring_& operator= (Ring_&& ring);
// No copying
Ring_(const Ring_&) = delete;
Ring_& operator= (const Ring_&) = delete;
// Deallocates the buffer
~Ring_();
public:
////////////////////////////////////////////////////////////////////////////
// QUERY FUNCTIONS
////////////////////////////////////////////////////////////////////////////
// Functions only report on the state of the ring
// Returns the current amount of non-reserved used space (amount of written
// data that a read hasn't yet reserved). Over-reserved scenarios mean this
// number is not the ultimate source of truth with concurrent operations,
// but its the closest safe approximation. This, of course, doesn't report
// writes that have not completed.
std::size_t size() const;
// Maximum amount of data that can be held
std::size_t capacity() const;
public:
////////////////////////////////////////////////////////////////////////////
// ACCESSORS AND MODIFIERS
////////////////////////////////////////////////////////////////////////////
// All operations assume object has not been moved. Blocking operations run
// until operation is completed. Non-blocking operations fail if there is
// not enough space
void read(void* data, std::size_t length) noexcept;
void write(const void* data, std::size_t length) noexcept;
bool try_read(void* data, std::size_t length) noexcept;
bool try_write(const void* data, std::size_t length) noexcept;
protected:
////////////////////////////////////////////////////////////////////////////
// PROTECTED FUNCTIONS
////////////////////////////////////////////////////////////////////////////
// Helper functions
// Wraps a pointer within the array. Assumes 'beg_ <= ptr < end_+capacity()'
char* normalize_(char*);
char* acquire_read_block_(std::size_t length);
char* try_acquire_read_block_(std::size_t length);
void copy_read_block_(const char* block, char* data, std::size_t length);
void release_read_block_(char* block, std::size_t length);
char* acquire_write_block_(std::size_t length);
char* try_acquire_write_block_(std::size_t length);
void copy_write_block_(char* block, const char* data, std::size_t length);
void release_write_block_(char* block, std::size_t length);
char* begin_alloc_() { return beg_; }
const char* begin_alloc_() const { return beg_; }
char* end_alloc_() { return end_; }
const char* end_alloc_() const { return end_; }
char* begin_data_() { return rptr_; }
const char* begin_data_() const { return rptr_; }
char* end_data_() { return wptr_; }
const char* end_data_() const { return wptr_; }
}; // class Ring_
template <class T>
class Ring : protected Ring_
{
public:
////////////////////////////////////////////////////////////////////////////
// CONSTRUCTORS AND DESTRUCTORS
////////////////////////////////////////////////////////////////////////////
// Constructs a ring without a buffer (capacity() == 0)
Ring();
// Constructs a ring with a buffer with a size
Ring(std::size_t size);
// Moves the buffer between rings, assumes no concurrent operations
Ring(Ring<T>&& ring);
// Moves the buffer between rings, assumes no concurrent operations on
// either ring. Deallocates the buffer
Ring<T>& operator= (Ring<T>&& ring);
// No copying
Ring(const Ring_&) = delete;
Ring& operator= (const Ring_&) = delete;
// Deallocates the buffer, destructs stored data.
~Ring();
public:
////////////////////////////////////////////////////////////////////////////
// QUERY FUNCTIONS
////////////////////////////////////////////////////////////////////////////
// Functions only report on the state of the ring
// Returns the current amount of non-reserved used space (amount of written
// data that a read hasn't yet reserved). Over-reserved scenarios mean this
// number is not the ultimate source of truth with concurrent operations,
// but its the closest safe approximation. This, of course, doesn't report
// writes that have not completed.
std::size_t size() const;
// Maximum amount of data that can be held
std::size_t capacity() const;
public:
////////////////////////////////////////////////////////////////////////////
// ACCESSORS AND MODIFIERS
////////////////////////////////////////////////////////////////////////////
// All operations assume object has not been moved. Blocking operations run
// until operation is completed. Non-blocking operations fail if there is
// not enough space
void read(T& data) noexcept; // blocking read
void write(const T& data) noexcept; // blocking write
void write(T&& data) noexcept; // blocking write
bool try_read(T& data) noexcept; // non-blocking read
bool try_write(const T& data) noexcept; // non-blocking write
bool try_write(T&& data) noexcept; // non-blocking write
private:
////////////////////////////////////////////////////////////////////////////
// PRIVATE HELPER FUNCTIONS
////////////////////////////////////////////////////////////////////////////
void destruct_();
}; // class Ring<T>
template <class T>
Ring<T>::Ring()
: Ring_()
{ }
template <class T>
Ring<T>::Ring(std::size_t size)
: Ring_(size * sizeof(T))
{ }
template <class T>
Ring<T>::Ring(Ring<T>&& ring)
: Ring_(std::move(ring))
{ }
template <class T>
Ring<T>& Ring<T>::operator= (Ring<T>&& ring)
{
destruct_();
Ring_::operator= (ring);
return *this;
}
template <class T>
Ring<T>::~Ring()
{
destruct_();
}
template <class T>
void Ring<T>::destruct_()
{
if (size() == 0)
return;
auto itr = begin_data_();
auto end = end_data_();
do
{
auto t = reinterpret_cast<T*>(itr);
t->~T();
itr = normalize_(itr + sizeof(T));
} while (itr != end);
}
template <class T>
std::size_t Ring<T>::size() const
{
return Ring_::size() / sizeof(T);
}
template <class T>
std::size_t Ring<T>::capacity() const
{
return Ring_::capacity() / sizeof(T);
}
template <class T>
void Ring<T>::read(T& data) noexcept
{
static_assert(std::is_nothrow_move_assignable<T>::value, "T move assignment must not throw");
static_assert(std::is_nothrow_destructible<T>::value, "T destructor must not throw");
auto block = acquire_read_block_(sizeof(T));
// critical section
auto t = reinterpret_cast<T*>(block);
data = std::move(*t);
t->~T();
release_read_block_(block, sizeof(T));
}
template <class T>
void Ring<T>::write(const T& data) noexcept
{
static_assert(std::is_nothrow_copy_constructible<T>::value, "T copy constructor must not throw");
auto block = acquire_write_block_(sizeof(T));
// critical section
new(block) T(data);
release_write_block_(block, sizeof(T));
}
template <class T>
void Ring<T>::write(T&& data) noexcept
{
static_assert(std::is_nothrow_move_constructible<T>::value, "T move constructor must not throw");
auto block = acquire_write_block_(sizeof(T));
// critical section
new(block) T(std::move(data));
release_write_block_(block, sizeof(T));
}
template <class T>
bool Ring<T>::try_read(T& data) noexcept
{
static_assert(std::is_nothrow_move_assignable<T>::value, "T move assignment must not throw");
static_assert(std::is_nothrow_destructible<T>::value, "T destructor must not throw");
auto block = try_acquire_read_block_(sizeof(T));
if (block == nullptr)
return false;
// critical section
auto t = reinterpret_cast<T*>(block);
data = std::move(*t);
t->~T();
release_read_block_(block, sizeof(T));
return true;
}
template <class T>
bool Ring<T>::try_write(const T& data) noexcept
{
static_assert(std::is_nothrow_copy_constructible<T>::value, "T copy constructor must not throw");
auto block = try_acquire_write_block_(sizeof(T));
if (block == nullptr)
return false;
// critical section
new(block) T(data);
release_write_block_(block, sizeof(T));
return true;
}
template <class T>
bool Ring<T>::try_write(T&& data) noexcept
{
static_assert(std::is_nothrow_move_constructible<T>::value, "T move constructor must not throw");
auto block = try_acquire_write_block_(sizeof(T));
if (block == nullptr)
return false;
// critical section
new(block) T(std::move(data));
release_write_block_(block, sizeof(T));
return true;
}
} // namespace wilt
#endif // !WILT_RING_H

Wyświetl plik

@ -189,7 +189,16 @@ void udpHandler::dataReceived()
totallost = totallost + civ->packetsLost;
}
emit haveNetworkStatus(" rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(totallost) + "/" + QString::number(totalsent) + ")");
QString tempLatency;
if (rxLatency > audio->audioLatency)
{
tempLatency = QString::number(audio->audioLatency) + "ms";
}
else {
tempLatency = "<span style = \"color:red\">" + QString::number(audio->audioLatency) + "ms</span>";
}
emit haveNetworkStatus("rx latency: " + tempLatency + " ms / rtt: " + QString::number(latency) + " ms, loss: (" + QString::number(totallost) + "/" + QString::number(totalsent) + ")");
}
break;
}
@ -936,11 +945,11 @@ void udpAudio::dataReceived()
tempAudio.seq = (quint32)seqPrefix << 16 | in->seq;
tempAudio.time = lastReceived;
tempAudio.sent = 0;
tempAudio.datain = r.mid(0x18);
tempAudio.data = r.mid(0x18);
// Prefer signal/slot to forward audio as it is thread/safe
// Need to do more testing but latency appears fine.
emit haveAudioData(tempAudio);
//rxaudio->incomingAudio(tempAudio);
//emit haveAudioData(tempAudio);
audioLatency=rxaudio->incomingAudio(tempAudio);
}
break;
}

Wyświetl plik

@ -178,6 +178,8 @@ public:
udpAudio(QHostAddress local, QHostAddress ip, quint16 aport, quint16 rxlatency, quint16 txlatency, quint16 rxsample, quint8 rxcodec, quint16 txsample, quint8 txcodec, int outputPort, int inputPort, quint8 resampleQuality);
~udpAudio();
int audioLatency = 0;
signals:
void haveAudioData(audioPacket data);

Wyświetl plik

@ -695,7 +695,7 @@ void udpServer::audioReceived()
tempAudio.seq = (quint32)current->seqPrefix << 16 | in->seq;
tempAudio.time = QTime::currentTime();;
tempAudio.sent = 0;
tempAudio.datain = r.mid(0x18);
tempAudio.data = r.mid(0x18);
//qInfo(logUdpServer()) << "sending tx audio " << in->seq;
emit haveAudioData(tempAudio);
}
@ -1305,9 +1305,9 @@ void udpServer::sendRxAudio()
while (len < audio.length()) {
audioPacket partial;
partial.datain = audio.mid(len, 1364);
partial.data = audio.mid(len, 1364);
receiveAudioData(partial);
len = len + partial.datain.length();
len = len + partial.data.length();
}
}
}
@ -1322,15 +1322,15 @@ void udpServer::receiveAudioData(const audioPacket &d)
if (client != Q_NULLPTR && client->connected) {
audio_packet p;
memset(p.packet, 0x0, sizeof(p)); // We can't be sure it is initialized with 0x00!
p.len = sizeof(p) + d.datain.length();
p.len = sizeof(p) + d.data.length();
p.sentid = client->myId;
p.rcvdid = client->remoteId;
p.ident = 0x0080; // audio is always this?
p.datalen = (quint16)qToBigEndian((quint16)d.datain.length());
p.datalen = (quint16)qToBigEndian((quint16)d.data.length());
p.sendseq = (quint16)qToBigEndian((quint16)client->sendAudioSeq); // THIS IS BIG ENDIAN!
p.seq = client->txSeq;
QByteArray t = QByteArray::fromRawData((const char*)p.packet, sizeof(p));
t.append(d.datain);
t.append(d.data);
client->txMutex.lock();
client->txSeqBuf.append(SEQBUFENTRY());
client->txSeqBuf.last().seqNum = p.seq;

Wyświetl plik

@ -199,6 +199,7 @@
</QtUic>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="ring\ring.cpp" />
<ClCompile Include="rtaudio\RtAudio.cpp" />
<ClCompile Include="audiohandler.cpp" />
<ClCompile Include="calibrationwindow.cpp" />
@ -222,6 +223,7 @@
<ClCompile Include="wfmain.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ring\ring.h" />
<ClInclude Include="rtaudio\RtAudio.h" />
<ClInclude Include="opus-tools\src\arch.h" />
<ClInclude Include="ulaw.h" />

Wyświetl plik

@ -120,6 +120,9 @@
<ClCompile Include="wfmain.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="ring\ring.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="rtaudio\RtAudio.h">
@ -197,6 +200,9 @@
<ClInclude Include="ulaw.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="ring\ring.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="debug\moc_predefs.h.cbt">
@ -354,6 +360,7 @@
<Filter>Resource Files</Filter>
</None>
<None Include="resources\wfview.png" />
<None Include="resources\wfview.png" />
</ItemGroup>
<ItemGroup>
<None Include="resources\install.sh">