Add horusdemodlib python files

pull/2/head
Mark Jessop 2020-07-04 19:12:23 +09:30
rodzic f263b08053
commit d6d702fff2
10 zmienionych plików z 805 dodań i 3 usunięć

Wyświetl plik

@ -7,22 +7,34 @@ This library includes the following:
* The 'Wenet' demodulator, used to downlink imagery from HAB payloads.
## HorusDemodLib C Library
This contains the demodulator portions of horuslib, which are written in C.
## Building
```
$ git clone https://github.com/projecthorus/horuslib.git
$ cd horuslib && mkdir build_linux && cd build_linux
$ git clone https://github.com/projecthorus/horusdemodlib.git
$ cd horusdemodlib && mkdir build && cd build
$ cmake ..
$ make
$ make install
```
## Testing
```
$ cd horus/build_linux
$ cd build
$ ctest
```
## HorusDemodLib Python Wrapper
```
$ pip install -r requirements.txt
$ pip install -e .
```
## Further Reading
Here are some links to projects and blog posts that use this code:

Wyświetl plik

@ -0,0 +1,21 @@
{
"HORUSTEST": {
"struct": "<BbBfH",
"fields": [
["cutdown_battery_voltage", "battery_5v_byte"],
["external_temperature", "none"],
["test_counter", "none"],
["test_float_field", "none"],
["test_int_field", "none"]
]
},
"HORUSTEST2": {
"struct": "<BbBH4x",
"fields": [
["cutdown_battery_voltage", "battery_5v_byte"],
["external_temperature", "none"],
["test_counter", "none"],
["test_int_field", "none"]
]
}
}

Wyświetl plik

@ -0,0 +1,31 @@
# HORUS BINARY PAYLOAD ID LIST
# 2019-06-02
#
# Payload IDs 0 through 255 are available. If we get near this limit,
# the payload format may need to be re-evaluated.
#
# Request a payload ID by either raising an issue, or submitting a pull request
# on the horusbinary github page: https://github.com/projecthorus/horusbinary
#
0, 4FSKTEST
1, HORUSBINARY
2, VK5BRL
3, STRATOS
4, HORUSBINARY2
5, HORUSBINARY3
6, PE2BZ-4FSK
7, SP8NCG-4FSK
8, PY2UEP-4FSK
9, PC4L-4FSK
10, HAPPYSAT-4FSK
11, PH1M-4FSK
12, DK0WT-4FSK
13, PE1ANS-4FSK
14, WBLSCOUTS-4FSK
15, PB0AHX-4FSK
16, PRESCOTTSOUTH
17, EAGLE-1-4FSK
18, VK8TH-4FSK
19, VK5NTM-4FSK
23, MAGNU
31, ICEDVOVO

3
requirements.txt 100644
Wyświetl plik

@ -0,0 +1,3 @@
requests
crcmod
ruamel.yaml

44
setup.py 100755
Wyświetl plik

@ -0,0 +1,44 @@
import os
import re
from setuptools import setup, find_packages
regexp = re.compile(r".*__version__ = [\'\"](.*?)[\'\"]", re.S)
init_file = os.path.join(os.path.dirname(__file__), "src", "horusdemodlib", "__init__.py")
with open(init_file, "r") as f:
module_content = f.read()
match = regexp.match(module_content)
if match:
version = match.group(1)
else:
raise RuntimeError(f"Cannot find __version__ in {init_file}")
with open("README.md", "r") as f:
readme = f.read()
with open("requirements.txt", "r") as f:
requirements = []
for line in f.read().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
requirements.append(line)
if __name__ == "__main__":
setup(
name="horusdemodlib",
description="Project Horus Telemetry Decoding Library",
long_description=readme,
version=version,
install_requires=requirements,
keywords=["horus modem telemetry radio"],
package_dir={"": "src"},
packages=find_packages("src"),
classifiers=[
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
]
)

Wyświetl plik

@ -0,0 +1 @@
__version__ = "0.0.1"

Wyświetl plik

@ -0,0 +1,79 @@
#
# HorusLib - Checksumming functions
#
import crcmod
import logging
import struct
def ukhas_crc(data:bytes) -> str:
"""
Calculate the CRC16 CCITT checksum of *data*.
(CRC16 CCITT: start 0xFFFF, poly 0x1021)
"""
crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false')
return hex(crc16(data))[2:].upper().zfill(4)
def check_packet_crc(data:bytes, checksum:str='crc16'):
"""
Attempt to validate a packets checksum, which is assumed to be present
in the last few bytes of the packet.
Support CRC types: CRC16-CCITT
"""
if (checksum == 'crc16') or (checksum == 'CRC16') or (checksum == 'crc16-ccitt') or (checksum == 'CRC16-CCITT'):
# Check we have enough data for a sane CRC16.
if len(data) < 3:
raise ValueError(f"Checksum - Not enough data for CRC16!")
# Decode the last 2 bytes as a uint16
_packet_checksum = struct.unpack('<H', data[-2:])[0]
# Calculate a CRC over the rest of the data
_crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false')
_calculated_crc = _crc16(data[:-2])
if _calculated_crc == _packet_checksum:
return True
else:
logging.debug(f"Calculated: {hex(_calculated_crc)}, Packet: {hex(_packet_checksum)}")
return False
else:
raise ValueError(f"Checksum - Unknown Checksym type {checksum}.")
if __name__ == "__main__":
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG
)
tests = [
['crc16', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', True],
['crc16', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', False],
['crc16', b'\x01\x12\x02\x00\x02\xbc\xeb!AR\x10\x00\xff\x00\xe1\x7e', True],
# id seq_no HH MM SS lat lon alt spd sat tmp bat custom data
['crc16', b'\xFF\xFF\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xe8\x82', True],
]
for _test in tests:
_format = _test[0]
_input = _test[1]
_output = _test[2]
_decoded = check_packet_crc(_input, _format)
print(f"Packet: {_input}. CRC OK: {_decoded}")
assert(_decoded == _output)
print("All tests passed!")

Wyświetl plik

@ -0,0 +1,195 @@
#
# HorusLib - Binary Packet Decoder Functions
#
import struct
import time
from .delegates import *
from .checksums import *
from .payloads import HORUS_CUSTOM_FIELDS, HORUS_PAYLOAD_LIST
#
# Horus Binary V1 and V2 Packet Formats
#
HORUS_PACKET_FORMATS = {
'horus_binary_v1': {
'name': 'Horus Binary v1 22 Byte Format',
'length': 22,
'struct': '<BH3sffHBBbBH',
'checksum': 'crc16',
'fields': [
['payload_id', 'payload_id'],
['sequence_number', 'none'],
['time', 'time_hms'],
['latitude', 'degree_float'],
['longitude', 'degree_float'],
['altitude', 'none'],
['speed', 'none'],
['satellites', 'none'],
['temperature', 'none'],
['battery_voltage', 'battery_5v_byte'],
['checksum', 'none']
]
},
'horus_binary_v2_16byte': {
'name': 'Horus Binary v2 16 Byte Format',
'length': 16,
'struct': '<BBH3s3sHBBH',
'checksum': 'crc16',
'fields': [
['payload_id', 'payload_id'],
['sequence_number', 'none'],
['time', 'time_biseconds'],
['latitude', 'degree_fixed3'],
['longitude', 'degree_fixed3'],
['altitude', 'none'],
['battery_voltage', 'battery_5v_byte'],
['flags', 'none'],
['checksum', 'none']
]
},
'horus_binary_v2_32byte': {
'name': 'Horus Binary v2 32 Byte Format',
'length': 32,
'struct': '<HH3sffHBBbB9sH',
'checksum': 'crc16',
'fields': [
['payload_id', 'payload_id'],
['sequence_number', 'none'],
['time', 'time_hms'],
['latitude', 'degree_float'],
['longitude', 'degree_float'],
['altitude', 'none'],
['speed', 'none'],
['satellites', 'none'],
['temperature', 'none'],
['battery_voltage', 'battery_5v_byte'],
['custom', 'custom'],
['checksum', 'none']
]
}
}
# Lookup for packet length to the appropriate format.
HORUS_LENGTH_TO_FORMAT = {
22: 'horus_binary_v1',
16: 'horus_binary_v2_16byte',
32: 'horus_binary_v2_32byte'
}
def decode_packet(data:bytes, packet_format:dict = None) -> dict:
"""
Attempt to decode a set of bytes based on a provided packet format.
"""
if packet_format is None:
# Attempt to lookup the format based on the length of the data if it has not been provided.
if len(data) in HORUS_LENGTH_TO_FORMAT:
packet_format = HORUS_PACKET_FORMATS[HORUS_LENGTH_TO_FORMAT[len(data)]]
# Output dictionary
_output = {
'packet_format': packet_format,
'crc_ok': False,
'payload_id': 0
}
# Check the length provided in the packet format matches up with the length defined by the struct.
_struct_length = struct.calcsize(packet_format['struct'])
if _struct_length != packet_format['length']:
raise ValueError(f"Decoder - Provided length {packet_format['length']} and struct length ({_struct_length}) do not match!")
# Check the length of the input data bytes matches that of the struct.
if len(data) != _struct_length:
raise ValueError(f"Decoder - Input data has length {len(data)}, should be length {_struct_length}.")
# Check the Checksum
_crc_ok = check_packet_crc(data, checksum=packet_format['checksum'])
if not _crc_ok:
raise ValueError("Decoder - CRC Failure.")
else:
_output['crc_ok'] = True
# Now try and decode the data.
_raw_fields = struct.unpack(packet_format['struct'], data)
# Check the number of decoded fields is equal to the number of field definitions in the packet format.
if len(_raw_fields) != len(packet_format['fields']):
raise ValueError(f"Decoder - Packet format defines {len(packet_format['fields'])} fields, got {len(_raw_fields)} from struct.")
# Now we can start extracting and formatting fields.
_ukhas_fields = []
for _i in range(len(_raw_fields)):
_field_name = packet_format['fields'][_i][0]
_field_type = packet_format['fields'][_i][1]
_field_data = _raw_fields[_i]
if _field_name == 'custom':
# Attempt to interpret custom fields.
# Note: This requires that the payload ID has been decoded prior to this field being parsed.
if _output['payload_id'] in HORUS_CUSTOM_FIELDS:
(_custom_data, _custom_str) = decode_custom_fields(_field_data, _output['payload_id'])
# Add custom fields to string
_ukhas_fields.append(_custom_str)
# Add custom fields to output dict.
for _field in _custom_data:
_output[_field] = _custom_data[_field]
# Ignore checksum field. (and maybe other fields?)
elif _field_name not in ['checksum']:
# Decode field to string.
(_decoded, _decoded_str) = decode_field(_field_type, _field_data)
_output[_field_name] = _decoded
_ukhas_fields.append(_decoded_str)
# Convert to a UKHAS-compliant string.
_ukhas_str = ",".join(_ukhas_fields)
_ukhas_crc = ukhas_crc(_ukhas_str.encode('ascii'))
_output['ukhas_str'] = "$$" + _ukhas_str + "*" + _ukhas_crc
return _output
if __name__ == "__main__":
tests = [
['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', ''],
['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', 'error'],
['horus_binary_v2_16byte', b'\x01\x12\x02\x00\x02\xbc\xeb!AR\x10\x00\xff\x00\xe1\x7e', ''],
# id seq_no HH MM SS lat lon alt spd sat tmp bat custom data -----------------------| crc16
['horus_binary_v2_32byte', b'\xFF\xFF\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xe8\x82', '']
]
for _test in tests:
_format = _test[0]
_input = _test[1]
_output = _test[2]
try:
_decoded = decode_packet(_input)
print(f"Input ({_format}): {str(_input)} - Output: {_decoded['ukhas_str']}")
print(_decoded)
# Insert assert checks here.
except ValueError as e:
print(f"Input ({_format}): {str(_input)} - Caught Error: {str(e)}")
assert(_output == 'error')
print("All tests passed!")

Wyświetl plik

@ -0,0 +1,227 @@
#
# HorusLib - Decoder Delegate Functions
#
import struct
import time
from .payloads import HORUS_PAYLOAD_LIST, HORUS_CUSTOM_FIELDS
# Payload ID
def decode_payload_id(data: int) -> str:
"""
Attempt to decode a payload ID into a callsign string.
"""
if type(data) != int:
return ValueError("payload_id - Invalid input type.")
if data in HORUS_PAYLOAD_LIST:
_str = HORUS_PAYLOAD_LIST[data]
else:
_str = "UNKNOWN_PAYLOAD_ID"
return (_str, _str)
# Time representations
def decode_time_hms(data: bytes) -> str:
"""
Decode a time field, encoded as three bytes representing hours, minutes and seconds of the current UTC day.
Returns: String, as "HH:MM:SS"
Example: \x01\x02\x03 -> "01:02:03"
"""
if len(data) != 3:
raise ValueError(f"time_hms - Input has incorrect length ({len(data)}), should be 3.")
_hour = int(data[0])
_minute = int(data[1])
_second = int(data[2])
_str = f"{_hour:02d}:{_minute:02d}:{_second:02d}"
return (_str, _str)
def decode_time_biseconds(data:int) -> str:
"""
Decode a time field, encoded as a uint16, representing seconds since the start of the UTC day,
divided by 2 ('biseconds')
Returns: String, as "HH:MM:SS"
Examples:
0 -> 00:00:00
1 -> 00:00:02
"""
if type(data) != int:
raise ValueError("time_biseconds - Invalid input type.")
if (data < 0) or data > 43200:
raise ValueError("time_biseconds - Input of of range (0-43200)")
_str = time.strftime("%H:%M:%S", time.gmtime(data*2))
return (_str, _str)
# Latitude/Longitude representations
def decode_degree_float(data:float) -> str:
"""
Convert a degree (latitude/longitude) field, provided as a float,
to a string representation, with 6 decimal places.
"""
if type(data) != float:
raise ValueError("decimal_degrees - Invalid input type.")
return (data, f"{data:.6f}")
def decode_degree_fixed3(data:bytes) -> str:
"""
Convert a degree (latitude/longitude) field, provided as a
three-byte fixed-point representation, to a string.
The input is interpreted as the 3 most-significant-bytes of a
little-endian 4-byte signed integer. The LSB is set to 0x00.
Once converted to an int, the value is then scaled to degrees by
multilying by 1e-7.
"""
if type(data) != bytes:
raise ValueError("degree_fixed3 - Invalid input type.")
if len(data) != 3:
raise ValueError("degree_fixed3 - Invalid input length.")
# Add input onto a null byte
_temp = b'\x00' + data
# Parse as a signed int.
_value = struct.unpack('<i', _temp)[0]
_value_degrees = _value * 1e-7
return (_value_degrees, f"{_value_degrees:.6f}")
def decode_battery_5v_byte(data: int) -> str:
"""
Decode a battery voltage, encoded as as a single byte, where
0 = 0v, 255 = 5.0V, with linear steps in between.
"""
if type(data) != int:
raise ValueError("battery_5v_byte - Invalid input type.")
_batt = 5.0*data/255.0
return (_batt, f"{_batt:.2f}")
delegate_list = {
'payload_id': decode_payload_id,
'time_hms': decode_time_hms,
'time_biseconds': decode_time_biseconds,
'degree_float': decode_degree_float,
'degree_fixed3': decode_degree_fixed3,
'battery_5v_byte': decode_battery_5v_byte
}
def decode_field(field_type:str, data):
""" Attempt to decode a field, supplied as bytes, using a specified delegate function """
if field_type in delegate_list:
return delegate_list[field_type](data)
else:
if (field_type == 'none') or (field_type == 'None') or (field_type == None):
# Basic datatype, just convert to a string using Pythons internal conversions.
if (type(data) == float) or (type(data) == int) or (type(data) == str):
return (data, f"{data}")
else:
raise ValueError(f"Data has unknown type ({str(type(data))}) and could not be decoded.")
else:
raise ValueError(f"Invalid field type - {field_type}")
def decode_custom_fields(data:bytes, payload_id:str):
""" Attempt to decode custom field data from the 9-byte custom section of a 32-byte payload """
if payload_id not in HORUS_CUSTOM_FIELDS:
raise ValueError(f"Custom Field Decoder - Unknown payload ID {payload_id}")
_custom_field = HORUS_CUSTOM_FIELDS[payload_id]
_struct = _custom_field['struct']
_struct_len = struct.calcsize(_struct)
_field_names = _custom_field['fields']
if type(data) != bytes:
raise ValueError("Custom Field Decoder - Invalid Input type.")
if len(data) !=_struct_len:
raise ValueError(f"Custom Field Decoder - Invalid Input Length ({len(data)}, should be {_struct_len}).")
# Attempt to parse the data.
_raw_fields = struct.unpack(_struct, data)
if len(_field_names) != len(_raw_fields):
raise ValueError(f"Custom Field Decoder - Packet format defines {len(_field_names)} fields, got {len(_raw_fields)} from struct.")
_output_fields = []
_output_dict = {}
for _i in range(len(_raw_fields)):
_field_name = _field_names[_i][0]
_field_type = _field_names[_i][1]
_field_data = _raw_fields[_i]
# Decode field to string.
(_decoded, _decoded_str) = decode_field(_field_type, _field_data)
_output_dict[_field_name] = _decoded
_output_fields.append(_decoded_str)
_output_fields_str = ",".join(_output_fields)
return (_output_dict, _output_fields_str)
if __name__ == "__main__":
tests = [
['time_hms', b'\x01\x02\x03', "01:02:03"],
['time_hms', b'\x17\x3b\x3b', "23:59:59"],
['time_biseconds', 0, "00:00:00"],
['time_biseconds', 1, "00:00:02"],
['time_biseconds', 43199, "23:59:58"],
['time_biseconds', 43200, "00:00:00"],
['degree_float', 0.0, "0.000000"],
['degree_float', 0.001, "0.001000"],
['degree_float', -34.01, "-34.010000"],
['degree_float', -138.000001, "-138.000001"],
['degree_fixed3', b'\x00\x00\x00', "0.0"],
['battery_5v_byte', 0, "0.00"],
['battery_5v_byte', 128, "2.51"],
['battery_5v_byte', 255, "5.00"],
]
for _test in tests:
_field_type = _test[0]
_input = _test[1]
_output = _test[2]
_decoded = decode_field(_field_type, _input)
print(f"{_field_type} {str(_input)} -> {_decoded}")
assert(_decoded == _output)
print("All tests passed!")

Wyświetl plik

@ -0,0 +1,189 @@
#
# HorusLib - Payload ID List
#
import json
import logging
import requests
# Global payload list
HORUS_PAYLOAD_LIST = {0:'4FSKTEST', 1:'HORUSBINARY', 65535:'HORUSTEST'}
# URL for payload list
# TODO: Move this into horusdemodlib repo
PAYLOAD_ID_LIST_URL = "https://raw.githubusercontent.com/projecthorus/horusbinary/master/payload_id_list.txt"
# Custom field data.
HORUS_CUSTOM_FIELDS = {
"HORUSTEST": {
"struct": "<BbBfH",
"fields": [
["cutdown_battery_voltage", "battery_5v_byte"],
["external_temperature", "none"],
["test_counter", "none"],
["test_float_field", "none"],
["test_int_field", "none"]
]
},
"HORUSTEST2": {
"struct": "<BbBH4x",
"fields": [
["cutdown_battery_voltage", "battery_5v_byte"],
["external_temperature", "none"],
["test_counter", "none"],
["test_int_field", "none"]
]
}
}
# Custom Field JSON URL
HORUS_CUSTOM_FIELD_URL = ""
def read_payload_list(filename="payload_id_list.txt"):
""" Read in the payload ID list, and return the parsed data as a dictionary """
# Dummy payload list.
payload_list = HORUS_PAYLOAD_LIST
try:
with open(filename,'r') as file:
for line in file:
# Skip comment lines.
if line[0] == '#':
continue
else:
# Attempt to split the line with a comma.
_params = line.split(',')
if len(_params) != 2:
# Invalid line.
logging.error("Could not parse line: %s" % line)
else:
try:
_id = int(_params[0])
_callsign = _params[1].strip()
payload_list[_id] = _callsign
except:
logging.error("Error parsing line: %s" % line)
except Exception as e:
logging.error("Error reading Payload ID list, does it exist? - %s" % str(e))
logging.debug("Known Payload IDs:")
for _payload in payload_list:
logging.debug("\t%s - %s" % (_payload, payload_list[_payload]))
return payload_list
def grab_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, local_file="payload_id_list.txt"):
""" Attempt to download the latest payload ID list from Github """
# Download the list.
try:
logging.info("Attempting to download latest payload ID list from GitHub...")
_r = requests.get(url, timeout=10)
except Exception as e:
logging.error("Unable to get latest payload ID list: %s" % str(e))
return False
# Check it is what we think it is..
if "HORUS BINARY PAYLOAD ID LIST" not in _r.text:
logging.error("Downloaded payload ID list is invalid.")
return False
# So now we most likely have a valid payload ID list, so write it out.
with open(local_file, 'w') as f:
f.write(_r.text)
return True
def init_payload_id_list():
""" Initialise and update the local payload ID list. """
grab_latest_payload_id_list()
HORUS_PAYLOAD_LIST = read_payload_list()
def read_custom_field_list(filename="custom_field_list.json"):
"""
Read in a JSON file containing descriptions of custom payload fields,
for use with the Horus Binary v2 32-byte payload format.
"""
_custom_field_list = HORUS_CUSTOM_FIELDS
try:
# Read in entirity of file contents.
_f = open(filename, 'r')
_raw_data = _f.read()
_f.close()
# Attempt to parse JSON
_field_data = json.loads(_raw_data)
if type(_field_data) != dict:
logging.error("Error reading custom field list, using defaults.")
return _custom_field_list
# Iterate through fields in the file we just read in
for _payload in _field_data:
_data = _field_data[_payload]
if ("struct" in _data) and ("fields" in _data):
_custom_field_list[_payload] = {
"struct": _data["struct"],
"fields": _data["fields"]
}
logging.debug(f"Loaded custom field data for {_payload}.")
return _custom_field_list
except Exception as e:
logging.error(f"Error parsing custom field list file ({filename}): {str(e)}")
return _custom_field_list
def grab_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, local_file="custom_field_list.json"):
""" Attempt to download the latest custom field list from Github """
# Download the list.
try:
logging.info("Attempting to download latest custom field list from GitHub...")
_r = requests.get(url, timeout=10)
except Exception as e:
logging.error("Unable to get latest custom field list: %s" % str(e))
return False
# Check it is what we think it is..
# (Currently checking for the presence of one of the test payloads)
if "HORUSTEST" not in _r.text:
logging.error("Downloaded custom field list is invalid.")
return False
# So now we most likely have a valid custom field list, so write it out.
with open(local_file, 'w') as f:
f.write(_r.text)
return True
def init_custom_field_list():
""" Initialise and update the local custom field list """
grab_latest_custom_field_list()
HORUS_CUSTOM_FIELDS = read_custom_field_list()
if __name__ == "__main__":
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG
)
init_payload_id_list()
print(HORUS_PAYLOAD_LIST)
init_custom_field_list()
print(HORUS_CUSTOM_FIELDS)