diff --git a/horusdemodlib/__init__.py b/horusdemodlib/__init__.py index b3f4756..ae73625 100755 --- a/horusdemodlib/__init__.py +++ b/horusdemodlib/__init__.py @@ -1 +1 @@ -__version__ = "0.1.2" +__version__ = "0.1.3" diff --git a/horusdemodlib/decoder.py b/horusdemodlib/decoder.py index c968f61..3c1b2e7 100644 --- a/horusdemodlib/decoder.py +++ b/horusdemodlib/decoder.py @@ -2,6 +2,7 @@ # HorusLib - Binary Packet Decoder Functions # import codecs +import datetime import struct import time from .delegates import * @@ -159,6 +160,9 @@ def decode_packet(data:bytes, packet_format:dict = None) -> dict: _ukhas_crc = ukhas_crc(_ukhas_str.encode('ascii')) _output['ukhas_str'] = "$$" + _ukhas_str + "*" + _ukhas_crc + # Duplicate some fields for parsing later + _output['callsign'] = _output['payload_id'] + return _output @@ -172,6 +176,79 @@ def hex_to_bytes(data:str) -> bytes: return None +def parse_ukhas_string(sentence:str) -> dict: + """ Attempt to decode a UKHAS telemetry sentence into a dictionary """ + + # Try and convert from bytes to str if necessary + if type(sentence) == bytes: + sentence = sentence.decode('ascii') + + # Try and proceed through the following. If anything fails, we have a corrupt sentence. + try: + # Strip out any leading/trailing whitespace. + _sentence = sentence.strip() + + # First, try and find the start of the sentence, which always starts with '$$'' + _sentence = _sentence.split('$$')[-1] + # Hack to handle odd numbers of $$'s at the start of a sentence + if _sentence[0] == '$': + _sentence = _sentence[1:] + # Now try and split out the telemetry from the CRC16. + _telem = _sentence.split('*')[0] + _crc = _sentence.split('*')[1] + + # Now check if the CRC matches. + _calc_crc = ukhas_crc(_telem.encode('ascii')) + + if _calc_crc != _crc: + raise ValueError("Could not parse RTTY Sentence - CRC Fail.") + + # We now have a valid sentence! Extract fields.. + _fields = _telem.split(',') + + _callsign = _fields[0] + _time = _fields[2] + _latitude = float(_fields[3]) + _longitude = float(_fields[4]) + _altitude = int(_fields[5]) + # The rest we don't care about. + + # Perform some sanity checks on the data. + + # Attempt to parse the time string. This will throw an error if any values are invalid. + try: + _time_dt = datetime.datetime.strptime(_time, "%H:%M:%S") + except: + raise ValueError("Could not parse RTTY Sentence - Invalid Time.") + + # Check if the lat/long is 0.0,0.0 - no point passing this along. + if _latitude == 0.0 or _longitude == 0.0: + raise ValueError("Could not parse RTTY Sentence - Zero Lat/Long.") + + # Place a limit on the altitude field. We generally store altitude on the payload as a uint16, so it shouldn't fall outside these values. + if _altitude > 65535 or _altitude < 0: + raise ValueError("Could not parse RTTY Sentence - Invalid Altitude.") + + # Produce a dict output which is compatible with the output of the binary decoder. + _telem = { + 'callsign': _callsign, + 'time': _time, + 'latitude': _latitude, + 'longitude': _longitude, + 'altitude': _altitude, + 'speed': -1, + 'heading': -1, + 'temp': -1, + 'sats': -1, + 'batt_voltage': -1 + } + + return _telem + + except Exception as e: + raise ValueError("Could not parse ASCII Sentence - %s" % str(e)) + + if __name__ == "__main__": import argparse import sys @@ -209,6 +286,7 @@ if __name__ == "__main__": if args.test: + # Binary packet decoder tests tests = [ ['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', ''], ['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', 'error'], @@ -232,6 +310,17 @@ if __name__ == "__main__": print(f"Input ({_format}): {str(_input)} - Caught Error: {str(e)}") assert(_output == 'error') + # RTTY Decoder Tests + tests = [ + '$$HORUS,6,06:43:16,0.000000,0.000000,0,0,0,1801,20*1DA2' + ] + + for _test in tests: + try: + _decoded = parse_ukhas_string(_test) + print(_decoded) + except ValueError as e: + print(f"Caught Error: {str(e)}") print("All tests passed!") diff --git a/pyproject.toml b/pyproject.toml index 1b01ed4..e17fe7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "horusdemodlib" -version = "0.1.2" +version = "0.1.3" description = "Project Horus HAB Telemetry Demodulators" authors = ["Mark Jessop"] license = "LGPL-2.1-or-later"