kopia lustrzana https://github.com/projecthorus/pysondehub
Merge branch 'mhaberler-fix-reconnect' into main
commit
a56e1f5e89
|
@ -0,0 +1,32 @@
|
|||
import json
|
||||
import time
|
||||
import sondehub
|
||||
|
||||
interval = 5
|
||||
msgs = 0
|
||||
elapsed = 0
|
||||
starttime = 0
|
||||
log = True
|
||||
|
||||
def on_msg(*args):
|
||||
global msgs
|
||||
msgs +=1
|
||||
|
||||
def on_connect(*args):
|
||||
print("on_connect:", *args)
|
||||
|
||||
def on_disconnect(*args):
|
||||
print("on_disconnect:", *args)
|
||||
|
||||
def on_log(*args):
|
||||
if log:
|
||||
print(f"on_log:", *args)
|
||||
|
||||
sh = sondehub.Stream(on_message=on_msg,
|
||||
on_disconnect=on_disconnect,
|
||||
on_log=on_log,
|
||||
on_connect=on_connect);
|
||||
while True:
|
||||
time.sleep(interval)
|
||||
print(f"tick {msgs=}")
|
||||
msgs = 0
|
|
@ -15,12 +15,21 @@ S3_BUCKET = "sondehub-open-data"
|
|||
|
||||
|
||||
class Stream:
|
||||
def __init__(self, sondes: list = ["#"], on_connect=None, on_message=None):
|
||||
def __init__(self,
|
||||
sondes: list = ["#"],
|
||||
on_connect=None,
|
||||
on_message=None,
|
||||
on_log=None,
|
||||
on_disconnect=None, asJson=False):
|
||||
self.mqttc = mqtt.Client(transport="websockets")
|
||||
self._sondes = sondes
|
||||
self.ws_connect()
|
||||
self.asJson = asJson
|
||||
self.on_connect = on_connect
|
||||
self.on_message = on_message
|
||||
self.on_disconnect = on_disconnect
|
||||
self.on_log = on_log
|
||||
self.ws_connect()
|
||||
|
||||
|
||||
def add_sonde(self, sonde):
|
||||
if sonde not in self._sondes:
|
||||
|
@ -53,10 +62,11 @@ class Stream:
|
|||
self.mqttc.tls_set()
|
||||
except ValueError:
|
||||
pass
|
||||
self.mqttc.connect(urlparts.netloc, 443, 60)
|
||||
try:
|
||||
self.mqttc.connect(urlparts.netloc, 443, 60)
|
||||
except OSError:
|
||||
pass
|
||||
self.mqttc.loop_start()
|
||||
for sonde in self._sondes:
|
||||
self.add_sonde(sonde)
|
||||
|
||||
def get_url(self):
|
||||
conn = http.client.HTTPSConnection("api.v2.sondehub.org")
|
||||
|
@ -67,16 +77,31 @@ class Stream:
|
|||
|
||||
def _on_message(self, mqttc, obj, msg):
|
||||
if self.on_message:
|
||||
self.on_message(json.loads(msg.payload))
|
||||
if self.asJson:
|
||||
self.on_message(msg.payload)
|
||||
else:
|
||||
self.on_message(json.loads(msg.payload))
|
||||
|
||||
def _on_connect(self, mqttc, obj, flags, rc):
|
||||
for sonde in self._sondes:
|
||||
self.add_sonde(sonde)
|
||||
if mqtt.MQTT_ERR_SUCCESS != rc:
|
||||
self.ws_connect()
|
||||
if self.on_connect:
|
||||
self.on_connect()
|
||||
self.on_connect(mqttc, obj, flags, rc)
|
||||
|
||||
def _on_log(self, *args, **kwargs):
|
||||
if self.on_log:
|
||||
self.on_log(*args, **kwargs)
|
||||
|
||||
def _on_disconnect(self, client, userdata, rc):
|
||||
self.ws_connect()
|
||||
try:
|
||||
self.ws_connect()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
if self.on_disconnect:
|
||||
self.on_disconnect(client, userdata, rc)
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
self.mqttc.disconnect()
|
||||
|
|
Ładowanie…
Reference in New Issue