Use lifo circular buffer for habitat telemetry upload.

pull/53/head
Phil Crump 2018-05-06 15:33:33 +00:00
rodzic 7738328c14
commit e142c9f7e2
4 zmienionych plików z 190 dodań i 96 usunięć

Wyświetl plik

@ -38,6 +38,7 @@
#include "listener.h"
#include "habpack.h"
#include "udpclient.h"
#include "lifo_buffer.h"
#define VERSION "V1.8.19"
bool run = TRUE;
@ -187,15 +188,12 @@ struct TBinaryPacket {
#pragma pack(pop)
lifo_buffer_t Habitat_Upload_Buffer;
// Create pipes for inter proces communication
// GLOBAL AS CALLED FROM INTERRRUPT
int telem_pipe_fd[2];
int ssdv_pipe_fd[2];
// Create a structure to share some variables with the habitat child process
// GLOBAL AS CALLED FROM INTERRRUPT
thread_shared_vars_t htsv;
// Create a structure to share some variables with the ssdv child process
// GLOBAL AS CALLED FROM INTERRRUPT
thread_shared_vars_t stsv;
@ -1019,19 +1017,15 @@ void ProcessTelemetryMessage(int Channel, received_t *Received)
if ( Config.EnableHabitat )
{
// Add the telemetry packet to the pipe
int result = write( telem_pipe_fd[1], Received, sizeof( *Received ) );
if ( result == -1 )
// Add to Habitat upload queue
received_t *queueReceived = malloc(sizeof(received_t));
if(queueReceived != NULL)
{
exit_error("Error writing to the telemetry pipe\n");
}
if ( result == 0 )
{
LogMessage( "Nothing written to telemetry pipe \n" );
}
if ( result > 1 )
{
htsv.packet_count++;
/* WARNING: Doesn't copy linked-list :/ */
memcpy(queueReceived, Received, sizeof(received_t));
/* Push pointer onto upload queue */
lifo_buffer_push(&Habitat_Upload_Buffer, (void *)queueReceived);
}
}
@ -2318,12 +2312,6 @@ int main( int argc, char **argv )
int result;
result = pipe( telem_pipe_fd );
if ( result < 0 )
{
exit_error("Error creating telemetry pipe\n");
}
result = pipe( ssdv_pipe_fd );
if ( result < 0 )
{
@ -2373,15 +2361,11 @@ int main( int argc, char **argv )
return 1;
}
// Initialise the vars
htsv.parent_status = RUNNING;
htsv.packet_count = 0;
if (Config.EnableHabitat)
{
if ( pthread_create (&HabitatThread, NULL, HabitatLoop, ( void * ) &htsv))
lifo_buffer_init(&Habitat_Upload_Buffer, 1024);
if ( pthread_create (&HabitatThread, NULL, HabitatLoop, NULL))
{
fprintf( stderr, "Error creating Habitat thread\n" );
return 1;
@ -2565,14 +2549,11 @@ int main( int argc, char **argv )
LogMessage( "Closing SSDV pipe\n" );
close( ssdv_pipe_fd[1] );
LogMessage( "Closing Habitat pipe\n" );
close( telem_pipe_fd[1] );
LogMessage( "Stopping SSDV thread\n" );
stsv.parent_status = STOPPED;
LogMessage( "Stopping Habitat thread\n" );
htsv.parent_status = STOPPED;
lifo_buffer_quitwait(&Habitat_Upload_Buffer);
if (Config.EnableSSDV)
{

Wyświetl plik

@ -23,6 +23,9 @@
#include "sha256.h"
#include "wiringPi.h"
#include "gateway.h"
#include "lifo_buffer.h"
extern lifo_buffer_t Habitat_Upload_Buffer;
extern int telem_pipe_fd[2];
extern pthread_mutex_t var;
@ -79,19 +82,6 @@ void UploadTelemetryPacket( received_t * t )
doc_tm = gmtime( &t->Metadata.Timestamp );
strftime( doc_time, sizeof( doc_time ), "%Y-%0m-%0dT%H:%M:%SZ", doc_tm );
// So that the response to the curl PUT doesn't mess up my finely crafted display!
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data );
// Set the timeout
curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 );
// RJH capture http errors and report
// curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 );
curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error );
// Avoid curl library bug that happens if above timeout occurs (sigh)
curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 );
// Grab current telemetry string and append a linefeed
sprintf( Sentence, "%s\n", t->UKHASstring );
@ -111,12 +101,22 @@ void UploadTelemetryPacket( received_t * t )
"{\"data\": {\"_raw\": \"%s\"},\"receivers\": {\"%s\": {\"time_created\": \"%s\",\"time_uploaded\": \"%s\",\"rig_info\": {\"frequency\":%.0f}}}}",
base64_data, Config.Tracker, doc_time, now, (t->Metadata.Frequency + t->Metadata.FrequencyError) * 1000000 );
// LogTelemetryPacket(json);
// Set the URL that is about to receive our PUT
sprintf( url, "http://habitat.habhub.org/habitat/_design/payload_telemetry/_update/add_listener/%s", doc_id);
// So that the response to the curl PUT doesn't mess up my finely crafted display!
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, habitat_write_data );
// Set the timeout
curl_easy_setopt( curl, CURLOPT_TIMEOUT, 15 );
// RJH capture http errors and report
// curl_easy_setopt( curl, CURLOPT_FAILONERROR, 1 );
curl_easy_setopt( curl, CURLOPT_ERRORBUFFER, curl_error );
// Avoid curl library bug that happens if above timeout occurs (sigh)
curl_easy_setopt( curl, CURLOPT_NOSIGNAL, 1 );
// Set the headers
headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
@ -169,65 +169,33 @@ void UploadTelemetryPacket( received_t * t )
void *HabitatLoop( void *vars )
{
if ( Config.EnableHabitat )
{
thread_shared_vars_t *htsv;
htsv = vars;
received_t t;
int packets = 0;
unsigned long total_packets = 0;
received_t *dequeued_telemetry_ptr;
int i = 1;
// Keep looping until the parent quits and there are no more packets to
// send to habitat.
while ( ( htsv->parent_status == RUNNING ) || ( packets > 0 ) )
// Keep looping until the parent quits
while ( true )
{
dequeued_telemetry_ptr = lifo_buffer_waitpop(&Habitat_Upload_Buffer);
//THis is neded for some reason habitat thread has a pthread_mutex_lock set
// and this removes it
if ( i )
if(dequeued_telemetry_ptr != NULL)
{
// pthread_mutex_lock(&var);
pthread_mutex_unlock( &var );
i = 0;
}
if ( htsv->packet_count > total_packets )
{
packets = read( telem_pipe_fd[0], &t, sizeof( t ) );
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, "Habitat" );
UploadTelemetryPacket( dequeued_telemetry_ptr );
ChannelPrintf( dequeued_telemetry_ptr->Metadata.Channel, 6, 1, " " );
free(dequeued_telemetry_ptr);
}
else
{
packets = 0;
// pthread_mutex_unlock(&var);
// If we have have a rollover after processing 4294967295 packets
if ( htsv->packet_count < total_packets )
total_packets = 0;
/* We've been asked to quit */
break;
}
if ( packets )
{
// LogMessage ("%s\n", t.Telemetry);
ChannelPrintf( t.Metadata.Channel, 6, 1, "Habitat" );
UploadTelemetryPacket( &t );
ChannelPrintf( t.Metadata.Channel, 6, 1, " " );
total_packets++;
}
delay(100); // Don't eat too much CPU
}
}
close( telem_pipe_fd[0] );
close( telem_pipe_fd[1] );
LogMessage( "Habitat thread closing\n" );
return NULL;

115
lifo_buffer.c 100644
Wyświetl plik

@ -0,0 +1,115 @@
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "lifo_buffer.h"
void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length)
{
pthread_mutex_init(&buf->Mutex, NULL);
pthread_cond_init(&buf->Signal, NULL);
pthread_mutex_lock(&buf->Mutex);
buf->Head = 0;
buf->Tail = 0;
buf->Length = length;
buf->Data = malloc(length * sizeof(void *));
buf->Quit = false;
pthread_mutex_unlock(&buf->Mutex);
}
/* Lossy when buffer is full */
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr)
{
pthread_mutex_lock(&buf->Mutex);
/* If no space, remove oldest from bottom of the queue by advancing Tail */
if(buf->Head==(buf->Tail-1) || (buf->Head==(buf->Length-1) && buf->Tail==0))
{
if(buf->Tail==(buf->Length-1))
buf->Tail=0;
else
buf->Tail++;
}
if(buf->Head==(buf->Length-1))
buf->Head=0;
else
buf->Head++;
buf->Data[buf->Head] = data_ptr;
pthread_cond_signal(&buf->Signal);
pthread_mutex_unlock(&buf->Mutex);
}
/* Returns NULL when unsuccessful */
void *lifo_buffer_pop(lifo_buffer_t *buf)
{
void *result;
pthread_mutex_lock(&buf->Mutex);
if(buf->Head!=buf->Tail)
{
result = buf->Data[buf->Head];
if(buf->Head==0)
buf->Head=(buf->Length-1);
else
buf->Head--;
pthread_mutex_unlock(&buf->Mutex);
}
else
{
pthread_mutex_unlock(&buf->Mutex);
result = NULL;
}
return result;
}
void *lifo_buffer_waitpop(lifo_buffer_t *buf)
{
void *result;
pthread_mutex_lock(&buf->Mutex);
while(buf->Head==buf->Tail && !buf->Quit) /* If buffer is empty */
{
/* Mutex is atomically unlocked on beginning waiting for signal */
pthread_cond_wait(&buf->Signal, &buf->Mutex);
/* and locked again on resumption */
}
if(buf->Quit)
{
return NULL;
}
result = buf->Data[buf->Head];
if(buf->Head==0)
buf->Head=(buf->Length-1);
else
buf->Head--;
pthread_mutex_unlock(&buf->Mutex);
return result;
}
void lifo_buffer_quitwait(lifo_buffer_t *buf)
{
pthread_mutex_lock(&buf->Mutex);
buf->Quit = true;
pthread_cond_signal(&buf->Signal);
pthread_mutex_unlock(&buf->Mutex);
}

30
lifo_buffer.h 100644
Wyświetl plik

@ -0,0 +1,30 @@
#ifndef __LIFO_BUFFER_H__
#define __LIFO_BUFFER_H__
#include <stdbool.h>
#include <pthread.h>
typedef struct
{
/* Buffer Access Lock */
pthread_mutex_t Mutex;
/* New Data Signal */
pthread_cond_t Signal;
/* Whether the waiting thread should quit */
bool Quit;
/* Head and Tail Indexes */
uint32_t Head, Tail;
/* Data */
void **Data;
/* Data Length */
uint32_t Length;
} lifo_buffer_t;
/** Common functions **/
void lifo_buffer_init(lifo_buffer_t *buf, uint32_t length);
void lifo_buffer_push(lifo_buffer_t *buf, void *data_ptr);
void *lifo_buffer_pop(lifo_buffer_t *buf);
void *lifo_buffer_waitpop(lifo_buffer_t *buf);
void lifo_buffer_quitwait(lifo_buffer_t *buf);
#endif /* __LIFO_BUFFER_H__*/