faster downloads :)

pull/2/head
Michaela 2021-02-10 16:18:10 +10:00
rodzic 6302131f7c
commit 952dbc2576
5 zmienionych plików z 211 dodań i 11 usunięć

Wyświetl plik

@ -5,7 +5,7 @@ sondehub.Stream(sondes=["serial number"], on_message=callback)
``` ```
If no `sondes` list is provided then all radiosondes will be streamed. If no `sondes` list is provided then all radiosondes will be streamed.
On message callback will contain a python dictonary using the [Universal Sonde Telemetry Format](https://github.com/projecthorus/radiosonde_auto_rx/wiki/Suggested-Universal-Sonde-Telemetry-Format) On message callback will contain a python dictonary using the [Universal Sonde Telemetry Format](https://github.com/projecthorus/radiosonde_auto_rx/wiki/SondeHub-DB-Universal-Telemetry-Format)
``` ```
@ -66,4 +66,16 @@ sondehub | jq .
} }
.... ....
```
Open Data Access
==
A basic interface to the Open Data is a available using `sondehub.download(serial=None, datetime_prefix=None)`
```
import sondehub
frames = sondehub.download(datetime_prefix="2018-10-01")
frames = sondehub.download(serial="serial")
``` ```

Wyświetl plik

@ -1,11 +1,16 @@
import sondehub import sondehub
from multiprocessing import Queue
# def on_message(message):
# print(f"{message['serial']} - {message['alt']}")
# test = sondehub.Stream(on_message=on_message)
# while 1:
# pass
def on_message(message): print(sondehub.download(serial="S1120364"))
print(message) #print(sondehub.download(datetime_prefix="2021-02-04T02:19"))
# tasks_to_accomplish = Queue()
# blah = Queue()
# tasks_to_accomplish.put(("sondehub-open-data","serial/S1120364/2020-10-18T01:10:12.152254Z-5fb87ccb-7a0e-4975-b39f-593e19bc2bcd.json"))
# print(sondehub.parallel_download(tasks_to_accomplish,blah))
#test = sondehub.Stream(sondes=["R3320848"], on_message=on_message)
test = sondehub.Stream(on_message=on_message)
while 1:
pass

123
poetry.lock wygenerowano
Wyświetl plik

@ -1,3 +1,46 @@
[[package]]
name = "boto3"
version = "1.14.44"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
botocore = ">=1.17.44,<1.18.0"
jmespath = ">=0.7.1,<1.0.0"
s3transfer = ">=0.3.0,<0.4.0"
[[package]]
name = "botocore"
version = "1.17.44"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
docutils = ">=0.10,<0.16"
jmespath = ">=0.7.1,<1.0.0"
python-dateutil = ">=2.1,<3.0.0"
urllib3 = {version = ">=1.20,<1.26", markers = "python_version != \"3.4\""}
[[package]]
name = "docutils"
version = "0.15.2"
description = "Docutils -- Python Documentation Utilities"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "jmespath"
version = "0.10.0"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]] [[package]]
name = "paho-mqtt" name = "paho-mqtt"
version = "1.5.1" version = "1.5.1"
@ -9,12 +52,88 @@ python-versions = "*"
[package.extras] [package.extras]
proxy = ["pysocks"] proxy = ["pysocks"]
[[package]]
name = "python-dateutil"
version = "2.8.1"
description = "Extensions to the standard Python datetime module"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
[package.dependencies]
six = ">=1.5"
[[package]]
name = "s3transfer"
version = "0.3.4"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
botocore = ">=1.12.36,<2.0a.0"
[[package]]
name = "six"
version = "1.15.0"
description = "Python 2 and 3 compatibility utilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "urllib3"
version = "1.25.11"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
[package.extras]
brotli = ["brotlipy (>=0.6.0)"]
secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"]
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.9" python-versions = "^3.6"
content-hash = "76d5dfeb617b3f477c15007458ec4f5fad94e4d9ab19f2c8080bb7581685bc1f" content-hash = "5eea1d9524a6003f57f557fe7d9d72b25e416ed1bcdbf691db48a0dd01979cbe"
[metadata.files] [metadata.files]
boto3 = [
{file = "boto3-1.14.44-py2.py3-none-any.whl", hash = "sha256:b1936392d1491b4de04a972636c91904ec814519d8847d11e3375d77d5df1b12"},
{file = "boto3-1.14.44.tar.gz", hash = "sha256:01d45a986abd81d4b55f5649060fcbc49f6f87deaa508b99367a9d9ea21c0347"},
]
botocore = [
{file = "botocore-1.17.44-py2.py3-none-any.whl", hash = "sha256:1b46ffe1d13922066c873323186cbf97e77c137e08e27039d9d684552ccc4892"},
{file = "botocore-1.17.44.tar.gz", hash = "sha256:1f6175bf59ffa068055b65f7d703eb1f748c338594a40dfdc645a6130280d8bb"},
]
docutils = [
{file = "docutils-0.15.2-py2-none-any.whl", hash = "sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827"},
{file = "docutils-0.15.2-py3-none-any.whl", hash = "sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0"},
{file = "docutils-0.15.2.tar.gz", hash = "sha256:a2aeea129088da402665e92e0b25b04b073c04b2dce4ab65caaa38b7ce2e1a99"},
]
jmespath = [
{file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"},
{file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"},
]
paho-mqtt = [ paho-mqtt = [
{file = "paho-mqtt-1.5.1.tar.gz", hash = "sha256:9feb068e822be7b3a116324e01fb6028eb1d66412bf98595ae72698965cb1cae"}, {file = "paho-mqtt-1.5.1.tar.gz", hash = "sha256:9feb068e822be7b3a116324e01fb6028eb1d66412bf98595ae72698965cb1cae"},
] ]
python-dateutil = [
{file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"},
{file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"},
]
s3transfer = [
{file = "s3transfer-0.3.4-py2.py3-none-any.whl", hash = "sha256:1e28620e5b444652ed752cf87c7e0cb15b0e578972568c6609f0f18212f259ed"},
{file = "s3transfer-0.3.4.tar.gz", hash = "sha256:7fdddb4f22275cf1d32129e21f056337fd2a80b6ccef1664528145b72c49e6d2"},
]
six = [
{file = "six-1.15.0-py2.py3-none-any.whl", hash = "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"},
{file = "six-1.15.0.tar.gz", hash = "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259"},
]
urllib3 = [
{file = "urllib3-1.25.11-py2.py3-none-any.whl", hash = "sha256:f5321fbe4bf3fefa0efd0bfe7fb14e90909eb62a48ccda331726b4319897dd5e"},
{file = "urllib3-1.25.11.tar.gz", hash = "sha256:8d7eaa5a82a1cac232164990f04874c594c9453ec55eef02eab885aa02fc17a2"},
]

Wyświetl plik

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "sondehub" name = "sondehub"
version = "0.1.5" version = "0.1.7"
description = "SDK to access SondeHub open data" description = "SDK to access SondeHub open data"
authors = ["Michaela <git@michaela.lgbt>"] authors = ["Michaela <git@michaela.lgbt>"]
readme = "README.md" readme = "README.md"
@ -12,6 +12,7 @@ license = "GPL-3.0-or-later"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.6" python = "^3.6"
paho-mqtt = "^1.5.1" paho-mqtt = "^1.5.1"
boto3 = "^1.14.44"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]

Wyświetl plik

@ -2,6 +2,14 @@ import paho.mqtt.client as mqtt
from urllib.parse import urlparse from urllib.parse import urlparse
import http.client import http.client
import json import json
import boto3
import sys
import threading
from queue import Queue
import queue
S3_BUCKET = "sondehub-open-data"
class Stream: class Stream:
@ -73,3 +81,58 @@ class Stream:
def disconnect(self): def disconnect(self):
self.mqttc.disconnect() self.mqttc.disconnect()
class Downloader(threading.Thread):
def __init__(
self, tasks_to_accomplish, tasks_that_are_done, debug=False, *args, **kwargs
):
self.tasks_to_accomplish = tasks_to_accomplish
self.tasks_that_are_done = tasks_that_are_done
self.debug = debug
super().__init__(*args, **kwargs)
def run(self):
s3 = boto3.client("s3")
while True:
try:
task = self.tasks_to_accomplish.get_nowait()
except queue.Empty:
return
data = s3.get_object(Bucket=task[0], Key=task[1])
response = json.loads(data["Body"].read())
if self.debug:
print(response)
self.tasks_that_are_done.put(response)
self.tasks_to_accomplish.task_done()
def download(serial=None, datetime_prefix=None, debug=False):
if serial:
prefix_filter = f"serial/{serial}/"
elif serial and datetime_prefix:
prefix_filter = f"serial/{serial}/{datetime_prefix}"
elif datetime_prefix:
prefix_filter = f"date/{datetime_prefix}"
else:
prefix_filter = "date/"
s3 = boto3.resource("s3")
bucket = s3.Bucket(S3_BUCKET)
data = []
number_of_processes = 50
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
for s3_object in bucket.objects.filter(Prefix=prefix_filter):
tasks_to_accomplish.put((s3_object.bucket_name, s3_object.key))
for _ in range(number_of_processes):
Downloader(tasks_to_accomplish, tasks_that_are_done, debug).start()
tasks_to_accomplish.join()
while not tasks_that_are_done.empty():
data.append(tasks_that_are_done.get())
return data