Rejig loading of payload/custom field configs

pull/21/head
Mark Jessop 2020-07-12 11:38:00 +09:30
rodzic 39eb32208f
commit 71aa870d16
3 zmienionych plików z 117 dodań i 19 usunięć

Wyświetl plik

@ -1 +1 @@
__version__ = "0.1.10"
__version__ = "0.1.11"

Wyświetl plik

@ -5,11 +5,10 @@ import json
import logging
import requests
# Global payload list
# Global payload list - Basic version
HORUS_PAYLOAD_LIST = {0:'4FSKTEST', 1:'HORUSBINARY', 65535:'HORUSTEST'}
# URL for payload list
# TODO: Move this into horusdemodlib repo
PAYLOAD_ID_LIST_URL = "https://raw.githubusercontent.com/projecthorus/horusdemodlib/master/payload_id_list.txt"
# Custom field data.
@ -40,7 +39,7 @@ HORUS_CUSTOM_FIELDS = {
HORUS_CUSTOM_FIELD_URL = "https://raw.githubusercontent.com/projecthorus/horusdemodlib/master/custom_field_list.json"
def read_payload_list(filename="payload_id_list.txt"):
""" Read in the payload ID list, and return the parsed data as a dictionary """
""" Read a payload ID list from a file, and return the parsed data as a dictionary """
# Dummy payload list.
payload_list = HORUS_PAYLOAD_LIST
@ -74,35 +73,75 @@ def read_payload_list(filename="payload_id_list.txt"):
return payload_list
def grab_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, local_file="payload_id_list.txt"):
""" Attempt to download the latest payload ID list from Github """
def download_latest_payload_id_list(url=PAYLOAD_ID_LIST_URL, filename=None):
"""
Attempt to download the latest payload ID list from Github, and parse into a dictionary.
Optionally, save it to a file.
"""
# Download the list.
try:
logging.info("Attempting to download latest payload ID list from GitHub...")
_r = requests.get(url, timeout=10)
except Exception as e:
logging.error("Unable to get latest payload ID list: %s" % str(e))
return False
return None
# Check it is what we think it is..
if "HORUS BINARY PAYLOAD ID LIST" not in _r.text:
logging.error("Downloaded payload ID list is invalid.")
return False
return None
_text = _r.text
_payload_list = {}
# So now we most likely have a valid payload ID list, so write it out.
with open(local_file, 'w') as f:
f.write(_r.text)
try:
for line in _r.text.split('\n'):
if line == "":
continue
# Skip comment lines.
if line[0] == '#':
continue
else:
# Attempt to split the line with a comma.
_params = line.split(',')
if len(_params) != 2:
# Invalid line.
logging.error("Could not parse line: %s" % line)
else:
try:
_id = int(_params[0])
_callsign = _params[1].strip()
_payload_list[_id] = _callsign
except:
logging.error("Error parsing line: %s" % line)
except Exception as e:
logging.error("Error reading Payload ID list - %s" % str(e))
return None
if filename != None:
try:
with open(filename, 'w') as f:
f.write(_r.text)
except Exception as e:
logging.error(f"Error writing payload list to file {filename} - {str(e)}")
logging.debug("Known Payload IDs:")
for _payload in _payload_list:
logging.debug("\t%s - %s" % (_payload, _payload_list[_payload]))
return _payload_list
logging.info("Updated payload ID list successfully!")
return True
def init_payload_id_list(filename="payload_id_list.txt"):
""" Initialise and update the local payload ID list. """
grab_latest_payload_id_list(local_file=filename)
HORUS_PAYLOAD_LIST = read_payload_list(filename=filename)
_list = download_latest_payload_id_list(filename=filename)
if _list:
HORUS_PAYLOAD_LIST = _list
else:
logging.warning("Could not download Payload ID List - attempting to use local version.")
HORUS_PAYLOAD_LIST = read_payload_list(filename=filename)
@ -171,10 +210,68 @@ def grab_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, local_file="custom
return True
def download_latest_custom_field_list(url=HORUS_CUSTOM_FIELD_URL, filename=None):
""" Attempt to download the latest custom field list from Github """
# Download the list.
try:
logging.info("Attempting to download latest custom field list from GitHub...")
_r = requests.get(url, timeout=10)
except Exception as e:
logging.error("Unable to get latest custom field list: %s" % str(e))
return None
# Check it is what we think it is..
# (Currently checking for the presence of one of the test payloads)
if "HORUSTEST" not in _r.text:
logging.error("Downloaded custom field list is invalid.")
return None
_text = _r.text
_custom_field_list = {}
try:
# Attempt to parse JSON
_field_data = json.loads(_r.text)
if type(_field_data) != dict:
logging.error("Error reading custom field list - Incorrect input format.")
return None
# Iterate through fields in the file we just read in
for _payload in _field_data:
_data = _field_data[_payload]
if ("struct" in _data) and ("fields" in _data):
_custom_field_list[_payload] = {
"struct": _data["struct"],
"fields": _data["fields"]
}
logging.debug(f"Loaded custom field data for {_payload}.")
except Exception as e:
logging.error(f"Could not parse downloaded custom field list - {str(e)}")
return None
if filename != None:
try:
with open(filename, 'w') as f:
f.write(_r.text)
except Exception as e:
logging.error(f"Error writing custom field list to file {filename} - {str(e)}")
return _custom_field_list
def init_custom_field_list(filename="custom_field_list.json"):
""" Initialise and update the local custom field list """
grab_latest_custom_field_list(local_file=filename)
HORUS_CUSTOM_FIELDS = read_custom_field_list(filename=filename)
_list = download_latest_custom_field_list(filename=filename)
if _list:
HORUS_CUSTOM_FIELDS = _list
else:
logging.warning("Could not download Custom Field List - attempting to use local version.")
HORUS_CUSTOM_FIELDS = read_custom_field_list(filename=filename)
if __name__ == "__main__":
@ -184,6 +281,7 @@ if __name__ == "__main__":
format="%(asctime)s %(levelname)s: %(message)s", level=logging.DEBUG
)
init_payload_id_list()
print(HORUS_PAYLOAD_LIST)

Wyświetl plik

@ -1,6 +1,6 @@
[tool.poetry]
name = "horusdemodlib"
version = "0.1.10"
version = "0.1.11"
description = "Project Horus HAB Telemetry Demodulators"
authors = ["Mark Jessop"]
license = "LGPL-2.1-or-later"