From 81d0a6f6c6c12d531fded0dddd0d2c05ec630e2d Mon Sep 17 00:00:00 2001 From: Michael Haberler Date: Fri, 2 Apr 2021 14:36:56 +0200 Subject: [PATCH 1/4] fix "going deaf on reconnect" issue --- sondehub/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 7e62889..199bcf8 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -55,8 +55,6 @@ class Stream: pass self.mqttc.connect(urlparts.netloc, 443, 60) self.mqttc.loop_start() - for sonde in self._sondes: - self.add_sonde(sonde) def get_url(self): conn = http.client.HTTPSConnection("api.v2.sondehub.org") @@ -70,6 +68,8 @@ class Stream: self.on_message(json.loads(msg.payload)) def _on_connect(self, mqttc, obj, flags, rc): + for sonde in self._sondes: + self.add_sonde(sonde) if mqtt.MQTT_ERR_SUCCESS != rc: self.ws_connect() if self.on_connect: From ff447ac37a5981d9b35db48d2fe8a33d51bd3807 Mon Sep 17 00:00:00 2001 From: Michael Haberler Date: Fri, 2 Apr 2021 15:15:57 +0200 Subject: [PATCH 2/4] Stream(): fix "going deaf pn reconnect" issue by subscribing on connect add on_log, on_disconnect callbacks. asJson flag: pass raw message instead of dict if False --- sondehub/__init__.py | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 199bcf8..f528a2f 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -15,12 +15,21 @@ S3_BUCKET = "sondehub-open-data" class Stream: - def __init__(self, sondes: list = ["#"], on_connect=None, on_message=None): + def __init__(self, + sondes: list = ["#"], + on_connect=None, + on_message=None, + on_log=None, + on_disconnect=None, asJson=True): self.mqttc = mqtt.Client(transport="websockets") self._sondes = sondes - self.ws_connect() + self.asJson = asJson self.on_connect = on_connect self.on_message = on_message + self.on_disconnect = on_disconnect + self.on_log = on_log + self.ws_connect() + def add_sonde(self, sonde): if sonde not in self._sondes: @@ -53,7 +62,10 @@ class Stream: self.mqttc.tls_set() except ValueError: pass - self.mqttc.connect(urlparts.netloc, 443, 60) + try: + self.mqttc.connect(urlparts.netloc, 443, 60) + except OSError: + pass self.mqttc.loop_start() def get_url(self): @@ -65,7 +77,10 @@ class Stream: def _on_message(self, mqttc, obj, msg): if self.on_message: - self.on_message(json.loads(msg.payload)) + if self.asJson: + self.on_message(json.loads(msg.payload)) + else: + self.on_message(msg.payload) def _on_connect(self, mqttc, obj, flags, rc): for sonde in self._sondes: @@ -73,10 +88,20 @@ class Stream: if mqtt.MQTT_ERR_SUCCESS != rc: self.ws_connect() if self.on_connect: - self.on_connect() + self.on_connect(mqttc, obj, flags, rc) + + def _on_log(self, *args, **kwargs): + if self.on_log: + self.on_log(*args, **kwargs) def _on_disconnect(self, client, userdata, rc): - self.ws_connect() + try: + self.ws_connect() + except OSError: + pass + + if self.on_disconnect: + self.on_disconnect(client, userdata, rc) def __exit__(self, type, value, traceback): self.mqttc.disconnect() From 4a2f60d2da2e14c0584c545685bc1afbf12073f2 Mon Sep 17 00:00:00 2001 From: Michael Haberler Date: Fri, 2 Apr 2021 15:17:09 +0200 Subject: [PATCH 3/4] example subscriber to live updates play with blocking/unblocking connections to us-east-1.amazonaws.com and see msg count drop to zero/picking up again after unblocking --- examples/live.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 examples/live.py diff --git a/examples/live.py b/examples/live.py new file mode 100644 index 0000000..8b43b7a --- /dev/null +++ b/examples/live.py @@ -0,0 +1,32 @@ +import json +import time +import sondehub + +interval = 5 +msgs = 0 +elapsed = 0 +starttime = 0 +log = True + +def on_msg(*args): + global msgs + msgs +=1 + +def on_connect(*args): + print("on_connect:", *args) + +def on_disconnect(*args): + print("on_disconnect:", *args) + +def on_log(*args): + if log: + print(f"on_log:", *args) + +sh = sondehub.Stream(on_message=on_msg, + on_disconnect=on_disconnect, + on_log=on_log, + on_connect=on_connect); +while True: + time.sleep(interval) + print(f"tick {msgs=}") + msgs = 0 From 1b6053df6a71b7e344eee5a6280c3baa5aadcc9d Mon Sep 17 00:00:00 2001 From: Michaela Date: Wed, 7 Apr 2021 18:03:08 +1000 Subject: [PATCH 4/4] Invert the logic of the asJson flag --- sondehub/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sondehub/__init__.py b/sondehub/__init__.py index f528a2f..22deb63 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -20,7 +20,7 @@ class Stream: on_connect=None, on_message=None, on_log=None, - on_disconnect=None, asJson=True): + on_disconnect=None, asJson=False): self.mqttc = mqtt.Client(transport="websockets") self._sondes = sondes self.asJson = asJson @@ -78,9 +78,9 @@ class Stream: def _on_message(self, mqttc, obj, msg): if self.on_message: if self.asJson: - self.on_message(json.loads(msg.payload)) - else: self.on_message(msg.payload) + else: + self.on_message(json.loads(msg.payload)) def _on_connect(self, mqttc, obj, flags, rc): for sonde in self._sondes: