horusdemodlib/horusdemodlib/habitat.py

365 wiersze
12 KiB
Python

#!/usr/bin/env python
#
# Horus Demod Library - Habitat Uploader
#
# Mark Jessop <vk5qi@rfhead.net>
#
import datetime
import json
import logging
import random
import requests
import time
from base64 import b64encode
from hashlib import sha256
from queue import Queue
from threading import Thread
class HabitatUploader(object):
"""
Queued Habitat Telemetry Uploader class
Packets to be uploaded to Habitat are added to a queue for uploading.
If an upload attempt times out, the packet is discarded.
If the queue fills up (probably indicating no network connection, and a fast packet downlink rate),
it is immediately emptied, to avoid upload of out-of-date packets.
"""
HABITAT_URL = "http://habitat.habhub.org/"
HABITAT_DB = "habitat"
HABITAT_UUIDS = HABITAT_URL + "_uuids?count=%d"
HABITAT_DB_URL = HABITAT_URL + HABITAT_DB + "/"
def __init__(
self,
user_callsign="FSK_DEMOD",
listener_lat=0.0,
listener_lon=0.0,
listener_radio="",
listener_antenna="",
listener_upload_rate=3, # Hours
queue_size=64,
upload_timeout=10,
upload_retries=5,
upload_retry_interval=0.25,
inhibit=False,
):
""" Create a Habitat Uploader object. """
self.upload_timeout = upload_timeout
self.upload_retries = upload_retries
self.upload_retry_interval = upload_retry_interval
self.queue_size = queue_size
self.habitat_upload_queue = Queue(queue_size)
self.inhibit = inhibit
# Listener information
self.user_callsign = user_callsign
self.listener_lat = listener_lat
self.listener_lon = listener_lon
self.listener_radio = listener_radio
self.listener_antenna = listener_antenna
self.listener_upload_rate = listener_upload_rate
self.position_uploaded = False
self.last_listener_upload_time = 0
# Try and convert the supplied listener lat/lon to a float
# if this fails, just set the lat/lon to 0/0
try:
_lat = float(self.listener_lat)
_lon = float(self.listener_lon)
self.listener_lat = _lat
self.listener_lon = _lon
except:
logging.error("Could not parse listener lat/lon, setting both to 0.0")
self.listener_lat = 0.0
self.listener_lon = 0.0
self.last_freq_hz = None
self.callsign_init = False
self.uuids = []
if self.inhibit:
logging.info("Habitat Uploader inhibited.")
# Start the uploader thread.
self.habitat_uploader_running = True
self.uploadthread = Thread(target=self.habitat_upload_thread)
self.uploadthread.start()
def habitat_upload(self, sentence):
""" Upload a UKHAS-standard telemetry sentence to Habitat """
# Generate payload to be uploaded
# b64encode accepts and returns bytes objects.
_sentence_b64 = b64encode(sentence.encode("ascii"))
_date = datetime.datetime.utcnow().isoformat("T") + "Z"
_user_call = self.user_callsign
_data = {
"type": "payload_telemetry",
"data": {
"_raw": _sentence_b64.decode(
"ascii"
) # Convert back to a string to be serialisable
},
"receivers": {
_user_call: {"time_created": _date, "time_uploaded": _date},
},
}
if self.last_freq_hz:
# Add in frequency information if we have it.
_data["receivers"][_user_call]["rig_info"] = {"frequency": self.last_freq_hz}
# The URl to upload to.
_url = f"{self.HABITAT_URL}{self.HABITAT_DB}/_design/payload_telemetry/_update/add_listener/{sha256(_sentence_b64).hexdigest()}"
# Delay for a random amount of time between 0 and upload_retry_interval*2 seconds.
time.sleep(random.random() * self.upload_retry_interval * 2.0)
_retries = 0
# When uploading, we have three possible outcomes:
# - Can't connect. No point re-trying in this situation.
# - The packet is uploaded successfult (201 / 403)
# - There is a upload conflict on the Habitat DB end (409). We can retry and it might work.
while _retries < self.upload_retries:
# Run the request.
try:
_req = requests.put(
_url, data=json.dumps(_data), timeout=(self.upload_timeout, 6.1)
)
except Exception as e:
logging.error("Habitat - Upload Failed: %s" % str(e))
break
if _req.status_code == 201 or _req.status_code == 403:
# 201 = Success, 403 = Success, sentence has already seen by others.
logging.info(f"Habitat - Uploaded sentence: {sentence.strip()}")
_upload_success = True
break
elif _req.status_code == 409:
# 409 = Upload conflict (server busy). Sleep for a moment, then retry.
logging.debug("Habitat - Upload conflict.. retrying.")
time.sleep(random.random() * self.upload_retry_interval)
_retries += 1
else:
logging.error(
"Habitat - Error uploading to Habitat. Status Code: %d."
% _req.status_code
)
break
if _retries == self.upload_retries:
logging.error(
"Habitat - Upload conflict not resolved with %d retries."
% self.upload_retries
)
return
def habitat_upload_thread(self):
""" Handle uploading of packets to Habitat """
logging.info("Started Habitat Uploader Thread.")
while self.habitat_uploader_running:
if self.habitat_upload_queue.qsize() > 0:
# If the queue is completely full, jump to the most recent telemetry sentence.
if self.habitat_upload_queue.qsize() == self.queue_size:
while not self.habitat_upload_queue.empty():
sentence = self.habitat_upload_queue.get()
logging.warning(
"Habitat uploader queue was full - possible connectivity issue."
)
else:
# Otherwise, get the first item in the queue.
sentence = self.habitat_upload_queue.get()
# Attempt to upload it.
self.habitat_upload(sentence)
else:
# Wait for a short time before checking the queue again.
time.sleep(0.5)
# Listener position upload
if (
time.time() - self.last_listener_upload_time
) > self.listener_upload_rate * 3600:
# Time to upload a listener postion
if (self.listener_lat != 0.0) or (self.listener_lon != 0.0):
_success = self.uploadListenerPosition(
self.user_callsign,
self.listener_lat,
self.listener_lon,
self.listener_radio,
self.listener_antenna,
)
if _success:
logging.info(f"Habitat - Listener information uploaded. Re-uploading in {self.listener_upload_rate} hours.")
# Update the last upload time.
self.last_listener_upload_time = time.time()
logging.info("Stopped Habitat Uploader Thread.")
def add(self, sentence):
""" Add a sentence to the upload queue """
if self.inhibit:
# We have upload inhibited. Return.
return
# Handling of arbitrary numbers of $$'s at the start of a sentence:
# Extract the data part of the sentence (i.e. everything after the $$'s')
sentence = sentence.split("$")[-1]
# Now add the *correct* number of $$s back on.
sentence = "$$" + sentence
if not (sentence[-1] == "\n"):
sentence += "\n"
try:
self.habitat_upload_queue.put_nowait(sentence)
except Exception as e:
logging.error("Habitat - Error adding sentence to queue, queue full.")
def close(self):
""" Shutdown uploader thread. """
self.habitat_uploader_running = False
def ISOStringNow(self):
return "%sZ" % datetime.datetime.utcnow().isoformat()
def postListenerData(self, doc, timeout=10):
# do we have at least one uuid, if not go get more
if len(self.uuids) < 1:
self.fetchUuids()
# Attempt to add UUID and time data to document.
try:
doc["_id"] = self.uuids.pop()
except IndexError:
logging.error(
"Habitat - Unable to post listener data - no UUIDs available."
)
return False
doc["time_uploaded"] = self.ISOStringNow()
try:
_r = requests.post(
f"{self.HABITAT_URL}{self.HABITAT_DB}/", json=doc, timeout=timeout
)
return True
except Exception as e:
logging.error("Habitat - Could not post listener data - %s" % str(e))
return False
def fetchUuids(self, timeout=10):
_retries = 5
while _retries > 0:
try:
_r = requests.get(self.HABITAT_UUIDS % 10, timeout=timeout)
self.uuids.extend(_r.json()["uuids"])
logging.debug("Habitat - Got UUIDs")
return
except Exception as e:
logging.error(
"Habitat - Unable to fetch UUIDs, retrying in 2 seconds - %s"
% str(e)
)
time.sleep(2)
_retries = _retries - 1
continue
logging.error("Habitat - Gave up trying to get UUIDs.")
return
def initListenerCallsign(self, callsign, radio="", antenna=""):
doc = {
"type": "listener_information",
"time_created": self.ISOStringNow(),
"data": {"callsign": callsign, "antenna": antenna, "radio": radio,},
}
resp = self.postListenerData(doc)
if resp is True:
logging.debug("Habitat - Listener Callsign Initialized.")
return True
else:
logging.error("Habitat - Unable to initialize callsign.")
return False
def uploadListenerPosition(self, callsign, lat, lon, radio="", antenna=""):
""" Initializer Listener Callsign, and upload Listener Position """
# Attempt to initialize the listeners callsign
resp = self.initListenerCallsign(callsign, radio=radio, antenna=antenna)
# If this fails, it means we can't contact the Habitat server,
# so there is no point continuing.
if resp is False:
return False
doc = {
"type": "listener_telemetry",
"time_created": self.ISOStringNow(),
"data": {
"callsign": callsign,
"chase": False,
"latitude": lat,
"longitude": lon,
"altitude": 0,
"speed": 0,
},
}
# post position to habitat
resp = self.postListenerData(doc)
if resp is True:
return True
else:
logging.error("Habitat - Unable to upload listener information.")
return False
def trigger_position_upload(self):
""" Trigger a re-upload of the listener position """
self.position_uploaded = False
if __name__ == "__main__":
# Setup Logging
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO
)
habitat = HabitatUploader(
user_callsign="HORUSGUI_TEST",
listener_lat=-34.0,
listener_lon=138.0,
listener_radio="Testing Habitat Uploader",
listener_antenna="Wet Noodle",
)
habitat.add("$$DUMMY,0,0.0,0.0*F000")
time.sleep(10)
habitat.trigger_position_upload()
time.sleep(5)
habitat.close()