diff --git a/examples/live.py b/examples/live.py new file mode 100644 index 0000000..8b43b7a --- /dev/null +++ b/examples/live.py @@ -0,0 +1,32 @@ +import json +import time +import sondehub + +interval = 5 +msgs = 0 +elapsed = 0 +starttime = 0 +log = True + +def on_msg(*args): + global msgs + msgs +=1 + +def on_connect(*args): + print("on_connect:", *args) + +def on_disconnect(*args): + print("on_disconnect:", *args) + +def on_log(*args): + if log: + print(f"on_log:", *args) + +sh = sondehub.Stream(on_message=on_msg, + on_disconnect=on_disconnect, + on_log=on_log, + on_connect=on_connect); +while True: + time.sleep(interval) + print(f"tick {msgs=}") + msgs = 0 diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 7e62889..22deb63 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -15,12 +15,21 @@ S3_BUCKET = "sondehub-open-data" class Stream: - def __init__(self, sondes: list = ["#"], on_connect=None, on_message=None): + def __init__(self, + sondes: list = ["#"], + on_connect=None, + on_message=None, + on_log=None, + on_disconnect=None, asJson=False): self.mqttc = mqtt.Client(transport="websockets") self._sondes = sondes - self.ws_connect() + self.asJson = asJson self.on_connect = on_connect self.on_message = on_message + self.on_disconnect = on_disconnect + self.on_log = on_log + self.ws_connect() + def add_sonde(self, sonde): if sonde not in self._sondes: @@ -53,10 +62,11 @@ class Stream: self.mqttc.tls_set() except ValueError: pass - self.mqttc.connect(urlparts.netloc, 443, 60) + try: + self.mqttc.connect(urlparts.netloc, 443, 60) + except OSError: + pass self.mqttc.loop_start() - for sonde in self._sondes: - self.add_sonde(sonde) def get_url(self): conn = http.client.HTTPSConnection("api.v2.sondehub.org") @@ -67,16 +77,31 @@ class Stream: def _on_message(self, mqttc, obj, msg): if self.on_message: - self.on_message(json.loads(msg.payload)) + if self.asJson: + self.on_message(msg.payload) + else: + self.on_message(json.loads(msg.payload)) def _on_connect(self, mqttc, obj, flags, rc): + for sonde in self._sondes: + self.add_sonde(sonde) if mqtt.MQTT_ERR_SUCCESS != rc: self.ws_connect() if self.on_connect: - self.on_connect() + self.on_connect(mqttc, obj, flags, rc) + + def _on_log(self, *args, **kwargs): + if self.on_log: + self.on_log(*args, **kwargs) def _on_disconnect(self, client, userdata, rc): - self.ws_connect() + try: + self.ws_connect() + except OSError: + pass + + if self.on_disconnect: + self.on_disconnect(client, userdata, rc) def __exit__(self, type, value, traceback): self.mqttc.disconnect()