From a7a2b860eacc5944eabd05606e2bb4c7aeb933ea Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Sat, 28 May 2022 21:01:24 +0930 Subject: [PATCH 1/3] Initial attempt at sondehub-amateur uploader class --- pyproject.toml | 6 +- sondehub/__init__.py | 3 + sondehub/amateur.py | 671 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 678 insertions(+), 2 deletions(-) create mode 100644 sondehub/amateur.py diff --git a/pyproject.toml b/pyproject.toml index 5bed5c4..eb8f44a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "sondehub" -version = "0.2.1" -description = "SDK to access SondeHub open data" +version = "0.3.0" +description = "SDK to access SondeHub open data, and helpers for uploading telemetry." authors = ["Michaela "] readme = "README.md" homepage = "https://github.com/projecthorus/pysondehub" @@ -13,6 +13,8 @@ license = "GPL-3.0-or-later" python = "^3.6" paho-mqtt = "^1.5.1" boto3 = "^1.14.44" +requests = "^2.24.0" +python-dateutil = "^2.8" [tool.poetry.dev-dependencies] diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 36c4e1d..f1aa87a 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -11,6 +11,9 @@ from queue import Queue import queue import gzip + +__version__ = "0.3.0" + S3_BUCKET = "sondehub-history" diff --git a/sondehub/amateur.py b/sondehub/amateur.py new file mode 100644 index 0000000..156d64d --- /dev/null +++ b/sondehub/amateur.py @@ -0,0 +1,671 @@ +#!/usr/bin/env python +# +# SondeHub Amateur Uploader +# +# Uploads telemetry to the SondeHub ElasticSearch cluster, +# in the new 'universal' format descried here: +# https://github.com/projecthorus/sondehub-infra/wiki/%5BDRAFT%5D-Amateur-Balloon-Telemetry-Format +# +# Copyright (C) 2022 Mark Jessop +# Released under GNU GPL v3 or later +# +import sondehub +import datetime +import glob +import gzip +import json +import logging +import os +import requests +import time +from threading import Thread +from email.utils import formatdate +from queue import Queue +from dateutil.parser import parse + + +def fix_datetime(datetime_str, local_dt_str=None): + """ + Given a HH:MM:SS string from a telemetry sentence, produce a complete timestamp, + using the current system time as a guide for the date. + """ + + if local_dt_str is None: + _now = datetime.datetime.utcnow() + else: + _now = parse(local_dt_str) + + # Are we in the rollover window? + if _now.hour == 23 or _now.hour == 0: + _outside_window = False + else: + _outside_window = True + + # Parsing just a HH:MM:SS will return a datetime object with the year, month and day replaced by values in the 'default' + # argument. + # If the year/month/date has also been provided, then we just end up with the supplied time. + _new_dt = parse(datetime_str, default=_now) + + if _outside_window: + # We are outside the day-rollover window, and can safely use the current zulu date. + return _new_dt + else: + # We are within the window, and need to adjust the day backwards or forwards based on the sonde time. + if _new_dt.hour == 23 and _now.hour == 0: + # Assume system clock running slightly fast, and subtract a day from the telemetry date. + _new_dt = _new_dt - datetime.timedelta(days=1) + + elif _new_dt.hour == 00 and _now.hour == 23: + # System clock running slow. Add a day. + _new_dt = _new_dt + datetime.timedelta(days=1) + + return _new_dt + + +class Uploader(object): + """ Sondehub (Amateur) Uploader Class. + + Accepts telemetry data in various forms, buffers them up, and then compresses and uploads + them to the Sondehub Elasticsearch cluster. + + """ + + # SondeHub API endpoint + SONDEHUB_AMATEUR_URL = "https://api.v2.sondehub.org/amateur/telemetry" + SONDEHUB_AMATEUR_STATION_POSITION_URL = "https://api.v2.sondehub.org/amateur/listeners" + + def __init__( + self, + uploader_callsign, + uploader_position = None, + uploader_radio = None, + uploader_antenna = None, + software_name = None, + software_version = None, + upload_rate=2, + upload_timeout=20, + upload_retries=5, + developer_mode=False + ): + """ Initialise and start a Sondehub (Amateur) uploader + + Mandatory Args: + uploader_callsign (str): Callsign/name of the uploader. + + Optional Args: + uploader_position (list): Uploader position as [lat, lon, alt] + uploader_radio (str): Information on the Uploader's radio. + uploader_antenna (str): Information on the Uploader's antenna. + software_name (str): Software name information. If this is provided, software_version must also be provided. + software_version (str): Software version number, e.g. "0.1.2" + upload_rate (int): How often to upload batches of data. + upload_timeout (int): Upload timeout (seconds) + upload_retries (int): Upload retries + developer_mode (bool): If set to true, packets will be discarded when they enter the Sondehub DB. + """ + + self.upload_rate = upload_rate + self.upload_timeout = upload_timeout + self.upload_retries = upload_retries + self.developer_mode = developer_mode + + # User information + self.uploader_callsign = str(uploader_callsign) + # Optional User Information + if uploader_position: + if (type(uploader_position) == list) and (len(uploader_position) ==3): + self.uploader_position = uploader_position + else: + raise ValueError("Invalid uploader position supplied, must be a list with 3 (lat, lon, alt) elements.") + else: + self.uploader_position = None + self.uploader_radio = uploader_radio + self.uploader_antenna = uploader_antenna + + if software_name: + # User has supplied a software name, they must also supply a version string + if not software_version: + raise ValueError("software_name provided, but no software_version provided.") + + self.software_name = str(software_name) + self.software_version = str(software_version) + else: + self.software_name = "pysondehub" + self.software_version = sondehub.__version__ + + self.software_combined_name = f"{self.software_name}-{self.software_version}" + + # Input Queue. + self.input_queue = Queue() + + # Start processing the input queue. + self.input_processing_running = True + self.input_process_thread = Thread(target=self.process_queue) + self.input_process_thread.start() + + + def add_telemetry(self, + # Mandatory fields. + payload_callsign, + timestamp, # Either as a string, or a datetime object. + lat, + lon, + alt, + # Optional 'standard' telemetry fields. + frame=None, + time_received=None, # Will be replaced with the current time if not provided. + sats=None, + batt=None, + temp=None, + humidity=None, + pressure=None, + vel_h=None, + vel_v=None, + heading=None, + tx_frequency=None, + # Optional metadata fields + modulation=None, + snr=None, + frequency=None, + rssi=None, + + # Per-telemetry-packet Uploader information + uploader_callsign=None, + uploader_position=None, + uploader_antenna=None, + uploader_radio=None, + + # Any custom fields can be included fields + extra_fields = {}, + ): + """ + Add a datapoint of telemetry to the upload queue + """ + + # Output dictionary. + output = {} + + if self.developer_mode: + output['dev'] = True + + # Mandatory fields: + # Payload callsign + output['payload_callsign'] = str(payload_callsign) + + # Timestamp + if type(timestamp) == datetime.datetime: + # User has provided a datetime object - we just use this as-is. + _datetime = timestamp + else: + # Try and parse the timestamp as a string. + # Detect if the user has provider a date: + _temp_dt = parse(timestamp, default=datetime.datetime(1900,1,1,1,1,1)) + if _temp_dt.year == 1900: + # User has provided a timestamp with just HH:MM:SS fields. + # Pass it through fix_datetime + _datetime = fix_datetime(timestamp) + else: + # User has provided a full date/timestamp + _datetime = _temp_dt + # Add the datetime to the dictionary. + output['datetime'] = _datetime.strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + + # Lat/Lon/Alt + output['lat'] = float(lat) + output['lon'] = float(lon) + output['alt'] = float(alt) + + # Drop packets with null lat and lon. + if (output['lat'] == 0.0) and (output['lon'] == 0.0): + self.log_warning("Lat/Lon both 0.0 - dropping packet") + return + + # Packet received time + if time_received: + if type(time_received) == datetime.datetime: + # Provided as a datetime object. + output['time_received'] = time_received.strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + else: + # Provided as a string, parse it and add. + output['time_received'] = parse(time_received).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + else: + output['time_received'] = datetime.datetime.utcnow().strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + + # Optional standard fields. + if frame: + output['frame'] = int(frame) + + if sats: + output['sats'] = int(sats) + if output["sats"] == 0: + self.log_error("Satellites field provided, and is 0. Not uploading due to potentially inaccurate position.") + return + + if batt: + output['batt'] = float(batt) + + if temp: + output['temp'] = float(temp) + + if humidity: + output['humdity'] = float(humidity) + + if pressure: + output['pressure'] = float(pressure) + + if vel_v: + output['vel_v'] = float(vel_v) + + if vel_h: + output['vel_h'] = float(vel_h) + + if heading: + output['heading'] = float(heading) + + if tx_frequency: + output['tx_frequency'] = float(tx_frequency) + + # Optional Metadata fields. + if modulation: + output['modulation'] = str(modulation) + + if snr: + output['snr'] = float(snr) + + if frequency: + output['frequency'] = float(frequency) + + if rssi: + output['rssi'] = float(rssi) + + # Per-Packet Uploader Information + if uploader_callsign: + # Per-packet uploader information has been provided. + output['uploader_callsign'] = str(uploader_callsign) + + # Check for any other information. + if uploader_position: + # Uploader position has been provided with the telemetry, check it's ok. + if (type(uploader_position) == list) and (len(uploader_position) == 3): + output['uploader_position'] = uploader_position + else: + raise ValueError("Invalid uploader position supplied, must be a list with 3 (lat, lon, alt) elements.") + + if uploader_radio: + output['uploader_radio'] = str(uploader_radio) + + if uploader_antenna: + output['uploader_antenna'] = str(uploader_antenna) + + else: + # No per-packet information - use the information provided on object instantiation. + output['uploader_callsign'] = self.uploader_callsign + + if self.uploader_position: + output['uploader_position'] = self.uploader_position + + if self.uploader_radio: + output['uploader_radio'] = str(self.uploader_radio) + + if self.uploader_antenna: + output['uploader_antenna'] = str(self.uploader_antenna) + + + # Any extra telemetry fields. + if type(extra_fields) == dict: + # Add each supplied field into the output, as long as it is not already present. + for _field in extra_fields: + if _field not in output: + output[_field] = extra_fields[_field] + + + logging.debug(f"Sondehub Amateur Uploader - Generated Packet: {str(output)}") + + # Finally, add the data to the queue if we are running. + if self.input_processing_running: + self.input_queue.put(output) + else: + self.log_debug("Processing not running, discarding.") + + + return + + + def add_dict(self, telemetry): + """ + Add a dictionary of telemetry to the input queue. + + Note that there is no checking of the content of this dictionary. + Use the add_telemetry function as a guide for what must be included. + + Args: + telemetry (dict): Telemetry dictionary to add to the input queue. + """ + + if self.inhibit: + return + + # Add it to the queue if we are running. + if self.input_processing_running: + self.input_queue.put(telemetry) + else: + self.log_debug("Processing not running, discarding.") + + + def process_queue(self): + """ Process data from the input queue, and write telemetry to log files. + """ + self.log_info("Started Sondehub Amateur Uploader Thread.") + + while self.input_processing_running: + + # Process everything in the queue. + _to_upload = [] + + while self.input_queue.qsize() > 0: + try: + _to_upload.append(self.input_queue.get_nowait()) + except Exception as e: + self.log_error("Error grabbing telemetry from queue - %s" % str(e)) + + # Upload data! + if len(_to_upload) > 0: + self.upload_telemetry(_to_upload) + + + # Sleep while waiting for some new data. + for i in range(int(self.upload_rate)): + time.sleep(1) + if self.input_processing_running == False: + break + + self.log_info("Stopped Sondehub Amateur Uploader Thread.") + + + def upload_telemetry(self, telem_list): + """ Upload an list of telemetry data to Sondehub """ + + _data_len = len(telem_list) + + try: + _start_time = time.time() + _telem_json = json.dumps(telem_list).encode("utf-8") + _compressed_payload = gzip.compress(_telem_json) + except Exception as e: + self.log_error( + "Error serialising and compressing telemetry list for upload - %s" + % str(e) + ) + return + + _compression_time = time.time() - _start_time + self.log_debug( + "Pre-compression: %d bytes, post: %d bytes. %.1f %% compression ratio, in %.1f s" + % ( + len(_telem_json), + len(_compressed_payload), + (len(_compressed_payload) / len(_telem_json)) * 100, + _compression_time, + ) + ) + + _retries = 0 + _upload_success = False + + _start_time = time.time() + + while _retries < self.upload_retries: + # Run the request. + try: + headers = { + "User-Agent": "pysondehub-" + sondehub.__version__, + "Content-Encoding": "gzip", + "Content-Type": "application/json", + "Date": formatdate(timeval=None, localtime=False, usegmt=True), + } + _req = requests.put( + self.SONDEHUB_AMATEUR_URL, + _compressed_payload, + # TODO: Revisit this second timeout value. + timeout=(self.upload_timeout, 6.1), + headers=headers, + ) + except Exception as e: + self.log_error("Upload Failed: %s" % str(e)) + return + + if _req.status_code == 200: + # 200 is the only status code that we accept. + _upload_time = time.time() - _start_time + self.log_info( + "Uploaded %d telemetry packets to Sondehub Amateur in %.1f seconds." + % (_data_len, _upload_time) + ) + _upload_success = True + break + + elif _req.status_code == 500: + # Server Error, Retry. + self.log_debug( + "Error uploading to Sondehub Amateur. Status Code: %d %s." + % (_req.status_code, _req.text) + ) + _retries += 1 + continue + + else: + self.log_error( + "Error uploading to Sondehub Amateur. Status Code: %d %s." + % (_req.status_code, _req.text) + ) + break + + if not _upload_success: + self.log_error("Upload failed after %d retries" % (_retries)) + + + def upload_station_position( + self, + callsign, + position, + uploader_radio="", + uploader_antenna="", + contact_email="", + mobile=False, + ): + """ + Upload station position information to the SondeHub-Amateur Database. + This is distinct from the station information sent along with telemetry, + in that it results in a station icon being shown on the tracker map for + ~12 hours after the upload. + + If 'mobile' is set to True, then the station will appear as a chase-car + instead. In this case, positions should be uploaded at much more regular + intervals to reflect the movement of the chase car. + + This uses the PUT /amateur/listeners API described here: + https://github.com/projecthorus/sondehub-infra/wiki/API-(Beta) + """ + + _callsign = str(callsign) + _radio = str(uploader_radio) + _antenna = str(uploader_antenna) + _contact_email = str(contact_email) + + if (type(position) == list) and (len(position) == 3): + _user_position = position + else: + raise ValueError("Invalid uploader position supplied, must be a list with 3 (lat, lon, alt) elements.") + + _position = { + "software_name": self.software_name, + "software_version": self.software_version, + "uploader_callsign": _callsign, + "uploader_position": _user_position, + "uploader_radio": _radio, + "uploader_antenna": _antenna, + "uploader_contact_email": _contact_email, + "mobile": mobile, + "dev": self.developer_mode + } + + logging.debug(f"Sondehub Amateur Uploader - Generated Station Position Packet: {str(_position)}") + + _retries = 0 + _upload_success = False + + _start_time = time.time() + + while _retries < self.upload_retries: + # Run the request. + try: + headers = { + "User-Agent": "pysondehub-" + sondehub.__version__, + "Content-Type": "application/json", + "Date": formatdate(timeval=None, localtime=False, usegmt=True), + } + _req = requests.put( + self.SONDEHUB_AMATEUR_STATION_POSITION_URL, + json=_position, + # TODO: Revisit this second timeout value. + timeout=(self.upload_timeout, 6.1), + headers=headers, + ) + except Exception as e: + self.log_error("Upload Failed: %s" % str(e)) + return + + if _req.status_code == 200: + # 200 is the only status code that we accept. + _upload_time = time.time() - _start_time + self.log_info("Uploaded station information to Sondehub.") + _upload_success = True + break + + elif _req.status_code == 500: + # Server Error, Retry. + _retries += 1 + continue + + elif _req.status_code == 404: + # API doesn't exist yet! + self.log_debug("Sondehub Amateur position upload API not implemented yet!") + _upload_success = True + break + + else: + self.log_error( + "Error uploading station information to Sondehub. Status Code: %d %s." + % (_req.status_code, _req.text) + ) + break + + if not _upload_success: + self.log_error( + "Station information upload failed after %d retries" % (_retries) + ) + self.log_debug(f"Attempted to upload {json.dumps(_position)}") + + self.last_user_position_upload = time.time() + + def close(self): + """ Close input processing thread. """ + self.input_processing_running = False + + def running(self): + """ Check if the uploader thread is running. + + Returns: + bool: True if the uploader thread is running. + """ + return self.input_processing_running + + def log_debug(self, line): + """ Helper function to log a debug message with a descriptive heading. + Args: + line (str): Message to be logged. + """ + logging.debug("Sondehub Amateur Uploader - %s" % line) + + def log_info(self, line): + """ Helper function to log an informational message with a descriptive heading. + Args: + line (str): Message to be logged. + """ + logging.info("Sondehub Amateur Uploader - %s" % line) + + def log_error(self, line): + """ Helper function to log an error message with a descriptive heading. + Args: + line (str): Message to be logged. + """ + logging.error("Sondehub Amateur Uploader - %s" % line) + + def log_warning(self, line): + """ Helper function to log an error message with a descriptive heading. + Args: + line (str): Message to be logged. + """ + logging.warning("Sondehub Amateur Uploader - %s" % line) + + +if __name__ == "__main__": + # Test Script for Uploader Class + logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) + + _test = Uploader( + "N0CALL-DEV-TESTING", + uploader_position=[0.1,0.1, 0], + uploader_radio="Testing pysondehub", + uploader_antenna="Testing pysondehub" + ) + _test.upload_station_position( + "N0CALL-DEV-TESTING", + [0.1,0.1, 0], + uploader_radio="Testing pysondehub radio", + uploader_antenna="Testing pysondehub antenna" + ) + _test.upload_station_position( + "N0CALL-DEV2-TESTING", + [-0.1, -0.1, 0.0], + mobile=True + ) + + _temp_date = datetime.datetime.utcnow() + _date_str = _temp_date.strftime("%H:%M:%S") + + _test.add_telemetry( + "VK5QI-DEV-TEST", + _date_str, + -0.1, + 0.1, + 10000, + frame=123, + sats=10, + batt=3.14159, + temp=-42.0, + pressure=1001.23, + humidity=99, + vel_v=-42.0, + vel_h=100.0, + heading=359.0, + snr=55.0, + frequency=434.123, + rssi=-123.0, + modulation="Test", + extra_fields={"my_field_1":5, "my_field_2":"foo", "my_field_3":-123.456}, + uploader_callsign="N0CALL-DEV2-TESTING", + uploader_position=[0.15,0.15,0] + ) + # Allow time for the packet to be uploaded. + time.sleep(5) + # Close the uploader + _test.close() From 67dc8e045cf8a1cc6039e60eb570da5f874eb43e Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Sun, 29 May 2022 10:10:10 +0930 Subject: [PATCH 2/3] Update readme --- README.md | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index afc9685..007da6c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,25 @@ -Simple realtime streaming SDK for sondehub.org V2 API. +# SondeHub (and SondeHub-Amateur) Python Library + +This repository contains: +* A uploader class for submitting high-altitude balloon telemetry data to SondeHub-Amateur +* A simple realtime streaming SDK for the sondehub.org V2 API (both radiosondes, and amateur balloons). + +### Contacts +* [Mark Jessop](https://github.com/darksidelemm) - vk5qi@rfhead.net +* [Michaela Wheeler](https://github.com/TheSkorm) - radiosonde@michaela.lgbt + +You can often find us in the #highaltitude IRC Channel on Libera Chat. + +## Installing +This library is available via pypi, and can be installed into your Python environment using: +``` +pip install sondehub +``` + +## Submitting Telemetry to SondeHub-Amateur +A guide on using the SondeHub-Amateur uploader class is available here https://github.com/projecthorus/pysondehub/wiki/SondeHub-Amateur-Uploader-Class-Usage + +## Streaming Telemetry Data ``` sondehub.Stream(sondes=["serial number"], on_message=callback) @@ -15,12 +36,12 @@ sondehub.Stream().remove_sonde(serial) Adds or removes a radiosonde from the filter -Data license --- +### Data license + Data is provided under the [Creative Commons BY-SA 2.0](https://creativecommons.org/licenses/by-sa/2.0/) license. -Example Usage --- +### Example Usage + ```python import sondehub @@ -35,8 +56,8 @@ while 1: ``` -Advanced Usage --- +### Advanced Usage + Manual usage of the Paho MQTT network loop can be obtained by using the `loop`, `loop_forever`, `loop_start` and `loop_stop` functions, taking care to ensure that the different types of network loop aren't mixed. See Paho documentation [here](https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php#network-loop). ```python @@ -84,8 +105,7 @@ sondehub --download S2810113 ``` -Open Data Access -== +## Open Data Access A basic interface to the Open Data is a available using `sondehub.download(serial=, datetime_prefix=)`. When using datetime_prefix only summary data is provided (the oldest, newest and highest frames) From 5b159b6ed1f7721b4edbd13d9c5205d7b938e921 Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Sun, 29 May 2022 11:07:58 +0930 Subject: [PATCH 3/3] Add dev flag to test --- sondehub/amateur.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sondehub/amateur.py b/sondehub/amateur.py index 156d64d..5c2bcc2 100644 --- a/sondehub/amateur.py +++ b/sondehub/amateur.py @@ -398,6 +398,7 @@ class Uploader(object): try: _start_time = time.time() _telem_json = json.dumps(telem_list).encode("utf-8") + logging.debug(f"Sondehub Amateur Uploader - Generated Packet: {str(_telem_json)}") _compressed_payload = gzip.compress(_telem_json) except Exception as e: self.log_error( @@ -431,6 +432,7 @@ class Uploader(object): "Content-Type": "application/json", "Date": formatdate(timeval=None, localtime=False, usegmt=True), } + logging.debug(f"Sondehub Amateur Uploader - Upload Headers: {str(headers)}") _req = requests.put( self.SONDEHUB_AMATEUR_URL, _compressed_payload, @@ -623,9 +625,10 @@ if __name__ == "__main__": _test = Uploader( "N0CALL-DEV-TESTING", - uploader_position=[0.1,0.1, 0], + uploader_position=[0.1,0.1,0], uploader_radio="Testing pysondehub", - uploader_antenna="Testing pysondehub" + uploader_antenna="Testing pysondehub", + developer_mode=True ) _test.upload_station_position( "N0CALL-DEV-TESTING",