diff --git a/README.md b/README.md index 110cadc..8e3f934 100644 --- a/README.md +++ b/README.md @@ -7,22 +7,34 @@ This library includes the following: * The 'Wenet' demodulator, used to downlink imagery from HAB payloads. +## HorusDemodLib C Library +This contains the demodulator portions of horuslib, which are written in C. + ## Building ``` -$ git clone https://github.com/projecthorus/horuslib.git -$ cd horuslib && mkdir build_linux && cd build_linux +$ git clone https://github.com/projecthorus/horusdemodlib.git +$ cd horusdemodlib && mkdir build && cd build $ cmake .. $ make +$ make install ``` ## Testing ``` -$ cd horus/build_linux +$ cd build $ ctest ``` +## HorusDemodLib Python Wrapper + +``` +$ pip install -r requirements.txt +$ pip install -e . +``` + + ## Further Reading Here are some links to projects and blog posts that use this code: diff --git a/custom_field_list.json b/custom_field_list.json new file mode 100644 index 0000000..3f9a93e --- /dev/null +++ b/custom_field_list.json @@ -0,0 +1,21 @@ +{ +"HORUSTEST": { + "struct": " str: + """ + Calculate the CRC16 CCITT checksum of *data*. + + (CRC16 CCITT: start 0xFFFF, poly 0x1021) + """ + crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false') + + return hex(crc16(data))[2:].upper().zfill(4) + + +def check_packet_crc(data:bytes, checksum:str='crc16'): + """ + Attempt to validate a packets checksum, which is assumed to be present + in the last few bytes of the packet. + + Support CRC types: CRC16-CCITT + + """ + + if (checksum == 'crc16') or (checksum == 'CRC16') or (checksum == 'crc16-ccitt') or (checksum == 'CRC16-CCITT'): + # Check we have enough data for a sane CRC16. + if len(data) < 3: + raise ValueError(f"Checksum - Not enough data for CRC16!") + + # Decode the last 2 bytes as a uint16 + _packet_checksum = struct.unpack(' dict: + """ + Attempt to decode a set of bytes based on a provided packet format. + + """ + + if packet_format is None: + # Attempt to lookup the format based on the length of the data if it has not been provided. + if len(data) in HORUS_LENGTH_TO_FORMAT: + packet_format = HORUS_PACKET_FORMATS[HORUS_LENGTH_TO_FORMAT[len(data)]] + + + # Output dictionary + _output = { + 'packet_format': packet_format, + 'crc_ok': False, + 'payload_id': 0 + } + + # Check the length provided in the packet format matches up with the length defined by the struct. + _struct_length = struct.calcsize(packet_format['struct']) + if _struct_length != packet_format['length']: + raise ValueError(f"Decoder - Provided length {packet_format['length']} and struct length ({_struct_length}) do not match!") + + # Check the length of the input data bytes matches that of the struct. + if len(data) != _struct_length: + raise ValueError(f"Decoder - Input data has length {len(data)}, should be length {_struct_length}.") + + # Check the Checksum + _crc_ok = check_packet_crc(data, checksum=packet_format['checksum']) + + if not _crc_ok: + raise ValueError("Decoder - CRC Failure.") + else: + _output['crc_ok'] = True + + # Now try and decode the data. + _raw_fields = struct.unpack(packet_format['struct'], data) + + # Check the number of decoded fields is equal to the number of field definitions in the packet format. + if len(_raw_fields) != len(packet_format['fields']): + raise ValueError(f"Decoder - Packet format defines {len(packet_format['fields'])} fields, got {len(_raw_fields)} from struct.") + + # Now we can start extracting and formatting fields. + + _ukhas_fields = [] + for _i in range(len(_raw_fields)): + _field_name = packet_format['fields'][_i][0] + _field_type = packet_format['fields'][_i][1] + _field_data = _raw_fields[_i] + + + if _field_name == 'custom': + # Attempt to interpret custom fields. + # Note: This requires that the payload ID has been decoded prior to this field being parsed. + if _output['payload_id'] in HORUS_CUSTOM_FIELDS: + (_custom_data, _custom_str) = decode_custom_fields(_field_data, _output['payload_id']) + + # Add custom fields to string + _ukhas_fields.append(_custom_str) + + # Add custom fields to output dict. + for _field in _custom_data: + _output[_field] = _custom_data[_field] + + # Ignore checksum field. (and maybe other fields?) + elif _field_name not in ['checksum']: + # Decode field to string. + (_decoded, _decoded_str) = decode_field(_field_type, _field_data) + + _output[_field_name] = _decoded + + _ukhas_fields.append(_decoded_str) + + + # Convert to a UKHAS-compliant string. + _ukhas_str = ",".join(_ukhas_fields) + _ukhas_crc = ukhas_crc(_ukhas_str.encode('ascii')) + _output['ukhas_str'] = "$$" + _ukhas_str + "*" + _ukhas_crc + + return _output + + + + + +if __name__ == "__main__": + + tests = [ + ['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', ''], + ['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', 'error'], + ['horus_binary_v2_16byte', b'\x01\x12\x02\x00\x02\xbc\xeb!AR\x10\x00\xff\x00\xe1\x7e', ''], + # id seq_no HH MM SS lat lon alt spd sat tmp bat custom data -----------------------| crc16 + ['horus_binary_v2_32byte', b'\xFF\xFF\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xe8\x82', ''] + ] + + for _test in tests: + _format = _test[0] + _input = _test[1] + _output = _test[2] + + try: + _decoded = decode_packet(_input) + print(f"Input ({_format}): {str(_input)} - Output: {_decoded['ukhas_str']}") + print(_decoded) + # Insert assert checks here. + + except ValueError as e: + print(f"Input ({_format}): {str(_input)} - Caught Error: {str(e)}") + assert(_output == 'error') + + + print("All tests passed!") + diff --git a/src/horusdemodlib/delegates.py b/src/horusdemodlib/delegates.py new file mode 100644 index 0000000..a7c99d6 --- /dev/null +++ b/src/horusdemodlib/delegates.py @@ -0,0 +1,227 @@ +# +# HorusLib - Decoder Delegate Functions +# + +import struct +import time + +from .payloads import HORUS_PAYLOAD_LIST, HORUS_CUSTOM_FIELDS + +# Payload ID + +def decode_payload_id(data: int) -> str: + """ + Attempt to decode a payload ID into a callsign string. + """ + + if type(data) != int: + return ValueError("payload_id - Invalid input type.") + + if data in HORUS_PAYLOAD_LIST: + _str = HORUS_PAYLOAD_LIST[data] + else: + _str = "UNKNOWN_PAYLOAD_ID" + + return (_str, _str) + + +# Time representations + +def decode_time_hms(data: bytes) -> str: + """ + Decode a time field, encoded as three bytes representing hours, minutes and seconds of the current UTC day. + + Returns: String, as "HH:MM:SS" + + Example: \x01\x02\x03 -> "01:02:03" + """ + + if len(data) != 3: + raise ValueError(f"time_hms - Input has incorrect length ({len(data)}), should be 3.") + + _hour = int(data[0]) + _minute = int(data[1]) + _second = int(data[2]) + + _str = f"{_hour:02d}:{_minute:02d}:{_second:02d}" + + return (_str, _str) + + +def decode_time_biseconds(data:int) -> str: + """ + Decode a time field, encoded as a uint16, representing seconds since the start of the UTC day, + divided by 2 ('biseconds') + + Returns: String, as "HH:MM:SS" + + Examples: + 0 -> 00:00:00 + 1 -> 00:00:02 + + """ + + if type(data) != int: + raise ValueError("time_biseconds - Invalid input type.") + + if (data < 0) or data > 43200: + raise ValueError("time_biseconds - Input of of range (0-43200)") + + _str = time.strftime("%H:%M:%S", time.gmtime(data*2)) + + return (_str, _str) + +# Latitude/Longitude representations + +def decode_degree_float(data:float) -> str: + """ + Convert a degree (latitude/longitude) field, provided as a float, + to a string representation, with 6 decimal places. + """ + if type(data) != float: + raise ValueError("decimal_degrees - Invalid input type.") + + return (data, f"{data:.6f}") + + +def decode_degree_fixed3(data:bytes) -> str: + """ + Convert a degree (latitude/longitude) field, provided as a + three-byte fixed-point representation, to a string. + + The input is interpreted as the 3 most-significant-bytes of a + little-endian 4-byte signed integer. The LSB is set to 0x00. + + Once converted to an int, the value is then scaled to degrees by + multilying by 1e-7. + + """ + + if type(data) != bytes: + raise ValueError("degree_fixed3 - Invalid input type.") + + if len(data) != 3: + raise ValueError("degree_fixed3 - Invalid input length.") + + # Add input onto a null byte + _temp = b'\x00' + data + + # Parse as a signed int. + _value = struct.unpack(' str: + """ + Decode a battery voltage, encoded as as a single byte, where + 0 = 0v, 255 = 5.0V, with linear steps in between. + """ + + if type(data) != int: + raise ValueError("battery_5v_byte - Invalid input type.") + + _batt = 5.0*data/255.0 + + return (_batt, f"{_batt:.2f}") + + +delegate_list = { + 'payload_id': decode_payload_id, + 'time_hms': decode_time_hms, + 'time_biseconds': decode_time_biseconds, + 'degree_float': decode_degree_float, + 'degree_fixed3': decode_degree_fixed3, + 'battery_5v_byte': decode_battery_5v_byte +} + +def decode_field(field_type:str, data): + """ Attempt to decode a field, supplied as bytes, using a specified delegate function """ + + if field_type in delegate_list: + return delegate_list[field_type](data) + else: + if (field_type == 'none') or (field_type == 'None') or (field_type == None): + # Basic datatype, just convert to a string using Pythons internal conversions. + if (type(data) == float) or (type(data) == int) or (type(data) == str): + return (data, f"{data}") + else: + raise ValueError(f"Data has unknown type ({str(type(data))}) and could not be decoded.") + else: + raise ValueError(f"Invalid field type - {field_type}") + + +def decode_custom_fields(data:bytes, payload_id:str): + """ Attempt to decode custom field data from the 9-byte custom section of a 32-byte payload """ + + if payload_id not in HORUS_CUSTOM_FIELDS: + raise ValueError(f"Custom Field Decoder - Unknown payload ID {payload_id}") + + _custom_field = HORUS_CUSTOM_FIELDS[payload_id] + _struct = _custom_field['struct'] + _struct_len = struct.calcsize(_struct) + _field_names = _custom_field['fields'] + + if type(data) != bytes: + raise ValueError("Custom Field Decoder - Invalid Input type.") + + if len(data) !=_struct_len: + raise ValueError(f"Custom Field Decoder - Invalid Input Length ({len(data)}, should be {_struct_len}).") + + # Attempt to parse the data. + _raw_fields = struct.unpack(_struct, data) + + if len(_field_names) != len(_raw_fields): + raise ValueError(f"Custom Field Decoder - Packet format defines {len(_field_names)} fields, got {len(_raw_fields)} from struct.") + + _output_fields = [] + _output_dict = {} + for _i in range(len(_raw_fields)): + _field_name = _field_names[_i][0] + _field_type = _field_names[_i][1] + _field_data = _raw_fields[_i] + + # Decode field to string. + (_decoded, _decoded_str) = decode_field(_field_type, _field_data) + + _output_dict[_field_name] = _decoded + + _output_fields.append(_decoded_str) + + _output_fields_str = ",".join(_output_fields) + + return (_output_dict, _output_fields_str) + + + + +if __name__ == "__main__": + + tests = [ + ['time_hms', b'\x01\x02\x03', "01:02:03"], + ['time_hms', b'\x17\x3b\x3b', "23:59:59"], + ['time_biseconds', 0, "00:00:00"], + ['time_biseconds', 1, "00:00:02"], + ['time_biseconds', 43199, "23:59:58"], + ['time_biseconds', 43200, "00:00:00"], + ['degree_float', 0.0, "0.000000"], + ['degree_float', 0.001, "0.001000"], + ['degree_float', -34.01, "-34.010000"], + ['degree_float', -138.000001, "-138.000001"], + ['degree_fixed3', b'\x00\x00\x00', "0.0"], + ['battery_5v_byte', 0, "0.00"], + ['battery_5v_byte', 128, "2.51"], + ['battery_5v_byte', 255, "5.00"], + ] + + for _test in tests: + _field_type = _test[0] + _input = _test[1] + _output = _test[2] + + _decoded = decode_field(_field_type, _input) + print(f"{_field_type} {str(_input)} -> {_decoded}") + assert(_decoded == _output) + + print("All tests passed!") \ No newline at end of file diff --git a/src/horusdemodlib/payloads.py b/src/horusdemodlib/payloads.py new file mode 100644 index 0000000..55d79fb --- /dev/null +++ b/src/horusdemodlib/payloads.py @@ -0,0 +1,189 @@ +# +# HorusLib - Payload ID List +# +import json +import logging +import requests + +# Global payload list +HORUS_PAYLOAD_LIST = {0:'4FSKTEST', 1:'HORUSBINARY', 65535:'HORUSTEST'} + +# URL for payload list +# TODO: Move this into horusdemodlib repo +PAYLOAD_ID_LIST_URL = "https://raw.githubusercontent.com/projecthorus/horusbinary/master/payload_id_list.txt" + +# Custom field data. +HORUS_CUSTOM_FIELDS = { + "HORUSTEST": { + "struct": "