Conditional variable support added

pull/189/head
Marcin Kondej 2023-05-21 01:21:38 +02:00
rodzic d1dc1fed84
commit 019f39c220
5 zmienionych plików z 117 dodań i 79 usunięć

Wyświetl plik

@ -36,7 +36,8 @@
#include <csignal> #include <csignal>
#include <unistd.h> #include <unistd.h>
bool stop = false; std::mutex mtx;
bool enable = true;
Transmitter *transmitter = nullptr; Transmitter *transmitter = nullptr;
void sigIntHandler(int sigNum) void sigIntHandler(int sigNum)
@ -44,7 +45,7 @@ void sigIntHandler(int sigNum)
if (transmitter != nullptr) { if (transmitter != nullptr) {
std::cout << "Stopping..." << std::endl; std::cout << "Stopping..." << std::endl;
transmitter->Stop(); transmitter->Stop();
stop = true; enable = false;
} }
} }
@ -97,14 +98,14 @@ int main(int argc, char** argv)
if ((optind == argc) && loop) { if ((optind == argc) && loop) {
optind = filesOffset; optind = filesOffset;
} }
WaveReader reader(filename != "-" ? filename : std::string(), stop); WaveReader reader(filename != "-" ? filename : std::string(), enable, mtx);
WaveHeader header = reader.GetHeader(); WaveHeader header = reader.GetHeader();
std::cout << "Playing: " << reader.GetFilename() << ", " std::cout << "Playing: " << reader.GetFilename() << ", "
<< header.sampleRate << " Hz, " << header.sampleRate << " Hz, "
<< header.bitsPerSample << " bits, " << header.bitsPerSample << " bits, "
<< ((header.channels > 0x01) ? "stereo" : "mono") << std::endl; << ((header.channels > 0x01) ? "stereo" : "mono") << std::endl;
transmitter->Transmit(reader, frequency, bandwidth, dmaChannel, optind < argc); transmitter->Transmit(reader, frequency, bandwidth, dmaChannel, optind < argc);
} while (!stop && (optind < argc)); } while (enable && (optind < argc));
} catch (std::exception &catched) { } catch (std::exception &catched) {
std::cout << "Error: " << catched.what() << std::endl; std::cout << "Error: " << catched.what() << std::endl;
result = EXIT_FAILURE; result = EXIT_FAILURE;

Wyświetl plik

@ -32,7 +32,7 @@
*/ */
#include "transmitter.hpp" #include "transmitter.hpp"
#include "mailbox.h" #include "mailbox.hpp"
#include <bcm_host.h> #include <bcm_host.h>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
@ -354,7 +354,7 @@ class DMAController : public Device
}; };
Transmitter::Transmitter() Transmitter::Transmitter()
: output(nullptr), stop(true) : output(nullptr), enable(false)
{ {
} }
@ -366,7 +366,10 @@ Transmitter::~Transmitter() {
void Transmitter::Transmit(WaveReader &reader, float frequency, float bandwidth, unsigned dmaChannel, bool preserveCarrier) void Transmitter::Transmit(WaveReader &reader, float frequency, float bandwidth, unsigned dmaChannel, bool preserveCarrier)
{ {
stop = false; {
std::lock_guard<std::mutex> lock(mtx);
enable = true;
}
WaveHeader header = reader.GetHeader(); WaveHeader header = reader.GetHeader();
unsigned bufferSize = static_cast<unsigned>(static_cast<unsigned long long>(header.sampleRate) * BUFFER_TIME / 1000000); unsigned bufferSize = static_cast<unsigned>(static_cast<unsigned long long>(header.sampleRate) * BUFFER_TIME / 1000000);
@ -399,50 +402,64 @@ void Transmitter::Transmit(WaveReader &reader, float frequency, float bandwidth,
void Transmitter::Stop() void Transmitter::Stop()
{ {
stop = true; std::unique_lock<std::mutex> lock(mtx);
enable = false;
lock.unlock();
cv.notify_all();
} }
void Transmitter::TransmitViaCpu(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange) void Transmitter::TransmitViaCpu(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange)
{ {
std::vector<Sample> samples = reader.GetSamples(bufferSize, stop); std::vector<Sample> samples;
if (samples.empty()) {
return;
}
unsigned sampleOffset = 0; unsigned sampleOffset = 0;
bool eof = samples.size() < bufferSize, txStop = false;
std::thread transmitterThread(Transmitter::TransmitterThread, this, &output, sampleRate, clockDivisor, divisorRange, &sampleOffset, &samples, &txStop);
std::this_thread::sleep_for(std::chrono::microseconds(BUFFER_TIME / 2)); bool eof = false, stop = false, start = true;
std::thread transmitterThread(Transmitter::TransmitterThread, this, &output, sampleRate, clockDivisor, divisorRange, &sampleOffset, &samples, &stop);
auto finally = [&]() { auto finally = [&]() {
{ {
std::lock_guard<std::mutex> lock(access); std::lock_guard<std::mutex> lock(mtx);
txStop = true; stop = true;
cv.notify_one();
} }
transmitterThread.join(); transmitterThread.join();
samples.clear(); samples.clear();
stop = true; enable = false;
}; };
try { try {
while (!eof && !stop) { while (!eof) {
{ std::unique_lock<std::mutex> lock(mtx);
std::lock_guard<std::mutex> lock(access); if (!start) {
if (txStop) { cv.wait(lock, [&]() -> bool {
throw std::runtime_error("Transmitter thread has unexpectedly exited"); return samples.empty() || !enable || stop;
} });
if (samples.empty()) { } else {
if (!reader.SetSampleOffset(sampleOffset + bufferSize)) { start = false;
break; }
} if (!enable) {
samples = reader.GetSamples(bufferSize, stop); break;
if (samples.empty()) { }
break; if (stop) {
} throw std::runtime_error("Transmitter thread has unexpectedly exited");
eof = samples.size() < bufferSize; }
} if (samples.empty()) {
if (!reader.SetSampleOffset(sampleOffset + bufferSize)) {
break;
}
lock.unlock();
samples = reader.GetSamples(bufferSize, enable, mtx);
lock.lock();
if (samples.empty()) {
break;
}
eof = samples.size() < bufferSize;
lock.unlock();
cv.notify_one();
} else {
lock.unlock();
} }
std::this_thread::sleep_for(std::chrono::microseconds(BUFFER_TIME / 2));
} }
} catch (...) { } catch (...) {
finally(); finally();
@ -459,7 +476,7 @@ void Transmitter::TransmitViaDma(WaveReader &reader, ClockOutput &output, unsign
AllocatedMemory allocated(sizeof(uint32_t) * bufferSize + sizeof(DMAControllBlock) * (2 * bufferSize) + sizeof(uint32_t)); AllocatedMemory allocated(sizeof(uint32_t) * bufferSize + sizeof(DMAControllBlock) * (2 * bufferSize) + sizeof(uint32_t));
std::vector<Sample> samples = reader.GetSamples(bufferSize, stop); std::vector<Sample> samples = reader.GetSamples(bufferSize, enable, mtx);
if (samples.empty()) { if (samples.empty()) {
return; return;
} }
@ -509,11 +526,18 @@ void Transmitter::TransmitViaDma(WaveReader &reader, ClockOutput &output, unsign
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
samples.clear(); samples.clear();
stop = true; std::lock_guard<std::mutex> lock(mtx);
enable = false;
}; };
try { try {
while (!eof && !stop) { while (!eof) {
samples = reader.GetSamples(bufferSize, stop); {
std::lock_guard<std::mutex> lock(mtx);
if (!enable) {
break;
}
}
samples = reader.GetSamples(bufferSize, enable, mtx);
if (!samples.size()) { if (!samples.size()) {
break; break;
} }
@ -542,28 +566,25 @@ void Transmitter::TransmitterThread(Transmitter *instance, ClockOutput *output,
volatile TimerRegisters *timer = reinterpret_cast<TimerRegisters *>(peripherals.GetVirtualAddress(TIMER_BASE_OFFSET)); volatile TimerRegisters *timer = reinterpret_cast<TimerRegisters *>(peripherals.GetVirtualAddress(TIMER_BASE_OFFSET));
uint64_t current = *(reinterpret_cast<volatile uint64_t *>(&timer->low)); uint64_t current = *(reinterpret_cast<volatile uint64_t *>(&timer->low));
uint64_t playbackStart = current; uint64_t playbackStart = current, start = current;
while (true) { while (true) {
std::vector<Sample> loadedSamples; std::vector<Sample> loadedSamples;
while (true) {
{
std::lock_guard<std::mutex> lock(instance->access);
if (*stop) {
return;
}
loadedSamples = std::move(*samples);
current = *(reinterpret_cast<volatile uint64_t *>(&timer->low));
if (!loadedSamples.empty()) {
*sampleOffset = (current - playbackStart) * sampleRate / 1000000;
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
};
uint64_t start = current; std::unique_lock<std::mutex> lock(instance->mtx);
unsigned offset = (current - start) * sampleRate / 1000000; instance->cv.wait(lock, [&]() -> bool {
return !samples->empty() || *stop;
});
if (*stop) {
break;
}
start = current = *(reinterpret_cast<volatile uint64_t *>(&timer->low));
*sampleOffset = (current - playbackStart) * sampleRate / 1000000;
loadedSamples = std::move(*samples);
lock.unlock();
instance->cv.notify_one();
unsigned offset = 0;
while (true) { while (true) {
if (offset >= loadedSamples.size()) { if (offset >= loadedSamples.size()) {
@ -573,14 +594,16 @@ void Transmitter::TransmitterThread(Transmitter *instance, ClockOutput *output,
float value = loadedSamples[offset].GetMonoValue(); float value = loadedSamples[offset].GetMonoValue();
instance->output->SetDivisor(clockDivisor - static_cast<int>(round(value * divisorRange))); instance->output->SetDivisor(clockDivisor - static_cast<int>(round(value * divisorRange)));
while (offset == prevOffset) { while (offset == prevOffset) {
std::this_thread::sleep_for(std::chrono::microseconds(1)); // asm("nop"); std::this_thread::yield(); // asm("nop");
current = *(reinterpret_cast<volatile uint64_t *>(&timer->low));; current = *(reinterpret_cast<volatile uint64_t *>(&timer->low));;
offset = (current - start) * sampleRate / 1000000; offset = (current - start) * sampleRate / 1000000;
} }
} }
} }
} catch (...) { } catch (...) {
std::lock_guard<std::mutex> lock(instance->access); std::unique_lock<std::mutex> lock(instance->mtx);
*stop = true; *stop = true;
lock.unlock();
instance->cv.notify_one();
} }
} }

Wyświetl plik

@ -34,7 +34,7 @@
#pragma once #pragma once
#include "wave_reader.hpp" #include "wave_reader.hpp"
#include <mutex> #include <condition_variable>
class ClockOutput; class ClockOutput;
@ -53,7 +53,8 @@ class Transmitter
void TransmitViaDma(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange, unsigned dmaChannel); void TransmitViaDma(WaveReader &reader, ClockOutput &output, unsigned sampleRate, unsigned bufferSize, unsigned clockDivisor, unsigned divisorRange, unsigned dmaChannel);
static void TransmitterThread(Transmitter *instance, ClockOutput *output, unsigned sampleRate, unsigned clockDivisor, unsigned divisorRange, unsigned *sampleOffset, std::vector<Sample> *samples, bool *stop); static void TransmitterThread(Transmitter *instance, ClockOutput *output, unsigned sampleRate, unsigned clockDivisor, unsigned divisorRange, unsigned *sampleOffset, std::vector<Sample> *samples, bool *stop);
std::condition_variable cv;
ClockOutput *output; ClockOutput *output;
std::mutex access; std::mutex mtx;
bool stop; bool enable;
}; };

Wyświetl plik

@ -65,7 +65,7 @@ float Sample::GetMonoValue() const
return value; return value;
} }
WaveReader::WaveReader(const std::string &filename, bool &stop) : WaveReader::WaveReader(const std::string &filename, bool &enable, std::mutex &mtx) :
filename(filename), headerOffset(0), currentDataOffset(0) filename(filename), headerOffset(0), currentDataOffset(0)
{ {
if (!filename.empty()) { if (!filename.empty()) {
@ -80,12 +80,12 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) :
} }
try { try {
ReadData(sizeof(WaveHeader::chunkID) + sizeof(WaveHeader::chunkSize) + sizeof(WaveHeader::format), true, stop); ReadData(sizeof(WaveHeader::chunkID) + sizeof(WaveHeader::chunkSize) + sizeof(WaveHeader::format), true, enable, mtx);
if ((std::string(reinterpret_cast<char *>(header.chunkID), 4) != std::string("RIFF")) || (std::string(reinterpret_cast<char *>(header.format), 4) != std::string("WAVE"))) { if ((std::string(reinterpret_cast<char *>(header.chunkID), 4) != std::string("RIFF")) || (std::string(reinterpret_cast<char *>(header.format), 4) != std::string("WAVE"))) {
throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", WAVE file expected")); throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", WAVE file expected"));
} }
ReadData(sizeof(WaveHeader::subchunk1ID) + sizeof(WaveHeader::subchunk1Size), true, stop); ReadData(sizeof(WaveHeader::subchunk1ID) + sizeof(WaveHeader::subchunk1Size), true, enable, mtx);
unsigned subchunk1MinSize = sizeof(WaveHeader::audioFormat) + sizeof(WaveHeader::channels) + unsigned subchunk1MinSize = sizeof(WaveHeader::audioFormat) + sizeof(WaveHeader::channels) +
sizeof(WaveHeader::sampleRate) + sizeof(WaveHeader::byteRate) + sizeof(WaveHeader::blockAlign) + sizeof(WaveHeader::sampleRate) + sizeof(WaveHeader::byteRate) + sizeof(WaveHeader::blockAlign) +
sizeof(WaveHeader::bitsPerSample); sizeof(WaveHeader::bitsPerSample);
@ -93,7 +93,7 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) :
throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted")); throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted"));
} }
ReadData(header.subchunk1Size, true, stop); ReadData(header.subchunk1Size, true, enable, mtx);
if ((header.audioFormat != WAVE_FORMAT_PCM) || if ((header.audioFormat != WAVE_FORMAT_PCM) ||
(header.byteRate != (header.bitsPerSample >> 3) * header.channels * header.sampleRate) || (header.byteRate != (header.bitsPerSample >> 3) * header.channels * header.sampleRate) ||
(header.blockAlign != (header.bitsPerSample >> 3) * header.channels) || (header.blockAlign != (header.bitsPerSample >> 3) * header.channels) ||
@ -101,7 +101,7 @@ WaveReader::WaveReader(const std::string &filename, bool &stop) :
throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", unsupported WAVE format")); throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", unsupported WAVE format"));
} }
ReadData(sizeof(WaveHeader::subchunk2ID) + sizeof(WaveHeader::subchunk2Size), true, stop); ReadData(sizeof(WaveHeader::subchunk2ID) + sizeof(WaveHeader::subchunk2Size), true, enable, mtx);
if (std::string(reinterpret_cast<char *>(header.subchunk2ID), 4) != std::string("data")) { if (std::string(reinterpret_cast<char *>(header.subchunk2ID), 4) != std::string("data")) {
throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted")); throw std::runtime_error(std::string("Error while opening ") + GetFilename() + std::string(", data corrupted"));
} }
@ -134,7 +134,7 @@ const WaveHeader &WaveReader::GetHeader() const
return header; return header;
} }
std::vector<Sample> WaveReader::GetSamples(unsigned quantity, bool &stop) { std::vector<Sample> WaveReader::GetSamples(unsigned quantity, bool &enable, std::mutex &mtx) {
unsigned bytesPerSample = (header.bitsPerSample >> 3) * header.channels; unsigned bytesPerSample = (header.bitsPerSample >> 3) * header.channels;
unsigned bytesToRead = quantity * bytesPerSample; unsigned bytesToRead = quantity * bytesPerSample;
unsigned bytesLeft = header.subchunk2Size - currentDataOffset; unsigned bytesLeft = header.subchunk2Size - currentDataOffset;
@ -143,7 +143,7 @@ std::vector<Sample> WaveReader::GetSamples(unsigned quantity, bool &stop) {
quantity = bytesToRead / bytesPerSample; quantity = bytesToRead / bytesPerSample;
} }
std::vector<uint8_t> data = std::move(ReadData(bytesToRead, false, stop)); std::vector<uint8_t> data = std::move(ReadData(bytesToRead, false, enable, mtx));
if (data.size() < bytesToRead) { if (data.size() < bytesToRead) {
quantity = data.size() / bytesPerSample; quantity = data.size() / bytesPerSample;
} }
@ -166,12 +166,18 @@ bool WaveReader::SetSampleOffset(unsigned offset) {
return true; return true;
} }
std::vector<uint8_t> WaveReader::ReadData(unsigned bytesToRead, bool headerBytes, bool &stop) std::vector<uint8_t> WaveReader::ReadData(unsigned bytesToRead, bool headerBytes, bool &enable, std::mutex &mtx)
{ {
unsigned bytesRead = 0; unsigned bytesRead = 0;
std::vector<uint8_t> data; std::vector<uint8_t> data;
data.resize(bytesToRead); data.resize(bytesToRead);
while ((bytesRead < bytesToRead) && !stop) { while (bytesRead < bytesToRead) {
{
std::lock_guard<std::mutex> lock(mtx);
if (!enable) {
break;
}
}
int bytes = read(fileDescriptor, &data[bytesRead], bytesToRead - bytesRead); int bytes = read(fileDescriptor, &data[bytesRead], bytesToRead - bytesRead);
if (((bytes == -1) && ((fileDescriptor != STDIN_FILENO) || (errno != EAGAIN))) || if (((bytes == -1) && ((fileDescriptor != STDIN_FILENO) || (errno != EAGAIN))) ||
((static_cast<unsigned>(bytes) < bytesToRead) && headerBytes && (fileDescriptor != STDIN_FILENO))) { ((static_cast<unsigned>(bytes) < bytesToRead) && headerBytes && (fileDescriptor != STDIN_FILENO))) {
@ -191,14 +197,20 @@ std::vector<uint8_t> WaveReader::ReadData(unsigned bytesToRead, bool headerBytes
} }
if (headerBytes) { if (headerBytes) {
if (stop) { {
throw std::runtime_error("Cannot obtain header, program interrupted"); std::lock_guard<std::mutex> lock(mtx);
if (!enable) {
throw std::runtime_error("Cannot obtain header, program interrupted");
}
} }
std::memcpy(&(reinterpret_cast<uint8_t *>(&header))[headerOffset], data.data(), bytesRead); std::memcpy(&(reinterpret_cast<uint8_t *>(&header))[headerOffset], data.data(), bytesRead);
headerOffset += bytesRead; headerOffset += bytesRead;
} else { } else {
if (stop) { {
data.resize(bytesRead); std::lock_guard<std::mutex> lock(mtx);
if (!enable) {
data.resize(bytesRead);
}
} }
currentDataOffset += bytesRead; currentDataOffset += bytesRead;
} }

Wyświetl plik

@ -36,6 +36,7 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include <vector> #include <vector>
#include <mutex>
#define WAVE_FORMAT_PCM 0x0001 #define WAVE_FORMAT_PCM 0x0001
@ -68,17 +69,17 @@ protected:
class WaveReader class WaveReader
{ {
public: public:
WaveReader(const std::string &filename, bool &stop); WaveReader(const std::string &filename, bool &enable, std::mutex &mtx);
virtual ~WaveReader(); virtual ~WaveReader();
WaveReader(const WaveReader &) = delete; WaveReader(const WaveReader &) = delete;
WaveReader(WaveReader &&) = delete; WaveReader(WaveReader &&) = delete;
WaveReader &operator=(const WaveReader &) = delete; WaveReader &operator=(const WaveReader &) = delete;
std::string GetFilename() const; std::string GetFilename() const;
const WaveHeader &GetHeader() const; const WaveHeader &GetHeader() const;
std::vector<Sample> GetSamples(unsigned quantity, bool &stop); std::vector<Sample> GetSamples(unsigned quantity, bool &enable, std::mutex &mtx);
bool SetSampleOffset(unsigned offset); bool SetSampleOffset(unsigned offset);
private: private:
std::vector<uint8_t> ReadData(unsigned bytesToRead, bool headerBytes, bool &stop); std::vector<uint8_t> ReadData(unsigned bytesToRead, bool headerBytes, bool &enable, std::mutex &mtx);
std::string filename; std::string filename;
WaveHeader header; WaveHeader header;