From 7ab743d05b3b56d47f2a04d317885a29e61fb03e Mon Sep 17 00:00:00 2001 From: AlexandreRouma Date: Fri, 2 Feb 2024 04:11:29 +0100 Subject: [PATCH] finish iq exporter and fix network lib send not closing socket on error --- core/src/utils/net.cpp | 11 +- misc_modules/iq_exporter/src/main.cpp | 158 ++++++++++++++++++++------ 2 files changed, 134 insertions(+), 35 deletions(-) diff --git a/core/src/utils/net.cpp b/core/src/utils/net.cpp index 99ff8390..2abd6239 100644 --- a/core/src/utils/net.cpp +++ b/core/src/utils/net.cpp @@ -138,7 +138,16 @@ namespace net { } int Socket::send(const uint8_t* data, size_t len, const Address* dest) { - return sendto(sock, (const char*)data, len, 0, (sockaddr*)(dest ? &dest->addr : (raddr ? &raddr->addr : NULL)), sizeof(sockaddr_in)); + // Send data + int err = sendto(sock, (const char*)data, len, 0, (sockaddr*)(dest ? &dest->addr : (raddr ? &raddr->addr : NULL)), sizeof(sockaddr_in)); + + // On error, close socket + if (err <= 0 && !WOULD_BLOCK) { + close(); + return err; + } + + return err; } int Socket::sendstr(const std::string& str, const Address* dest) { diff --git a/misc_modules/iq_exporter/src/main.cpp b/misc_modules/iq_exporter/src/main.cpp index e206e109..a4b6db67 100644 --- a/misc_modules/iq_exporter/src/main.cpp +++ b/misc_modules/iq_exporter/src/main.cpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include SDRPP_MOD_INFO{ @@ -27,7 +29,8 @@ enum Mode { }; enum Protocol { - PROTOCOL_TCP, + PROTOCOL_TCP_SERVER, + PROTOCOL_TCP_CLIENT, PROTOCOL_UDP }; @@ -62,7 +65,8 @@ public: } // Define protocols - protocols.define("TCP", PROTOCOL_TCP); + protocols.define("TCP (Server)", PROTOCOL_TCP_SERVER); + protocols.define("TCP (Client)", PROTOCOL_TCP_CLIENT); protocols.define("UDP", PROTOCOL_UDP); // Define sample types @@ -71,6 +75,13 @@ public: sampleTypes.define("Int32", SAMPLE_TYPE_INT32); sampleTypes.define("Float32", SAMPLE_TYPE_FLOAT32); + // Define packet sizes + for (int i = 8; i <= 32768; i <<= 1) { + char buf[16]; + sprintf(buf, "%d Bytes", i); + packetSizes.define(i, buf, i); + } + // Load config bool autoStart = false; Mode nMode = MODE_BASEBAND; @@ -91,6 +102,10 @@ public: std::string sampTypeStr = config.conf[name]["sampleType"]; if (sampleTypes.keyExists(sampTypeStr)) { sampType = sampleTypes.value(sampleTypes.keyId(sampTypeStr)); } } + if (config.conf[name].contains("packetSize")) { + int size = config.conf[name]["packetSize"]; + if (packetSizes.keyExists(size)) { packetSize = packetSizes.value(packetSizes.keyId(size)); } + } if (config.conf[name].contains("host")) { std::string hostStr = config.conf[name]["host"]; strcpy(hostname, hostStr.c_str()); @@ -109,12 +124,14 @@ public: srId = samplerates.valueId(samplerate); protoId = protocols.valueId(proto); sampTypeId = sampleTypes.valueId(sampType); + packetSizeId = packetSizes.valueId(packetSize); // Allocate buffer buffer = dsp::buffer::alloc(STREAM_BUFFER_SIZE * sizeof(dsp::complex_t)); // Init DSP - handler.init(&iqStream, dataHandler, this); + reshape.init(&iqStream, packetSize/sampleSize(), 0); + handler.init(&reshape.out, dataHandler, this); // Set operating mode setMode(nMode); @@ -143,10 +160,27 @@ public: void postInit() {} void enable() { + // Rebind streams and start DSP + setMode(mode, true); + + // Restart networking if it was running + if (wasRunning) { start(); } + + // Mark as running enabled = true; } void disable() { + // Save running state + wasRunning = running; + + // Stop networking + stop(); + + // Stop the DSP and unbind streams + setMode(MODE_NONE); + + // Mark as disabled enabled = false; } @@ -162,13 +196,17 @@ public: // Start listening or open UDP socket try { - if (proto == PROTOCOL_TCP) { + if (proto == PROTOCOL_TCP_SERVER) { // Create listener listener = net::listen(hostname, port); // Start listen worker listenWorkerThread = std::thread(&IQExporterModule::listenWorker, this); } + else if (proto == PROTOCOL_TCP_CLIENT) { + // Connect to TCP server + sock = net::connect(hostname, port); + } else { // Open UDP socket sock = net::openudp(hostname, port, "0.0.0.0", 0, true); @@ -176,6 +214,9 @@ public: } catch (const std::exception& e) { flog::error("[IQExporter] Could not start socket: {}", e.what()); + errorStr = e.what(); + showError = true; + return; } running = true; @@ -188,7 +229,7 @@ public: std::lock_guard lck1(sockMtx); // Stop listening or close UDP socket - if (proto == PROTOCOL_TCP) { + if (proto == PROTOCOL_TCP_SERVER) { // Stop listener if (listener) { listener->stop(); @@ -207,7 +248,7 @@ public: } } else { - // Close UDP socket and free it + // Close socket and free it if (sock) { sock->close(); sock.reset(); @@ -235,6 +276,11 @@ private: static void menuHandler(void* ctx) { IQExporterModule* _this = (IQExporterModule*)ctx; float menuWidth = ImGui::GetContentRegionAvail().x; + + // Error message box + ImGui::GenericDialog("##iq_exporter_err_", _this->showError, GENERIC_DIALOG_BUTTONS_OK, [=](){ + ImGui::Text("Error: %s", _this->errorStr.c_str()); + }); if (!_this->enabled) { ImGui::BeginDisabled(); } @@ -281,11 +327,23 @@ private: ImGui::FillWidth(); if (ImGui::Combo(("##iq_exporter_samp_" + _this->name).c_str(), &_this->sampTypeId, _this->sampleTypes.txt)) { _this->sampType = _this->sampleTypes.value(_this->sampTypeId); + _this->reshape.setKeep(_this->packetSize/_this->sampleSize()); config.acquire(); config.conf[_this->name]["sampleType"] = _this->sampleTypes.key(_this->sampTypeId); config.release(true); } + // Packet size selector + ImGui::LeftLabel("Packet size"); + ImGui::FillWidth(); + if (ImGui::Combo(("##iq_exporter_pkt_sz_" + _this->name).c_str(), &_this->packetSizeId, _this->packetSizes.txt)) { + _this->packetSize = _this->packetSizes.value(_this->packetSizeId); + _this->reshape.setKeep(_this->packetSize/_this->sampleSize()); + config.acquire(); + config.conf[_this->name]["packetSize"] = _this->packetSizes.key(_this->packetSizeId); + config.release(true); + } + // Hostname and port field if (ImGui::InputText(("##iq_exporter_host_" + _this->name).c_str(), _this->hostname, sizeof(_this->hostname))) { config.acquire(); @@ -304,7 +362,7 @@ private: if (_this->running) { ImGui::EndDisabled(); } // Start/Stop buttons - if (_this->running) { + if (_this->running || (!_this->enabled && _this->wasRunning)) { if (ImGui::Button(("Stop##iq_exporter_stop_" + _this->name).c_str(), ImVec2(menuWidth, 0))) { _this->stop(); config.acquire(); @@ -321,75 +379,78 @@ private: } } + // Check if the socket is open by attempting a read + bool sockOpen; + { + uint8_t dummy; + sockOpen = !(!_this->sock || !_this->sock->isOpen() || (_this->proto != PROTOCOL_UDP && _this->sock->recv(&dummy, 1, false, net::NONBLOCKING) == 0)); + } + // Status text ImGui::TextUnformatted("Status:"); ImGui::SameLine(); - if (_this->sock && _this->sock->isOpen()) { - ImGui::TextColored(ImVec4(0.0, 1.0, 0.0, 1.0), (_this->proto == PROTOCOL_TCP) ? "Connected" : "Sending"); + if (sockOpen) { + ImGui::TextColored(ImVec4(0.0, 1.0, 0.0, 1.0), (_this->proto == PROTOCOL_TCP_SERVER || _this->proto == PROTOCOL_TCP_CLIENT) ? "Connected" : "Sending"); } else if (_this->listener && _this->listener->listening()) { ImGui::TextColored(ImVec4(1.0, 1.0, 0.0, 1.0), "Listening"); } + else if (!_this->enabled) { + ImGui::TextUnformatted("Disabled"); + } else { + // If we're idle and still supposed to be running, the server has closed the connection (TODO: kinda jank...) + if (_this->running) { _this->stop(); } + ImGui::TextUnformatted("Idle"); } if (!_this->enabled) { ImGui::EndDisabled(); } } - void setMode(Mode newMode) { + void setMode(Mode newMode, bool fromDisabled = false) { // If there is no mode to change, do nothing - flog::debug("Mode change"); - if (mode == newMode) { - flog::debug("New mode same as existing mode, doing nothing"); - return; - } + if (!fromDisabled && mode == newMode) { return; } // Stop the DSP - flog::debug("Stopping DSP"); + reshape.stop(); handler.stop(); // Delete VFO or unbind IQ stream if (vfo) { - flog::debug("Deleting old VFO"); sigpath::vfoManager.deleteVFO(vfo); vfo = NULL; } - if (mode == MODE_BASEBAND) { - flog::debug("Unbinding old stream"); + if (mode == MODE_BASEBAND && !fromDisabled) { sigpath::iqFrontEnd.unbindIQStream(&iqStream); } // If the mode was none, we're done if (newMode == MODE_NONE) { - flog::debug("Exiting, new mode is NONE"); return; } // Create VFO or bind IQ stream if (newMode == MODE_VFO) { - flog::debug("Creating new VFO"); // Create VFO vfo = sigpath::vfoManager.createVFO(name, ImGui::WaterfallVFO::REF_CENTER, 0, samplerate, samplerate, samplerate, samplerate, true); // Set its output as the input to the DSP - handler.setInput(vfo->output); + reshape.setInput(vfo->output); } else { - flog::debug("Binding IQ stream"); // Bind IQ stream sigpath::iqFrontEnd.bindIQStream(&iqStream); // Set its output as the input to the DSP - handler.setInput(&iqStream); + reshape.setInput(&iqStream); } // Start DSP - flog::debug("Starting DSP"); + reshape.start(); handler.start(); // Update mode - flog::debug("Updating mode"); mode = newMode; modeId = modes.valueId(newMode); } @@ -405,21 +466,37 @@ private: std::lock_guard lck(sockMtx); sock = newSock; } + } + } - // Wait until disconnection - // TODO + int sampleSize() { + switch (sampType) { + case SAMPLE_TYPE_INT8: + return sizeof(int8_t)*2; + case SAMPLE_TYPE_INT16: + return sizeof(int16_t)*2; + case SAMPLE_TYPE_INT32: + return sizeof(int32_t)*2; + case SAMPLE_TYPE_FLOAT32: + return sizeof(dsp::complex_t); + default: + return -1; } } static void dataHandler(dsp::complex_t* data, int count, void* ctx) { IQExporterModule* _this = (IQExporterModule*)ctx; - // Acquire lock on socket - std::lock_guard lck(_this->sockMtx); + // Try to cquire lock on socket + if (!_this->sockMtx.try_lock()) { return; } // If not valid or open, give uo - if (!_this->sock || !_this->sock->isOpen()) { return; } - + if (!_this->sock || !_this->sock->isOpen()) { + // Unlock socket mutex + _this->sockMtx.unlock(); + return; + } + // Convert the samples or send directory for float32 int size; switch (_this->sampType) { @@ -428,7 +505,7 @@ private: size = sizeof(int8_t)*2; break; case SAMPLE_TYPE_INT16: - volk_32fc_convert_16ic((lv_16sc_t*)_this->buffer, (lv_32fc_t*)data, count); + volk_32f_s32f_convert_16i((int16_t*)_this->buffer, (float*)data, 32768.0f, count*2); size = sizeof(int16_t)*2; break; case SAMPLE_TYPE_INT32: @@ -438,11 +515,16 @@ private: case SAMPLE_TYPE_FLOAT32: _this->sock->send((uint8_t*)data, count*sizeof(dsp::complex_t)); default: + // Unlock socket mutex + _this->sockMtx.unlock(); return; } // Send converted samples _this->sock->send(_this->buffer, count*size); + + // Unlock socket mutex + _this->sockMtx.unlock(); } std::string name; @@ -452,21 +534,29 @@ private: int modeId; int samplerate = 1000000.0; int srId; - Protocol proto = PROTOCOL_TCP; + Protocol proto = PROTOCOL_TCP_SERVER; int protoId; SampleType sampType = SAMPLE_TYPE_INT16; int sampTypeId; + int packetSize = 1024; + int packetSizeId; char hostname[1024] = "localhost"; int port = 1234; bool running = false; + bool wasRunning = false; + + bool showError = false; + std::string errorStr = ""; OptionList modes; OptionList samplerates; OptionList protocols; OptionList sampleTypes; + OptionList packetSizes; VFOManager::VFO* vfo = NULL; dsp::stream iqStream; + dsp::buffer::Reshaper reshape; dsp::sink::Handler handler; uint8_t* buffer = NULL;