kopia lustrzana https://github.com/projecthorus/horusdemodlib
Add RTTY parser
rodzic
f9a7e0a4ba
commit
30c7c22a68
|
@ -1 +1 @@
|
|||
__version__ = "0.1.2"
|
||||
__version__ = "0.1.3"
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
# HorusLib - Binary Packet Decoder Functions
|
||||
#
|
||||
import codecs
|
||||
import datetime
|
||||
import struct
|
||||
import time
|
||||
from .delegates import *
|
||||
|
@ -159,6 +160,9 @@ def decode_packet(data:bytes, packet_format:dict = None) -> dict:
|
|||
_ukhas_crc = ukhas_crc(_ukhas_str.encode('ascii'))
|
||||
_output['ukhas_str'] = "$$" + _ukhas_str + "*" + _ukhas_crc
|
||||
|
||||
# Duplicate some fields for parsing later
|
||||
_output['callsign'] = _output['payload_id']
|
||||
|
||||
return _output
|
||||
|
||||
|
||||
|
@ -172,6 +176,79 @@ def hex_to_bytes(data:str) -> bytes:
|
|||
return None
|
||||
|
||||
|
||||
def parse_ukhas_string(sentence:str) -> dict:
|
||||
""" Attempt to decode a UKHAS telemetry sentence into a dictionary """
|
||||
|
||||
# Try and convert from bytes to str if necessary
|
||||
if type(sentence) == bytes:
|
||||
sentence = sentence.decode('ascii')
|
||||
|
||||
# Try and proceed through the following. If anything fails, we have a corrupt sentence.
|
||||
try:
|
||||
# Strip out any leading/trailing whitespace.
|
||||
_sentence = sentence.strip()
|
||||
|
||||
# First, try and find the start of the sentence, which always starts with '$$''
|
||||
_sentence = _sentence.split('$$')[-1]
|
||||
# Hack to handle odd numbers of $$'s at the start of a sentence
|
||||
if _sentence[0] == '$':
|
||||
_sentence = _sentence[1:]
|
||||
# Now try and split out the telemetry from the CRC16.
|
||||
_telem = _sentence.split('*')[0]
|
||||
_crc = _sentence.split('*')[1]
|
||||
|
||||
# Now check if the CRC matches.
|
||||
_calc_crc = ukhas_crc(_telem.encode('ascii'))
|
||||
|
||||
if _calc_crc != _crc:
|
||||
raise ValueError("Could not parse RTTY Sentence - CRC Fail.")
|
||||
|
||||
# We now have a valid sentence! Extract fields..
|
||||
_fields = _telem.split(',')
|
||||
|
||||
_callsign = _fields[0]
|
||||
_time = _fields[2]
|
||||
_latitude = float(_fields[3])
|
||||
_longitude = float(_fields[4])
|
||||
_altitude = int(_fields[5])
|
||||
# The rest we don't care about.
|
||||
|
||||
# Perform some sanity checks on the data.
|
||||
|
||||
# Attempt to parse the time string. This will throw an error if any values are invalid.
|
||||
try:
|
||||
_time_dt = datetime.datetime.strptime(_time, "%H:%M:%S")
|
||||
except:
|
||||
raise ValueError("Could not parse RTTY Sentence - Invalid Time.")
|
||||
|
||||
# Check if the lat/long is 0.0,0.0 - no point passing this along.
|
||||
if _latitude == 0.0 or _longitude == 0.0:
|
||||
raise ValueError("Could not parse RTTY Sentence - Zero Lat/Long.")
|
||||
|
||||
# Place a limit on the altitude field. We generally store altitude on the payload as a uint16, so it shouldn't fall outside these values.
|
||||
if _altitude > 65535 or _altitude < 0:
|
||||
raise ValueError("Could not parse RTTY Sentence - Invalid Altitude.")
|
||||
|
||||
# Produce a dict output which is compatible with the output of the binary decoder.
|
||||
_telem = {
|
||||
'callsign': _callsign,
|
||||
'time': _time,
|
||||
'latitude': _latitude,
|
||||
'longitude': _longitude,
|
||||
'altitude': _altitude,
|
||||
'speed': -1,
|
||||
'heading': -1,
|
||||
'temp': -1,
|
||||
'sats': -1,
|
||||
'batt_voltage': -1
|
||||
}
|
||||
|
||||
return _telem
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError("Could not parse ASCII Sentence - %s" % str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import sys
|
||||
|
@ -209,6 +286,7 @@ if __name__ == "__main__":
|
|||
|
||||
if args.test:
|
||||
|
||||
# Binary packet decoder tests
|
||||
tests = [
|
||||
['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', ''],
|
||||
['horus_binary_v1', b'\x01\x12\x00\x00\x00\x23\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x1C\x9A\x95\x45', 'error'],
|
||||
|
@ -232,6 +310,17 @@ if __name__ == "__main__":
|
|||
print(f"Input ({_format}): {str(_input)} - Caught Error: {str(e)}")
|
||||
assert(_output == 'error')
|
||||
|
||||
# RTTY Decoder Tests
|
||||
tests = [
|
||||
'$$HORUS,6,06:43:16,0.000000,0.000000,0,0,0,1801,20*1DA2'
|
||||
]
|
||||
|
||||
for _test in tests:
|
||||
try:
|
||||
_decoded = parse_ukhas_string(_test)
|
||||
print(_decoded)
|
||||
except ValueError as e:
|
||||
print(f"Caught Error: {str(e)}")
|
||||
|
||||
print("All tests passed!")
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[tool.poetry]
|
||||
name = "horusdemodlib"
|
||||
version = "0.1.2"
|
||||
version = "0.1.3"
|
||||
description = "Project Horus HAB Telemetry Demodulators"
|
||||
authors = ["Mark Jessop"]
|
||||
license = "LGPL-2.1-or-later"
|
||||
|
|
Ładowanie…
Reference in New Issue