Initial rework of websocket server

webclient_js
Michal Fratczak 2019-09-17 13:21:35 +02:00
rodzic e524932a95
commit 5d9c6d1ae6
15 zmienionych plików z 1169 dodań i 443 usunięć

Wyświetl plik

@ -26,6 +26,7 @@
#include <complex>
#include <chrono>
#include <mutex>
#include <functional>
#include "IQVector.h"
#include "Decimator.h"
@ -125,7 +126,7 @@ public:
bool livePrint() const { return live_print_; }
void livePrint(bool i_live) { live_print_ = i_live; }
void (*success_callback_)(std::string, std::string, std::string) {0}; // callback on successfull sentence decode
std::function<void(std::string, std::string, std::string)> success_callback_; // callback on each successfull sentence decode
private:
// IQ buffers

Wyświetl plik

@ -31,6 +31,7 @@ if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-class-memaccess" )
set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable" )
set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter" )
set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-maybe-uninitialized" )
set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror=return-type" )
elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC")
SET(CMAKE_CXX_FLAGS "/EHsc")
@ -44,7 +45,12 @@ endif()
set ( websocketServer_src
program_options.h program_options.cpp
server.h server.cpp
habdec_ws_protocol.h habdec_ws_protocol.cpp
http_session.h http_session.cpp
listener.h listener.cpp
websocket_session.h websocket_session.cpp
ws_server.h ws_server.cpp
CompressedVector.h CompressedVector.cpp
${CMAKE_SOURCE_DIR}/habitat/habitat_upload.cpp
${CMAKE_SOURCE_DIR}/habitat/habitat_list_flights.cpp
${CMAKE_SOURCE_DIR}/common/http_request.cpp

Wyświetl plik

@ -0,0 +1,241 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include <vector>
#include <algorithm>
#include <iostream>
#include "CompressedVector.h"
namespace habdec
{
// specialization: float --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
if(!rhs.size())
{
values_.clear();
return;
}
values_.resize(rhs.size());
std::copy(rhs.begin(), rhs.end(), values_.begin());
}
// specialization: double --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> float"<<std::endl;
if(!rhs.size())
{
values_.clear();
return;
}
values_.resize(rhs.size());
std::copy(rhs.begin(), rhs.end(), values_.begin());
}
// specialization: float --> unsigned char
template<>
template<>
void CompressedVector<unsigned char>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: float --> unsigned char"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = float(rhs_v - i_min) / (i_max - i_min);
unsigned char v_out = rhs_v * std::numeric_limits<unsigned char>::max();
values_.push_back(v_out);
}
}
// specialization: float --> uint16_t
template<>
template<>
void CompressedVector<uint16_t>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: float --> uint16_t"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = float(rhs_v - i_min) / (i_max - i_min);
uint16_t v_out = rhs_v * std::numeric_limits<uint16_t>::max();
values_.push_back(v_out);
}
}
// specialization: double --> unsigned char
template<>
template<>
void CompressedVector<unsigned char>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> unsigned char"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = double(rhs_v - i_min) / (i_max - i_min);
unsigned char v_out = rhs_v * std::numeric_limits<unsigned char>::max();
values_.push_back(v_out);
}
}
// specialization: double --> uint16_t
template<>
template<>
void CompressedVector<uint16_t>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> uint16_t"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = double(rhs_v - i_min) / (i_max - i_min);
uint16_t v_out = rhs_v * std::numeric_limits<uint16_t>::max();
values_.push_back(v_out);
}
}
// specialization: unsigned char --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<unsigned char>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: unsigned char --> float"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
float rhs_v_0_1 = float(rhs_v) / std::numeric_limits<unsigned char>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: uint16_t --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<uint16_t>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: uint16_t --> float"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
float rhs_v_0_1 = float(rhs_v) / std::numeric_limits<uint16_t>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: unsigned char --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<unsigned char>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: unsigned char --> double"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
double rhs_v_0_1 = double(rhs_v) / std::numeric_limits<unsigned char>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: uint16_t --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<uint16_t>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: uint16_t --> double"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
double rhs_v_0_1 = double(rhs_v) / std::numeric_limits<uint16_t>::max();
values_.push_back(rhs_v_0_1);
}
}
} // namespace habdec

Wyświetl plik

@ -98,214 +98,4 @@ private:
};
// specialization: float --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
if(!rhs.size())
{
values_.clear();
return;
}
values_.resize(rhs.size());
std::copy(rhs.begin(), rhs.end(), values_.begin());
}
// specialization: double --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> float"<<std::endl;
if(!rhs.size())
{
values_.clear();
return;
}
values_.resize(rhs.size());
std::copy(rhs.begin(), rhs.end(), values_.begin());
}
// specialization: float --> unsigned char
template<>
template<>
void CompressedVector<unsigned char>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: float --> unsigned char"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = float(rhs_v - i_min) / (i_max - i_min);
unsigned char v_out = rhs_v * std::numeric_limits<unsigned char>::max();
values_.push_back(v_out);
}
}
// specialization: float --> uint16_t
template<>
template<>
void CompressedVector<uint16_t>::copyValues(const std::vector<float>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: float --> uint16_t"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = float(rhs_v - i_min) / (i_max - i_min);
uint16_t v_out = rhs_v * std::numeric_limits<uint16_t>::max();
values_.push_back(v_out);
}
}
// specialization: double --> unsigned char
template<>
template<>
void CompressedVector<unsigned char>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> unsigned char"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = double(rhs_v - i_min) / (i_max - i_min);
unsigned char v_out = rhs_v * std::numeric_limits<unsigned char>::max();
values_.push_back(v_out);
}
}
// specialization: double --> uint16_t
template<>
template<>
void CompressedVector<uint16_t>::copyValues(const std::vector<double>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: double --> uint16_t"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
// if(!rhs.normalized_)
rhs_v = double(rhs_v - i_min) / (i_max - i_min);
uint16_t v_out = rhs_v * std::numeric_limits<uint16_t>::max();
values_.push_back(v_out);
}
}
// specialization: unsigned char --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<unsigned char>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: unsigned char --> float"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
float rhs_v_0_1 = float(rhs_v) / std::numeric_limits<unsigned char>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: uint16_t --> float
template<>
template<>
void CompressedVector<float>::copyValues(const std::vector<uint16_t>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: uint16_t --> float"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
float rhs_v_0_1 = float(rhs_v) / std::numeric_limits<uint16_t>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: unsigned char --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<unsigned char>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: unsigned char --> double"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
double rhs_v_0_1 = double(rhs_v) / std::numeric_limits<unsigned char>::max();
values_.push_back(rhs_v_0_1);
}
}
// specialization: uint16_t --> double
template<>
template<>
void CompressedVector<double>::copyValues(const std::vector<uint16_t>& rhs, double i_min, double i_max)
{
// std::cout<<"specialization: uint16_t --> double"<<std::endl;
values_.clear();
// normalized_ = true; // ALWAYS NORMALIZED FOR NON-FLOAT TYPES
if(!rhs.size())
return;
values_.reserve(rhs.size());
for(auto rhs_v : rhs)
{
double rhs_v_0_1 = double(rhs_v) / std::numeric_limits<uint16_t>::max();
values_.push_back(rhs_v_0_1);
}
}
} // namespace habdec

Wyświetl plik

@ -1,3 +1,4 @@
/*
Copyright 2018 Michal Fratczak
@ -18,6 +19,8 @@
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include "habdec_ws_protocol.h"
#include <cstdlib>
#include <memory>
@ -41,97 +44,136 @@ namespace websocket = boost::beast::websocket;
#include "NetTransport.h"
#include "habitat/habitat_interface.h"
extern bool G_DO_EXIT;
// i_param == "" --> sends all parameters
void EchoParameter(const std::string i_param, websocket::stream<tcp::socket>& ws)
std::vector<std::shared_ptr<HabdecMessage> > EchoParameter(const std::string i_param)
{
using namespace std;
std::vector<std::shared_ptr<HabdecMessage> > result; // response messages
smatch match;
ws.text(true);
if(i_param == "frequency" || i_param == "")
{
double frequency = 0;
GLOBALS::get().p_iq_source_->getOption("frequency_double", &frequency);
frequency /= 1e6;
string o_command = "cmd::set:frequency=" + to_string(frequency);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:frequency="<<frequency;
result.push_back(p_msg);
}
if(i_param == "ppm" || i_param == "")
{
double ppm = 0;
GLOBALS::get().p_iq_source_->getOption("ppm_double", &ppm);
string o_command = "cmd::set:ppm=" + to_string(ppm);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:ppm="<<ppm;
result.push_back(p_msg);
}
if(i_param == "gain" || i_param == "")
{
double gain = 0;
GLOBALS::get().p_iq_source_->getOption("gain_double", &gain);
string o_command = "cmd::set:gain=" + to_string(gain);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:gain="<<gain;
result.push_back(p_msg);
}
if(i_param == "baud" || i_param == "")
{
size_t baud = GLOBALS::get().decoder_.baud();
string o_command = "cmd::set:baud=" + to_string(baud);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:baud="<<baud;
result.push_back(p_msg);
}
if(i_param == "rtty_bits" || i_param == "")
{
size_t rtty_bits = GLOBALS::get().decoder_.rtty_bits();
string o_command = "cmd::set:rtty_bits=" + to_string(rtty_bits);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:rtty_bits="<<rtty_bits;
result.push_back(p_msg);
}
if(i_param == "rtty_stops" || i_param == "")
{
float rtty_stops = GLOBALS::get().decoder_.rtty_stops();
string o_command = "cmd::set:rtty_stops=" + to_string(rtty_stops);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:rtty_stops="<<rtty_stops;
result.push_back(p_msg);
}
if(i_param == "lowpass_bw" || i_param == "")
{
string o_command = "cmd::set:lowpass_bw=" + to_string(GLOBALS::get().decoder_.lowpass_bw());
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:lowpass_bw="<<GLOBALS::get().decoder_.lowpass_bw();
result.push_back(p_msg);
}
if(i_param == "lowpass_trans" || i_param == "")
{
string o_command = "cmd::set:lowpass_trans=" + to_string(GLOBALS::get().decoder_.lowpass_trans());
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:lowpass_trans="<<GLOBALS::get().decoder_.lowpass_trans();
result.push_back(p_msg);
}
if(i_param == "biastee" || i_param == "")
{
double biastee = 0;
GLOBALS::get().p_iq_source_->getOption("biastee_double", &biastee);
string o_command = "cmd::set:biastee=" + to_string(biastee);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:biastee="<<biastee;
result.push_back(p_msg);
}
if(i_param == "afc" || i_param == "")
{
string o_command = "cmd::set:afc=" + to_string(GLOBALS::get().par_.afc_);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:afc="<<GLOBALS::get().par_.afc_;
result.push_back(p_msg);
}
if(i_param == "decimation" || i_param == "")
{
int decim_factor_log = std::log2( GLOBALS::get().decoder_.getDecimationFactor() );
string o_command = "cmd::set:decimation=" + to_string(decim_factor_log);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
}
if(i_param == "sampling_rate" || i_param == "")
{
double sr = 0;
GLOBALS::get().p_iq_source_->getOption("sampling_rate_double", &sr);
string o_command = "cmd::info:sampling_rate=" + to_string(sr);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:decimation="<<decim_factor_log;
result.push_back(p_msg);
}
if(i_param == "dc_remove" || i_param == "")
{
double dc_rem = GLOBALS::get().decoder_.dc_remove();
string o_command = "cmd::set:dc_remove=" + to_string(dc_rem);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:dc_remove="<<dc_rem;
result.push_back(p_msg);
}
if(i_param == "datasize" || i_param == "")
{
int datasize = 1;
@ -142,88 +184,107 @@ void EchoParameter(const std::string i_param, websocket::stream<tcp::socket>& ws
if( GLOBALS::get().par_.transport_data_type_ == GLOBALS::TransportDataType::kFloat )
datasize = 4;
string o_command = "cmd::set:datasize=" + to_string(datasize);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::set:datasize="<<datasize;
result.push_back(p_msg);
}
if(i_param == "sampling_rate" || i_param == "")
{
double sr = 0;
GLOBALS::get().p_iq_source_->getOption("sampling_rate_double", &sr);
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->to_all_clients_ = true;
p_msg->data_stream_<<"cmd::info:sampling_rate="<<sr;
result.push_back(p_msg);
}
return result;
}
bool HandleCommand(const std::string i_command, websocket::stream<tcp::socket>& ws)
std::vector< std::shared_ptr<HabdecMessage> > HandleCommand(const std::string i_command)
{
std::vector< std::shared_ptr<HabdecMessage> > result; // response messages
if(!GLOBALS::get().p_iq_source_)
return false;
return result;
using namespace std;
smatch match;
// GET
// GET:
if( i_command.size() > 4 && i_command.substr(0,4) == "get:" )
EchoParameter( i_command.substr(4), ws );
// SET
{
result = EchoParameter( i_command.substr(4) );
}
// SET:
else if( regex_match(i_command, match, regex(R"_(set\:frequency=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
double frequency = stod(match[1]);
frequency *= 1e6;
GLOBALS::get().p_iq_source_->setOption("frequency_double", &frequency);
GLOBALS::get().par_.frequency_ = frequency;
EchoParameter("frequency", ws);
result = EchoParameter("frequency");
}
else if( regex_match(i_command, match, regex(R"_(set\:decimation=(\d+))_")) && match.size() > 1 )
{
int decim_factor_log = stoi(match[1]);
GLOBALS::get().decoder_.setupDecimationStagesFactor( pow(2,decim_factor_log) );
GLOBALS::get().par_.decimation_ = decim_factor_log;
EchoParameter("decimation", ws);
result = EchoParameter("decimation");
}
else if( regex_match(i_command, match, regex(R"_(set\:ppm=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
double ppm = stod(match[1]);
GLOBALS::get().p_iq_source_->setOption("ppm_double", &ppm);
GLOBALS::get().par_.ppm_ = ppm;
EchoParameter("ppm", ws);
result = EchoParameter("ppm");
}
else if( regex_match(i_command, match, regex(R"_(set\:gain=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
double gain = stod(match[1]);
GLOBALS::get().p_iq_source_->setOption("gain_double", &gain);
GLOBALS::get().par_.gain_ = gain;
EchoParameter("gain", ws);
result = EchoParameter("gain");
}
else if( regex_match(i_command, match, regex(R"_(set\:lowpass_bw=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
float lpbw = stof(match[1]);
GLOBALS::get().decoder_.lowpass_bw(lpbw);
GLOBALS::get().par_.lowpass_bw_Hz_ = lpbw;
EchoParameter("lowpass_bw", ws);
result = EchoParameter("lowpass_bw");
}
else if( regex_match(i_command, match, regex(R"_(set\:lowpass_trans=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
float lptr = stof(match[1]);
GLOBALS::get().decoder_.lowpass_trans(lptr);
GLOBALS::get().par_.lowpass_tr_ = lptr;
EchoParameter("lowpass_trans", ws);
result = EchoParameter("lowpass_trans");
}
else if( regex_match(i_command, match, regex(R"_(set\:baud=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
float baud = stof(match[1]);
GLOBALS::get().decoder_.baud( baud );
GLOBALS::get().par_.baud_ = baud;
EchoParameter("baud", ws);
result = EchoParameter("baud");
}
else if( regex_match(i_command, match, regex(R"_(set\:rtty_bits=(\d+))_")) && match.size() > 1 )
{
int rtty_bits = stoi(match[1]);
GLOBALS::get().decoder_.rtty_bits(rtty_bits);
GLOBALS::get().par_.rtty_ascii_bits_ = rtty_bits;
EchoParameter("rtty_bits", ws);
result = EchoParameter("rtty_bits");
}
else if( regex_match(i_command, match, regex(R"_(set\:rtty_stops=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 1 )
{
int rtty_stops = stoi(match[1]);
GLOBALS::get().decoder_.rtty_stops(rtty_stops);
GLOBALS::get().par_.rtty_ascii_stops_ = rtty_stops;
EchoParameter("rtty_stops", ws);
result = EchoParameter("rtty_stops");
}
else if( regex_match(i_command, match, regex(R"_(set\:datasize=(\d+))_")) && match.size() > 1 )
{
@ -244,27 +305,27 @@ bool HandleCommand(const std::string i_command, websocket::stream<tcp::socket>&
if(datasize == 1) GLOBALS::get().par_.transport_data_type_ = GLOBALS::TransportDataType::kChar; // 8bit
if(datasize == 2) GLOBALS::get().par_.transport_data_type_ = GLOBALS::TransportDataType::kShort;// 16 bit
if(datasize == 4) GLOBALS::get().par_.transport_data_type_ = GLOBALS::TransportDataType::kFloat;// 32bit
EchoParameter("datasize", ws);
result = EchoParameter("datasize");
}
else if( regex_match(i_command, match, regex(R"_(set\:biastee=([0-9])+)_")) && match.size() > 1 )
{
double value = stod(match[1]);
GLOBALS::get().p_iq_source_->setOption("biastee_double", &value);
GLOBALS::get().par_.biast_ = value;
EchoParameter("biastee", ws);
result = EchoParameter("biastee");
}
else if( regex_match(i_command, match, regex(R"_(set\:afc=([0-9])+)_")) && match.size() > 1 )
{
int value = stoi(match[1]);
GLOBALS::get().par_.afc_ = value;
EchoParameter("afc", ws);
result = EchoParameter("afc");
}
else if( regex_match(i_command, match, regex(R"_(set\:dc_remove=([0-9])+)_")) && match.size() > 1 )
{
int value = stoi(match[1]);
GLOBALS::get().decoder_.dc_remove(value);
GLOBALS::get().par_.dc_remove_ = value;
EchoParameter("dc_remove", ws);
result = EchoParameter("dc_remove");
}
else if( regex_match(i_command, match, regex(R"_(set\:payload=(\w+))_")) && match.size() > 1 )
{
@ -277,7 +338,7 @@ bool HandleCommand(const std::string i_command, websocket::stream<tcp::socket>&
}
catch(const exception& e) {
cout<<"Failed loading flights list: "<<e.what()<<endl;
return false;
return result;
}
auto& DEC = GLOBALS::get().decoder_;
@ -309,17 +370,21 @@ bool HandleCommand(const std::string i_command, websocket::stream<tcp::socket>&
}
}
EchoParameter("frequency", ws);
EchoParameter("baud", ws);
EchoParameter("rtty_bits", ws);
EchoParameter("rtty_stops", ws);
auto res_fr = EchoParameter("frequency");
auto res_bd = EchoParameter("baud");
auto res_rb = EchoParameter("rtty_bits");
auto res_rs = EchoParameter("rtty_stops");
result.insert( result.end(), res_fr.begin(), res_fr.end() );
result.insert( result.end(), res_bd.begin(), res_bd.end() );
result.insert( result.end(), res_rb.begin(), res_rb.end() );
result.insert( result.end(), res_rs.begin(), res_rs.end() );
}
else
{
cout<<C_RED<<"Unknown command: "<<i_command<<C_OFF<<endl;
}
return true;
return result;
}
@ -394,7 +459,7 @@ size_t SpectrumToStream(std::stringstream& res_stream, float zoom, int resolutio
}
// serialize demod
// serialize demodulation
size_t DemodToStream(std::stringstream& res_stream, int resolution)
{
std::vector<TDecoder::TValue> demod_acc;
@ -417,164 +482,106 @@ size_t DemodToStream(std::stringstream& res_stream, int resolution)
return demod_acc.size();
}
void WS_CLIENT_SESSION_THREAD(tcp::socket& i_socket)
std::vector< std::shared_ptr<HabdecMessage> > HandleRequest(std::string command)
{
using namespace std;
try {
websocket::stream<tcp::socket> ws{std::move(i_socket)};
ws.accept();
// this line does not work
// ws.accept_ex([](websocket::response_type &m) { m.insert(beast::http::field::server, "habdec_server"); });
// cout<<"HandleRequest: "<<command<<endl;
ws.auto_fragment(false);
std::vector< std::shared_ptr<HabdecMessage> > result; // response messages
// result.emplace_back( std::make_shared<HabdecMessage>() );
// auto& msg = *result[0];
while(!G_DO_EXIT)
{
beast::multi_buffer buffer;
ws.read(buffer);
smatch match;
string command = beast::buffers_to_string(buffer.data());
buffer.consume(buffer.size());
smatch match;
// power,res=resolution_value,zoom=zoom_value
if( regex_match(command, match, regex(R"_(cmd\:\:power\:res=(\d+),zoom=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 2 )
{
stringstream res_stream;
res_stream<<"PWR_";
if(SpectrumToStream( res_stream, stof(match[2]), stoi(match[1]) ))
{
ws.binary(true);
ws.write( boost::asio::buffer(res_stream.str()) );
}
}
// demod=resolution
else if( regex_match(command, match, regex(R"_(cmd\:\:demod\:res=(\d+))_")) && match.size() > 1 )
{
stringstream res_stream;
res_stream<<"DEM_";
if( DemodToStream( res_stream, stoi(match[1]) ) )
{
ws.binary(true);
ws.write( boost::asio::buffer(res_stream.str()) );
}
}
// cmd::sentence
else if( regex_match(command, match, regex(R"_(cmd\:\:sentence)_")) && match.size() > 0 )
{
auto& sen_map = GLOBALS::get().sentences_map_;
string sentence("");
if( sen_map.crbegin() != sen_map.crend() )
sentence = sen_map.crbegin()->second;
string o_command = "cmd::info:sentence=" + sentence;
ws.text(true);
ws.write( boost::asio::buffer( o_command.c_str(), o_command.size()) );
}
// cmd::liveprint
else if( regex_match(command, match, regex(R"_(cmd\:\:liveprint)_")) && match.size() > 0 )
{
string o_command = "cmd::info:liveprint=" + GLOBALS::get().decoder_.getRTTY();
ws.text(true);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
}
// statistics
else if( regex_match(command, match, regex(R"_(cmd\:\:stats)_")) && match.size() > 0 )
{
auto& stats = GLOBALS::get().stats_;
string o_command = "cmd::info:stats=";
o_command += "ok:" + to_string(stats.num_ok_);
o_command += ",dist_line:" + to_string(stats.D_.dist_line_);
o_command += ",dist_circ:" + to_string(stats.D_.dist_circle_);
o_command += ",max_dist:" + to_string(stats.dist_max_);
o_command += ",min_elev:" + to_string(stats.elev_min_);
o_command += ",lat:" + to_string(GLOBALS::get().par_.station_lat_);
o_command += ",lon:" + to_string(GLOBALS::get().par_.station_lon_);
o_command += ",alt:" + to_string(GLOBALS::get().par_.station_alt_);
ws.text(true);
ws.write( boost::asio::buffer(o_command.c_str(), o_command.size()) );
}
// cmd::****
else if(command.size()>5 && command.substr(0,5) == "cmd::")
{
cout<<C_MAGENTA<<"Command "<<command<<C_OFF<<endl;
ws.text(true);
HandleCommand(command.substr(5), ws);
}
// give last sentence
static thread_local int last_sentence_id = -1;
auto& sen_map = GLOBALS::get().sentences_map_;
if( sen_map.crbegin() != sen_map.crend() && sen_map.crbegin()->first != last_sentence_id )
{
string o_command = "cmd::info:sentence=" + sen_map.crbegin()->second;
ws.text(true);
ws.write( boost::asio::buffer( o_command.c_str(), o_command.size()) );
last_sentence_id = sen_map.crbegin()->first;
}
// send parameters if they changed
static thread_local typename GLOBALS::PARAMS params;
if(params != GLOBALS::get().par_)
{
// cout<<"need to update parametes "<<std::this_thread::get_id()<<endl;
EchoParameter("", ws);
params = GLOBALS::get().par_;
}
}
// power:res=resolution_value,zoom=zoom_value
if( regex_match(command, match, regex(R"_(cmd\:\:power\:res=(\d+),zoom=([+-]?([0-9]*[.])?[0-9]+))_")) && match.size() > 2 )
{
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->is_binary_ = true;
p_msg->data_stream_<<"PWR_";
if(SpectrumToStream( p_msg->data_stream_, stof(match[2]), stoi(match[1]) ))
result.push_back(p_msg);
}
catch(const boost::system::system_error& se) {
if(se.code() != websocket::error::closed)
cout << "Error: boost::system::system_error: " << se.code().message() << endl;
else
cout << "Session Closed. " << se.code().message() << endl;
// demod:res=resolution_value
else if( regex_match(command, match, regex(R"_(cmd\:\:demod\:res=(\d+))_")) && match.size() > 1 )
{
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->is_binary_ = true;
p_msg->data_stream_<<"DEM_";
if( DemodToStream( p_msg->data_stream_, stoi(match[1]) ) )
result.push_back(p_msg);
}
catch(const exception& e) {
cout << "Session Error: " << e.what() << endl;
// cmd::sentence
else if( regex_match(command, match, regex(R"_(cmd\:\:sentence)_")) && match.size() > 0 )
{
auto& sen_map = GLOBALS::get().sentences_map_;
string sentence("");
if( sen_map.crbegin() != sen_map.crend() )
sentence = sen_map.crbegin()->second;
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->data_stream_<<"cmd::info:sentence="<<sentence;
result.push_back(p_msg);
}
// cmd::liveprint
else if( regex_match(command, match, regex(R"_(cmd\:\:liveprint)_")) && match.size() > 0 )
{
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->data_stream_<<"cmd::info:liveprint="<<GLOBALS::get().decoder_.getRTTY();
result.push_back(p_msg);
}
// cmd::stats
else if( regex_match(command, match, regex(R"_(cmd\:\:stats)_")) && match.size() > 0 )
{
auto& stats = GLOBALS::get().stats_;
auto p_msg = std::make_shared<HabdecMessage>();
p_msg->data_stream_<<"cmd::info:stats=";
p_msg->data_stream_<<"ok:"<<stats.num_ok_;
p_msg->data_stream_<<",dist_line:"<<stats.D_.dist_line_;
p_msg->data_stream_<<",dist_circ:"<<stats.D_.dist_circle_;
p_msg->data_stream_<<",max_dist:"<<stats.dist_max_;
p_msg->data_stream_<<",min_elev:"<<stats.elev_min_;
p_msg->data_stream_<<",lat:"<<GLOBALS::get().par_.station_lat_;
p_msg->data_stream_<<",lon:"<<GLOBALS::get().par_.station_lon_;
p_msg->data_stream_<<",alt:"<<GLOBALS::get().par_.station_alt_;
result.push_back(p_msg);
cout<<"STATS: "<<p_msg->data_stream_.str()<<endl;
}
// cmd::****
else if(command.size()>5 && command.substr(0,5) == "cmd::")
{
cout<<C_MAGENTA<<"Command "<<command<<C_OFF<<endl;
auto responses = HandleCommand(command.substr(5));
result.insert( result.end(), responses.begin(), responses.end() );
}
cout << "Session END."<<endl;
/*
// give last sentence
static thread_local int last_sentence_id = -1;
auto& sen_map = GLOBALS::get().sentences_map_;
if( sen_map.crbegin() != sen_map.crend() && sen_map.crbegin()->first != last_sentence_id )
{
// msg.data_stream_<<"cmd::info:sentence="<<sen_map.crbegin()->second;
last_sentence_id = sen_map.crbegin()->first;
}
*/
// send parameters if they changed
static thread_local typename GLOBALS::PARAMS params;
if(params != GLOBALS::get().par_)
{
auto responses = EchoParameter( "" );
result.insert( result.end(), responses.begin(), responses.end() );
params = GLOBALS::get().par_;
}
return result;
}
void RunCommandServer(const std::string command_host, const int command_port)
{
using namespace std;
using tcp = boost::asio::ip::tcp;
if(!command_port || command_host == "")
{
cout<<C_RED<<"No Command host or port."<<C_OFF<<endl;
return;
}
while(!G_DO_EXIT)
{
if (!GLOBALS::get().p_iq_source_)
continue;
try {
auto const address = boost::asio::ip::make_address( command_host );
auto const port = static_cast<unsigned short>( command_port );
boost::asio::io_context ioc{1};
tcp::acceptor acceptor{ioc, {address, port}};
while(!G_DO_EXIT)
{
tcp::socket socket{ioc};
acceptor.accept(socket); // Block until we get a connection
cout<<C_MAGENTA<<"\nNew Client"<<C_OFF<<endl;
std::thread{std::bind(&WS_CLIENT_SESSION_THREAD, std::move(socket))}.detach();
}
}
catch(const exception& e) {
cout<<C_RED<<"Failed starting Command Server\n"<<e.what()<<C_OFF<<endl;
std::this_thread::sleep_for( ( std::chrono::duration<double, std::milli>(1000) ));
}
}
}

Wyświetl plik

@ -20,7 +20,19 @@
#pragma once
#include <memory>
#include <string>
#include <thread>
#include <sstream>
#include <vector>
void RunCommandServer(const std::string command_host, const int command_port);
// internal message passing structure
// this is not part of protocol
struct HabdecMessage
{
bool is_binary_ = false;
bool to_all_clients_ = false;
std::stringstream data_stream_;
};
// handles request and returns list of reponses
std::vector< std::shared_ptr<HabdecMessage> > HandleRequest(std::string request);

Wyświetl plik

@ -0,0 +1,113 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include "http_session.h"
#include "websocket_session.h"
#include <iostream>
http_session::http_session(
tcp::socket socket,
std::shared_ptr<WebsocketServer> p_ws_server)
: socket_(std::move(socket)),
p_ws_server_(p_ws_server)
{
}
void http_session::run()
{
boost::beast::http::async_read(socket_, buffer_, req_,
[self = shared_from_this()]
(error_code ec, std::size_t bytes)
{
self->on_read(ec, bytes);
});
}
void http_session::fail(error_code ec, char const* what)
{
if(ec == net::error::operation_aborted)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
void http_session::on_read(error_code ec, std::size_t)
{
if(ec == boost::beast::http::error::end_of_stream)
{
socket_.shutdown(tcp::socket::shutdown_send, ec);
return;
}
if(ec)
return fail(ec, "read");
if(boost::beast::websocket::is_upgrade(req_))
{
std::make_shared<websocket_session>(
std::move(socket_), p_ws_server_
)->run(std::move(req_));
return;
}
// Send the response
/*
handle_request(state_->doc_root(), std::move(req_),
[this](auto&& response)
{
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
using response_type = typename std::decay<decltype(response)>::type;
auto sp = std::make_shared<response_type>(std::forward<decltype(response)>(response));
// Write the response
auto self = shared_from_this();
http::async_write(this->socket_, *sp,
[self, sp](
error_code ec, std::size_t bytes)
{
self->on_write(ec, bytes, sp->need_eof());
});
});
*/
}
void http_session::on_write(error_code ec, std::size_t, bool close)
{
if(ec)
return fail(ec, "write");
if(close)
{
socket_.shutdown(tcp::socket::shutdown_send, ec);
return;
}
// Clear contents of the request message,
// otherwise the read behavior is undefined.
req_ = {};
boost::beast::http::async_read(socket_, buffer_, req_,
[self = shared_from_this()]
(error_code ec, std::size_t bytes)
{
self->on_read(ec, bytes);
});
}

Wyświetl plik

@ -0,0 +1,51 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstdlib>
#include <memory>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include "ws_server.h"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using error_code = boost::system::error_code; // from <boost/system/error_code.hpp>
class http_session : public std::enable_shared_from_this<http_session>
{
tcp::socket socket_;
boost::beast::flat_buffer buffer_;
boost::beast::http::request<boost::beast::http::string_body> req_;
std::shared_ptr<WebsocketServer> p_ws_server_;
void fail(error_code ec, char const* what);
void on_read(error_code ec, std::size_t);
void on_write(error_code ec, std::size_t, bool close);
public:
http_session(tcp::socket socket, std::shared_ptr<WebsocketServer> p_ws_server);
void run();
};

Wyświetl plik

@ -0,0 +1,96 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include "listener.h"
#include "http_session.h"
#include <iostream>
listener::listener(net::io_context& ioc, tcp::endpoint endpoint, WebsocketServer& ws_server)
: acceptor_(ioc) , socket_(ioc), p_ws_server_(&ws_server)
{
error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if(ec)
{
fail(ec, "open");
return;
}
acceptor_.set_option(net::socket_base::reuse_address(true));
if(ec)
{
fail(ec, "set_option");
return;
}
acceptor_.bind(endpoint, ec);
if(ec)
{
fail(ec, "bind");
return;
}
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if(ec)
{
fail(ec, "listen");
return;
}
}
void listener::run()
{
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec)
{
self->on_accept(ec);
});
}
// Report a failure
void listener::fail(error_code ec, char const* what)
{
if(ec == net::error::operation_aborted)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
void listener::on_accept(error_code ec)
{
if(ec)
return fail(ec, "accept");
else
std::make_shared<http_session>(
std::move(socket_), p_ws_server_
)->run(); // moved-from socket is still valid
// Accept another connection
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec)
{
self->on_accept(ec);
});
}

Wyświetl plik

@ -0,0 +1,47 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <string>
#include <boost/asio.hpp>
#include "ws_server.h"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using error_code = boost::system::error_code; // from <boost/system/error_code.hpp>
class listener : public std::enable_shared_from_this<listener>
{
tcp::acceptor acceptor_;
tcp::socket socket_;
std::shared_ptr<WebsocketServer> p_ws_server_;
void fail(error_code ec, char const* what);
void on_accept(error_code ec);
public:
listener(net::io_context& ioc, tcp::endpoint endpoint, WebsocketServer& ws_server);
void run();
};

Wyświetl plik

@ -26,6 +26,7 @@
#include <iostream>
#include <future>
#include <cstdlib>
#include <unordered_set>
#include <SoapySDR/Device.hpp>
#include <SoapySDR/Formats.hpp>
@ -37,12 +38,10 @@
#include "common/console_colors.h"
#include "habitat/habitat_interface.h"
#include "GLOBALS.h"
#include "server.h"
#include "ws_server.h"
#include "common/git_repo_sha1.h"
bool G_DO_EXIT = false;
using namespace std;
@ -223,7 +222,7 @@ void DECODER_THREAD()
samples.resize(256*256);
samples.samplingRate( p_iq_src->samplingRate() );
while(!G_DO_EXIT)
while(1)
{
auto _start = std::chrono::high_resolution_clock::now();
@ -274,13 +273,24 @@ void DECODER_THREAD()
}
void SentenceCallback(std::string callsign, std::string data, std::string crc)
void SentenceCallback(std::string callsign, std::string data, std::string crc, std::shared_ptr<WebsocketServer> p_ws)
{
using namespace std;
using Ms = std::chrono::milliseconds;
string sentence = callsign + "," + data + "*" + crc;
int sentence_number = stoi( data.substr(0, data.find(',')) );
const string sentence = callsign + "," + data + "*" + crc;
const int sentence_number = stoi( data.substr(0, data.find(',')) );
// notify all websocket clients
if(p_ws)
{
auto p_sentence_msg = std::make_shared<HabdecMessage>();
p_sentence_msg->to_all_clients_ = true;
p_sentence_msg->data_stream_<<"cmd::info:sentence="<<sentence;
p_ws->sessions_send(p_sentence_msg);
}
// register in globals
if( GLOBALS::get().sentences_map_mtx_.try_lock_for(Ms(1000)) )
@ -349,6 +359,8 @@ void SentenceCallback(std::string callsign, std::string data, std::string crc)
int main(int argc, char** argv)
{
using namespace std;
signal( SIGINT, [](int){exit(1);} );
signal( SIGILL, [](int){exit(1);} );
signal( SIGFPE, [](int){exit(1);} );
@ -357,13 +369,13 @@ int main(int argc, char** argv)
signal( SIGABRT, [](int){exit(1);} );
// thousands separator
struct thousand_separators : std::numpunct<char>
struct thousand_separators : numpunct<char>
{
char do_thousands_sep() const { return ','; }
string do_grouping() const { return "\3"; }
};
try{
std::cout.imbue( std::locale(locale(""), new thousand_separators) );
cout.imbue( locale(locale(""), new thousand_separators) );
}
catch(exception& e) {
@ -421,20 +433,53 @@ int main(int argc, char** argv)
cout<<"Current Options: "<<endl;
GLOBALS::Print();
// websocket server
/*
WebsocketServer ws_server(GLOBALS::get().par_.command_host_ , GLOBALS::get().par_.command_port_);
threads.emplace( new std::thread(
[&ws_server]() { ws_server(); }
) );
*/
shared_ptr<WebsocketServer> p_ws_server = make_shared<WebsocketServer>(
GLOBALS::get().par_.command_host_ , GLOBALS::get().par_.command_port_);
DECODER.success_callback_ =
[](std::string callsign, std::string data, std::string crc)
[p_ws_server](string callsign, string data, string crc)
{
std::async(std::launch::async, SentenceCallback, callsign, data, crc);
async(launch::async, SentenceCallback, callsign, data, crc, p_ws_server);
// SentenceCallback(callsign, data, crc, ws_server);
};
// feed decoder with IQ samples and decode
std::thread* decoder_thread = new std::thread(DECODER_THREAD);
// START THREADS
//
unordered_set<thread*> threads;
// websocket server thread. this call is blocking
RunCommandServer( GLOBALS::get().par_.command_host_ , GLOBALS::get().par_.command_port_ );
threads.emplace( new thread(
[p_ws_server]() { (*p_ws_server)(); }
) );
G_DO_EXIT = true;
decoder_thread->join();
// Decoder
threads.emplace( new thread(
DECODER_THREAD
) );
/*
// message broker
MessageBroker msg_broker(ws_server);
threads.emplace( new std::thread(
[&msg_broker]() { msg_broker(); }
) );
*/
// while(1);
// G_DO_EXIT = true;
for(auto t : threads)
t->join();
return 0;
}

Wyświetl plik

@ -0,0 +1,126 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include "websocket_session.h"
#include "habdec_ws_protocol.h"
websocket_session::websocket_session(
tcp::socket socket,
std::shared_ptr<WebsocketServer> p_ws_server )
: ws_(std::move(socket))
{
p_ws_server_ = p_ws_server;
}
websocket_session::~websocket_session()
{
// Remove this session from the list of active sessions
// state_->leave(*this);
p_ws_server_->session_delete(*this);
}
void websocket_session::fail(error_code ec, char const* what)
{
if( ec == net::error::operation_aborted ||
ec == boost::beast::websocket::error::closed)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
void websocket_session::on_accept(error_code ec)
{
if(ec)
return fail(ec, "accept");
p_ws_server_->session_add(*this);
// Read a message
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes)
{
sp->on_read(ec, bytes);
});
}
void websocket_session::on_read(error_code ec, std::size_t)
{
if(ec)
return fail(ec, "read");
std::string msg_str = boost::beast::buffers_to_string( buffer_.data() );
buffer_.consume(buffer_.size());
auto response_messages = HandleRequest(msg_str);
for(const auto& p_msg : response_messages)
{
if(p_msg->to_all_clients_)
p_ws_server_->sessions_send(p_msg);
else
this->send( p_msg );
}
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes)
{
sp->on_read(ec, bytes);
});
}
void websocket_session::send(std::shared_ptr<HabdecMessage const> const& i_msg)
{
queue_.push_back(i_msg);
if(queue_.size() > 1)
return;
ws_.binary(i_msg->is_binary_);
ws_.async_write(
net::buffer(queue_.front()->data_stream_.str()),
[sp = shared_from_this()](error_code ec, std::size_t bytes)
{
sp->on_write(ec, bytes);
});
}
void websocket_session::on_write(error_code ec, std::size_t)
{
if(ec)
return fail(ec, "write");
queue_.erase(queue_.begin());
ws_.binary(queue_.front()->is_binary_);
if(!queue_.empty())
ws_.async_write(
net::buffer(queue_.front()->data_stream_.str()),
[sp = shared_from_this()](error_code ec, std::size_t bytes)
{
sp->on_write(ec, bytes);
});
}

Wyświetl plik

@ -0,0 +1,75 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstdlib>
#include <memory>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include "ws_server.h"
#include "habdec_ws_protocol.h"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using error_code = boost::system::error_code; // from <boost/system/error_code.hpp>
class websocket_session : public std::enable_shared_from_this<websocket_session>
{
boost::beast::flat_buffer buffer_;
boost::beast::websocket::stream<tcp::socket> ws_;
// std::vector<std::shared_ptr<std::string const>> queue_;
std::vector<std::shared_ptr<HabdecMessage const>> queue_;
std::shared_ptr<WebsocketServer> p_ws_server_;
void fail(error_code ec, char const* what);
void on_accept(error_code ec);
void on_read(error_code ec, std::size_t bytes_transferred);
void on_write(error_code ec, std::size_t bytes_transferred);
public:
websocket_session(tcp::socket socket, std::shared_ptr<WebsocketServer> p_ws_server);
~websocket_session();
template<class Body, class Allocator>
void run(boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>> req);
// void send(std::shared_ptr<std::string const> const& i_msg);
void send(std::shared_ptr<HabdecMessage const> const& i_msg);
};
template<class Body, class Allocator>
void websocket_session::run(boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>> req)
{
ws_.async_accept(
req,
std::bind(
&websocket_session::on_accept,
shared_from_this(),
std::placeholders::_1));
}

Wyświetl plik

@ -0,0 +1,52 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#include "ws_server.h"
#include "listener.h"
#include "websocket_session.h"
void WebsocketServer::operator()()
{
using namespace std;
using tcp = boost::asio::ip::tcp;
auto const address = boost::asio::ip::make_address( host_ );
auto const port = static_cast<unsigned short>( port_ );
make_shared<listener>(
ioc_,
tcp::endpoint{address, port},
*this
)->run();
ioc_.run();
}
void WebsocketServer::sessions_send(std::shared_ptr<HabdecMessage const> p_msg)
{
{
std::lock_guard<std::mutex> lock(sessions_mtx_);
for(auto session : ws_sessions_)
session->send(p_msg);
}
}

Wyświetl plik

@ -0,0 +1,64 @@
/*
Copyright 2018 Michal Fratczak
This file is part of habdec.
habdec is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
habdec is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with habdec. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
#include <thread>
#include <mutex>
#include <unordered_set>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/signal_set.hpp>
#include "habdec_ws_protocol.h"
class websocket_session;
class WebsocketServer : public std::enable_shared_from_this<WebsocketServer>
{
public:
WebsocketServer() = delete;
WebsocketServer(const WebsocketServer&) = delete;
WebsocketServer& operator=(const WebsocketServer&) = delete;
WebsocketServer(std::string host, unsigned int port) : host_(host), port_(port) {}
void operator()();
// sessions
void session_add (websocket_session& session) { ws_sessions_.insert(&session); }
void session_delete (websocket_session& session) { ws_sessions_.erase(&session); }
void sessions_send (std::shared_ptr<HabdecMessage const> p_msg);
private:
std::string host_{"0.0.0.0"};
unsigned int port_{5555};
boost::asio::io_context ioc_{1};
// sessions
std::unordered_set<websocket_session*> ws_sessions_;
std::mutex sessions_mtx_;
};