changed the tsqueue to a circular buffer, update sample queue

bug_fixes_integration_tx
David Michaeli 2021-12-28 22:26:59 +02:00
rodzic 880c561a10
commit f32c2ee63b
8 zmienionych plików z 263 dodań i 113 usunięć

Wyświetl plik

@ -11,7 +11,7 @@ include_directories(${SUPER_DIR})
set(SOURCES_LIB caribou_smi.c)
set(SOURCES ${SOURCES_LIB} test_caribou_smi.c)
set(EXTERN_LIBS ${SUPER_DIR}/io_utils/build/libio_utils.a ${SUPER_DIR}/zf_log/build/libzf_log.a -lpthread)
add_compile_options(-Wall -Wextra -Wno-unused-parameter -Wno-missing-braces -O3)
add_compile_options(-Wall -Wextra -Wno-unused-parameter -Wno-missing-braces -Wno-unused-function -O3)
#Generate the static library from the sources
add_library(caribou_smi STATIC ${SOURCES_LIB})

Wyświetl plik

@ -365,20 +365,25 @@ int caribou_smi_search_offset(uint8_t *buff, int len)
}
//=========================================================================
#define TIMING_PERF_SYNC (0)
void* caribou_smi_analyze_thread(void* arg)
{
//static int a = 0;
int current_data_size = 0;
pthread_t tid = pthread_self();
struct timeval tv_pre = {0};
struct timeval tv_post = {0};
long long total_samples = 0;
double time_pre = 0, batch_time = 0, sample_rate = 0;
double time_post = 0, process_time = 0;
double temp_pre;
double num_samples = 0, num_samples_avg = 0;
// --------------------------------------------
// TIMING PERF VARIABLES
#if (TIMING_PERF_SYNC)
struct timeval tv_pre = {0};
struct timeval tv_post = {0};
long long total_samples = 0;
double time_pre = 0, batch_time = 0, sample_rate = 0;
double time_post = 0, process_time = 0;
double temp_pre;
double num_samples = 0, num_samples_avg = 0;
#endif // TIMING_PERF_SYNC
caribou_smi_stream_st* st = (caribou_smi_stream_st*)arg;
caribou_smi_st* dev = (caribou_smi_st*)st->parent_dev;
caribou_smi_stream_type_en type = (caribou_smi_stream_type_en)(st->stream_id>>1 & 0x1);
@ -391,7 +396,10 @@ void* caribou_smi_analyze_thread(void* arg)
while (st->read_analysis_thread_running)
{
pthread_mutex_lock(&st->read_analysis_lock);
gettimeofday(&tv_pre, NULL);
#if (TIMING_PERF_SYNC)
gettimeofday(&tv_pre, NULL);
#endif
if (!st->read_analysis_thread_running) break;
@ -415,31 +423,32 @@ void* caribou_smi_analyze_thread(void* arg)
st->current_app_buffer + offset,
st->batch_length);
gettimeofday(&tv_post, NULL);
#if (TIMING_PERF_SYNC)
gettimeofday(&tv_post, NULL);
// benchmarking
num_samples = (double)(st->read_ret_value) / 4.0;
num_samples_avg = num_samples_avg*0.1 + num_samples*0.9;
temp_pre = tv_pre.tv_sec + ((double)(tv_pre.tv_usec)) / 1e6;
time_post = tv_post.tv_sec + ((double)(tv_post.tv_usec)) / 1e6;
// benchmarking
num_samples = (double)(st->read_ret_value) / 4.0;
num_samples_avg = num_samples_avg*0.1 + num_samples*0.9;
temp_pre = tv_pre.tv_sec + ((double)(tv_pre.tv_usec)) / 1e6;
time_post = tv_post.tv_sec + ((double)(tv_post.tv_usec)) / 1e6;
batch_time = temp_pre - time_pre;
sample_rate = sample_rate*0.1 + (num_samples / batch_time) * 0.9;
process_time = process_time*0.1 + (time_post - temp_pre)*0.9;
batch_time = temp_pre - time_pre;
sample_rate = sample_rate*0.1 + (num_samples / batch_time) * 0.9;
process_time = process_time*0.1 + (time_post - temp_pre)*0.9;
time_pre = temp_pre;
total_samples += st->read_ret_value;
if (total_samples % (4*4000000) == 0)
{
printf("sample_rate = %.2f SPS, process_time = %.2f usec, num_samples_avg = %.1f\n", sample_rate, process_time * 1e6, num_samples_avg);
}
time_pre = temp_pre;
total_samples += st->read_ret_value;
if (total_samples % (4*4000000) == 0)
{
printf("sample_rate = %.2f SPS, process_time = %.2f usec, num_samples_avg = %.1f\n", sample_rate, process_time * 1e6, num_samples_avg);
}
#endif
}
ZF_LOGD("Leaving SMI analysis thread id %lu, running = %d", tid, st->read_analysis_thread_running);
return NULL;
}
#define TIMING_PERF_SYNC (0)
//=========================================================================
void* caribou_smi_thread(void *arg)
{
@ -896,7 +905,6 @@ static void caribou_smi_init_stream(caribou_smi_st* dev, caribou_smi_stream_type
st->parent_dev = dev;
}
//=========================================================================
static void caribou_smi_print_smi_settings(caribou_smi_st* dev, struct smi_settings *settings)
{

Wyświetl plik

@ -8,7 +8,7 @@ include_directories(/.)
include_directories(${SUPER_DIR})
#However, the file(GLOB...) allows for wildcard additions:
set(SOURCES_LIB tsqueue.c tiny_list.c)
set(SOURCES_LIB tsqueue.c tiny_list.c circular_buffer.cpp)
#add_compile_options(-Wall -Wextra -pedantic -Werror)
add_compile_options(-Wall -Wextra -pedantic -Wno-missing-braces)
@ -19,6 +19,9 @@ target_include_directories(datatypes PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
add_executable(test_tsqueue test_tsqueue.c)
target_link_libraries(test_tsqueue datatypes pthread)
add_executable(test_circular_buffer test_circular_buffer.cpp)
target_link_libraries(test_circular_buffer datatypes pthread)
add_executable(test_tiny_list test_tiny_list.c)
target_link_libraries(test_tiny_list datatypes pthread)

Wyświetl plik

@ -0,0 +1 @@
#include "circular_buffer.h"

Wyświetl plik

@ -0,0 +1,157 @@
#ifndef __SAMPLE_CIRC_BUFFER_H__
#define __SAMPLE_CIRC_BUFFER_H__
#include <string.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#define IS_POWER_OF_2(x) (!((x) == 0) && !((x) & ((x) - 1)))
#define MIN(x,y) ((x)>(y)?(y):(x))
uint32_t next_power_of_2 (uint32_t x)
{
uint32_t power = 1;
while(power < x)
{
power <<= 1;
}
return power;
}
template <class T>
class circular_buffer {
public:
circular_buffer(size_t size, bool override_write = true, bool block_read = true)
{
max_size_ = size;
if (!IS_POWER_OF_2(max_size_))
{
max_size_ = next_power_of_2(max_size_);
}
buf_ = new T[max_size_];
override_write_ = override_write;
block_read_ = block_read;
}
~circular_buffer()
{
std::unique_lock<std::mutex> lock(mutex_);
delete []buf_;
}
size_t put(const T *data, size_t length)
{
std::lock_guard<std::mutex> lock(mutex_);
if ((max_size_ - size()) < length && override_write_)
{
// pop the amount of data the is needed
tail_ += length - (max_size_ - size());
}
size_t len = MIN(length, max_size_ - head_ + tail_);
auto l = MIN(len, max_size_ - (head_ & (max_size_ - 1)));
memcpy(buf_ + (head_ & (max_size_ - 1)), data, l * sizeof(T));
memcpy(buf_, data + l, (len - l) * sizeof(T));
head_ += len;
if (block_read_)
{
cond_var_.notify_one();
}
return len;
}
size_t get(T *data, size_t length)
{
std::unique_lock<std::mutex> lock(mutex_);
if (block_read_)
{
cond_var_.wait(lock, [&]()
{
// Acquire the lock only if
// we got enough items
return size() >= length;
});
}
size_t len = MIN(length, head_ - tail_);
auto l = MIN(len, max_size_ - (tail_ & (max_size_ - 1)));
if (data != NULL)
{
memcpy(data, buf_ + (tail_ & (max_size_ - 1)), l * sizeof(T));
memcpy(data + l, buf_, (len - l) * sizeof(T));
}
tail_ += len;
return len;
}
void put(T item)
{
put(&item, 1);
}
T get()
{
T item;
get(&item, 1);
return item;
}
void reset()
{
std::unique_lock<std::mutex> lock(mutex_);
head_ = tail_ = 0;
}
inline bool empty()
{
return head_ == tail_;
}
inline bool full()
{
return size() == capacity();
}
inline size_t capacity() const
{
return max_size_;
}
size_t size()
{
return (head_ - tail_);
}
void print_buffer()
{
std::unique_lock<std::mutex> lock(mutex_);
size_t t = tail_;
int i = 0;
while (t < head_)
{
printf("%d => %d\n", i++, (int)buf_[t++&(max_size_-1)]);
}
}
private:
std::mutex mutex_;
std::condition_variable cond_var_;
T* buf_;
size_t head_ = 0;
size_t tail_ = 0;
size_t max_size_;
bool override_write_;
bool block_read_;
};
#endif //__SAMPLE_CIRC_BUFFER_H__

Wyświetl plik

@ -0,0 +1,55 @@
#include "circular_buffer.h"
#include <unistd.h>
#include <stdio.h>
circular_buffer<uint32_t> *buf = NULL;
uint32_t data1[100] = {0};
uint32_t data2[100] = {0};
void producer(int times)
{
while (true)
{
sleep(0);
printf("THREAD! PUT1 100, ret %lu\n", buf->put(data1, 100));
sleep(0);
printf("PUT1 60, ret %lu\n", buf->put(data1, 60));
}
}
void consumer(int times)
{
printf("Capacity = %lu\n", buf->capacity());
while (true)
{
printf("GET1 100, ret %lu\n", buf->get(data2, 100));
printf("GET1 100, ret %lu\n", buf->get(data2, 100));
printf("GET1 100, ret %lu\n", buf->get(data2, 100));
printf("GET1 100, ret %lu\n", buf->get(data2, 60));
}
printf("finished\n");
}
int main ()
{
for (int i = 0; i < 100; i++)
{
data1[i] = i;
data2[i] = i*i;
}
buf = new circular_buffer<uint32_t>(200, true, true);
int times = 1;
std::thread t1(producer, times);
std::thread t2(consumer, times);
t2.join();
t1.join();
delete buf;
return 0;
}

Wyświetl plik

@ -18,7 +18,7 @@
//#define ZF_LOG_LEVEL ZF_LOG_ERROR
#define ZF_LOG_LEVEL ZF_LOG_VERBOSE
#include "datatypes/tsqueue.h"
#include "datatypes/circular_buffer.h"
#include "cariboulite_setup.h"
#include "cariboulite_radios.h"
@ -26,8 +26,8 @@ enum Cariboulite_Format
{
CARIBOULITE_FORMAT_FLOAT32 = 0,
CARIBOULITE_FORMAT_INT16 = 1,
CARIBOULITE_FORMAT_INT8 = 2,
CARIBOULITE_FORMAT_FLOAT64 = 3,
CARIBOULITE_FORMAT_INT8 = 2,
CARIBOULITE_FORMAT_FLOAT64 = 3,
};
//#define BUFFER_SIZE_MS ( 10 )
@ -114,7 +114,7 @@ public:
Cariboulite_Format chosen_format;
int dig_filt;
private:
tsqueue_st queue;
circular_buffer<sample_complex_int16> *queue;
size_t mtu_size_bytes;
uint8_t *partial_buffer;
int partial_buffer_start;

Wyświetl plik

@ -1,52 +1,8 @@
#include "Cariboulite.hpp"
#include <Iir.h>
#include <byteswap.h>
class CircFifo
{
private:
sample_complex_int16 *buffer;
uint32_t in;
uint32_t out;
uint32_t size;
public:
CircFifo(uint32_t _size)
{
if (!is_power_of_2(_size))
_size = roundup_power_of_2(_size);
buffer = new sample_complex_int16[_size];
in = 0;
out = 0;
size = _size;
}
~CircFifo()
{
delete []buffer;
}
uint32_t put(const uint8_t *data, uint32_t len)
{
len = min(len, size - in + out);
auto l = min(len, size - (in & (size - 1)));
memcpy(buffer + (in & (size - 1)), data, l);
memcpy(buffer, data + l, len - l);
in += len;
return len;
}
uint32_t get(uint8_t *data, uint32_t len)
{
len = min(len, in - out);
auto l = min(len, size - (out & (size - 1)));
memcpy(data, buffer + (out & (size - 1)), l);
memcpy(data + l, buffer, len - l);
out += len;
return len;
}
}
//==============================================
void print_iq(uint32_t* array, int len)
@ -73,8 +29,7 @@ void print_iq(uint32_t* array, int len)
SampleQueue::SampleQueue(int mtu_bytes, int num_buffers)
{
SoapySDR_logf(SOAPY_SDR_INFO, "Creating SampleQueue MTU: %d bytes, NumBuffers: %d", mtu_bytes, num_buffers);
tsqueue_init(&queue, mtu_bytes, num_buffers);
//printf("finished tsqueue\n");
queue = new circular_buffer(mtu_bytes / 4 * num_buffers)
mtu_size_bytes = mtu_bytes;
stream_id = -1;
stream_dir = -1;
@ -114,7 +69,7 @@ SampleQueue::~SampleQueue()
stream_channel = -1;
delete[] partial_buffer;
delete[] interm_native_buffer;
tsqueue_release(&queue);
delete queue;
}
//=================================================================
@ -133,39 +88,9 @@ int SampleQueue::AttachStreamId(int id, int dir, int channel)
}
//=================================================================
int SampleQueue::Write(uint8_t *buffer, size_t length, uint32_t meta, long timeout_us)
int SampleQueue::Write(sample_complex_int16 *buffer, size_t num_samples, uint8_t* meta, long timeout_us)
{
int left_to_write = length;
int offset = 0;
int chunk = 0;
//printf("Write: %dB\n", length);
while (left_to_write)
{
int current_length = ( left_to_write < (int)mtu_size_bytes ) ? left_to_write : mtu_size_bytes;
int res = tsqueue_insert_push_buffer(&queue,
buffer + offset,
current_length,
meta, timeout_us,
true);
switch (res)
{
case TSQUEUE_NOT_INITIALIZED:
case TSQUEUE_SEM_FAILED:
{
SoapySDR_logf(SOAPY_SDR_ERROR, "pushing buffer n %d failed", chunk);
return -1;
} break;
case TSQUEUE_TIMEOUT:
case TSQUEUE_FAILED_FULL: return offset; break;
default: break;
}
offset += current_length;
left_to_write -= current_length;
chunk ++;
}
return offset;
return queue->put(buffer, elements)
}
//=================================================================
@ -274,7 +199,8 @@ int SampleQueue::ReadSamples(sample_complex_int16* buffer, size_t num_elements,
}*/
for (int i = 0; i < tot_read_elements; i++)
{
{
buffer[i].i >>= 1;
buffer[i].q >>= 1;