Add testing script to payloads module

Mark Jessop 2024-01-31 11:51:06 +10:30
rodzic aa0ebdf6d1
commit 0e3df15067
2 zmienionych plików z 110 dodań i 25 usunięć

Wyświetl plik

@ -5,6 +5,7 @@
import ctypes
from ctypes import *
import codecs
import datetime
import logging
import sys
from enum import Enum
@ -12,27 +13,12 @@ import os
import logging
from .decoder import decode_packet, hex_to_bytes
class Encoder():
HorusLib provides a binding to horuslib to demoulate frames.
Horus Binary Encoder class.
Example usage:
from horuslib import HorusLib, Mode
with HorusLib(, mode=Mode.BINARY, verbose=False) as horus:
with open("test.wav", "rb") as f:
while True:
data =*2)
if horus.nin != 0 and data == b'': #detect end of file
output = horus.demodulate(data)
if output.crc_pass and
print(f'{} SNR: {output.snr}')
for x in range(horus.mfsk):
print(f'F{str(x)}: {float(output.extended_stats.f_est[x])}')
Allows creation of a Horus Binary packet.
@ -97,6 +83,8 @@ class Encoder():
# Wrappers for libhorus C functions that we need.
def get_num_tx_data_bytes(self, packet_size) -> int:
Calculate the number of transmit data bytes (uw+packet+fec) for a given
@ -161,6 +149,97 @@ class Encoder():
self.c_lib.horus_l2_decode_rx_packet(_decoded, _encoded, num_payload_bytes)
return bytes(_decoded)
def create_horus_v2_packet(self,
payload_id = 256,
sequence_number = 0,
time_dt = datetime.datetime.utcnow(),
latitude = 0.0,
longitude = 0.0,
altitude = 0.0,
speed = 0.0,
satellites = 0,
temperature = 0.0,
battery_voltage = 0.0,
# Default fields used for the 'custom' section of the packet.
ascent_rate = 0.0,
ext_temperature = 0.0,
ext_humidity = 0.0,
ext_pressure = 0.0,
# Alternate custom data - must be bytes, and length=9
custom_data = None
# Sanity check input data.
if payload_id < 256 or payload_id > 65535:
raise ValueError("Invalid Horus v2 Payload ID. (Must be 256-65535)")
# Clip sequence number
sequence_number = int(sequence_number) % 65536
# Try and extract HHMMSS from time
hours = int(time_dt.hour)
minutes = int(time_dt.minute)
seconds = int(time_dt.second)
raise ValueError("Could not parse input datetime object.")
# Assume lat/lon are fine. They are just sent as floats anyway.
# Clip Altitude
altitude = int(altitude)
if altitude < 0:
altitude = 0
if altitude > 65535:
altitude = 65535
# Clip Speed (kph)
speed = int(speed)
if speed < 0:
speed = 0
if speed > 255:
speed = 255
# Clip sats
satellites = int(satellites)
if satellites < 0:
satellites = 0
if satellites > 255:
satellites = 255
# Temperature
# Battery voltage clip and conversion
# Custom data
# Ascent rate
# PTU data
# 'struct': '<HH3sffHBBbB9sH',
# 'checksum': 'crc16',
# 'fields': [
# ['payload_id', 'payload_id'],
# ['sequence_number', 'none'],
# ['time', 'time_hms'],
# ['latitude', 'degree_float'],
# ['longitude', 'degree_float'],
# ['altitude', 'none'],
# ['speed', 'none'],
# ['satellites', 'none'],
# ['temperature', 'none'],
# ['battery_voltage', 'battery_5v_byte'],
# ['custom', 'custom'],
# ['checksum', 'none']
# ]
if __name__ == "__main__":
@ -174,6 +253,7 @@ if __name__ == "__main__":
print("Encoder Tests: ")
horus_v1_unencoded = "000900071E2A000000000000000000000000259A6B14"
horus_v1_unencoded = "e701010000000000000000000000000000000022020000000000000000006e8e"
print(f" Horus v1 Input: {horus_v1_unencoded}")
horus_v1_unencoded_bytes = codecs.decode(horus_v1_unencoded, 'hex')
(_encoded, _num_bytes) = e.horus_l2_encode_packet(horus_v1_unencoded_bytes)

Wyświetl plik

@ -327,15 +327,20 @@ def update_payload_lists(payload_list, custom_field_list):
if __name__ == "__main__":
import argparse
# Setup Logging
format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG
# Read command-line arguments
parser = argparse.ArgumentParser(description="Test script for payload ID lists", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--download", action="store_false", default=True, help="Download lists from github, then check")
parser.add_argument("--print", action="store_true", default=False, help="Print content of payload ID lists")
args = parser.parse_args()
# Set up logging
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG)
if args.print: