diff --git a/README.md b/README.md index 85b9aaa..711c9a3 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ sondehub.Stream(sondes=["serial number"], on_message=callback) ``` If no `sondes` list is provided then all radiosondes will be streamed. -On message callback will contain a python dictonary using the [Universal Sonde Telemetry Format](https://github.com/projecthorus/radiosonde_auto_rx/wiki/Suggested-Universal-Sonde-Telemetry-Format) +On message callback will contain a python dictonary using the [Universal Sonde Telemetry Format](https://github.com/projecthorus/radiosonde_auto_rx/wiki/SondeHub-DB-Universal-Telemetry-Format) ``` @@ -66,4 +66,16 @@ sondehub | jq . } .... +``` + + +Open Data Access +== + +A basic interface to the Open Data is a available using `sondehub.download(serial=None, datetime_prefix=None)` + +``` +import sondehub +frames = sondehub.download(datetime_prefix="2018-10-01") +frames = sondehub.download(serial="serial") ``` \ No newline at end of file diff --git a/example.py b/example.py index 6a3ae1b..2ebfe6d 100644 --- a/example.py +++ b/example.py @@ -1,11 +1,16 @@ import sondehub +from multiprocessing import Queue +# def on_message(message): +# print(f"{message['serial']} - {message['alt']}") +# test = sondehub.Stream(on_message=on_message) +# while 1: +# pass -def on_message(message): - print(message) +print(sondehub.download(serial="S1120364")) +#print(sondehub.download(datetime_prefix="2021-02-04T02:19")) +# tasks_to_accomplish = Queue() +# blah = Queue() +# tasks_to_accomplish.put(("sondehub-open-data","serial/S1120364/2020-10-18T01:10:12.152254Z-5fb87ccb-7a0e-4975-b39f-593e19bc2bcd.json")) - -#test = sondehub.Stream(sondes=["R3320848"], on_message=on_message) -test = sondehub.Stream(on_message=on_message) -while 1: - pass +# print(sondehub.parallel_download(tasks_to_accomplish,blah)) \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 3cf58a4..3f81cdf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,46 @@ +[[package]] +name = "boto3" +version = "1.14.44" +description = "The AWS SDK for Python" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +botocore = ">=1.17.44,<1.18.0" +jmespath = ">=0.7.1,<1.0.0" +s3transfer = ">=0.3.0,<0.4.0" + +[[package]] +name = "botocore" +version = "1.17.44" +description = "Low-level, data-driven core of boto 3." +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +docutils = ">=0.10,<0.16" +jmespath = ">=0.7.1,<1.0.0" +python-dateutil = ">=2.1,<3.0.0" +urllib3 = {version = ">=1.20,<1.26", markers = "python_version != \"3.4\""} + +[[package]] +name = "docutils" +version = "0.15.2" +description = "Docutils -- Python Documentation Utilities" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "jmespath" +version = "0.10.0" +description = "JSON Matching Expressions" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "paho-mqtt" version = "1.5.1" @@ -9,12 +52,88 @@ python-versions = "*" [package.extras] proxy = ["pysocks"] +[[package]] +name = "python-dateutil" +version = "2.8.1" +description = "Extensions to the standard Python datetime module" +category = "main" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" + +[package.dependencies] +six = ">=1.5" + +[[package]] +name = "s3transfer" +version = "0.3.4" +description = "An Amazon S3 Transfer Manager" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +botocore = ">=1.12.36,<2.0a.0" + +[[package]] +name = "six" +version = "1.15.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "urllib3" +version = "1.25.11" +description = "HTTP library with thread-safe connection pooling, file post, and more." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" + +[package.extras] +brotli = ["brotlipy (>=0.6.0)"] +secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"] +socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] + [metadata] lock-version = "1.1" -python-versions = "^3.9" -content-hash = "76d5dfeb617b3f477c15007458ec4f5fad94e4d9ab19f2c8080bb7581685bc1f" +python-versions = "^3.6" +content-hash = "5eea1d9524a6003f57f557fe7d9d72b25e416ed1bcdbf691db48a0dd01979cbe" [metadata.files] +boto3 = [ + {file = "boto3-1.14.44-py2.py3-none-any.whl", hash = "sha256:b1936392d1491b4de04a972636c91904ec814519d8847d11e3375d77d5df1b12"}, + {file = "boto3-1.14.44.tar.gz", hash = "sha256:01d45a986abd81d4b55f5649060fcbc49f6f87deaa508b99367a9d9ea21c0347"}, +] +botocore = [ + {file = "botocore-1.17.44-py2.py3-none-any.whl", hash = "sha256:1b46ffe1d13922066c873323186cbf97e77c137e08e27039d9d684552ccc4892"}, + {file = "botocore-1.17.44.tar.gz", hash = "sha256:1f6175bf59ffa068055b65f7d703eb1f748c338594a40dfdc645a6130280d8bb"}, +] +docutils = [ + {file = "docutils-0.15.2-py2-none-any.whl", hash = "sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827"}, + {file = "docutils-0.15.2-py3-none-any.whl", hash = "sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0"}, + {file = "docutils-0.15.2.tar.gz", hash = "sha256:a2aeea129088da402665e92e0b25b04b073c04b2dce4ab65caaa38b7ce2e1a99"}, +] +jmespath = [ + {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, + {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, +] paho-mqtt = [ {file = "paho-mqtt-1.5.1.tar.gz", hash = "sha256:9feb068e822be7b3a116324e01fb6028eb1d66412bf98595ae72698965cb1cae"}, ] +python-dateutil = [ + {file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"}, + {file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"}, +] +s3transfer = [ + {file = "s3transfer-0.3.4-py2.py3-none-any.whl", hash = "sha256:1e28620e5b444652ed752cf87c7e0cb15b0e578972568c6609f0f18212f259ed"}, + {file = "s3transfer-0.3.4.tar.gz", hash = "sha256:7fdddb4f22275cf1d32129e21f056337fd2a80b6ccef1664528145b72c49e6d2"}, +] +six = [ + {file = "six-1.15.0-py2.py3-none-any.whl", hash = "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"}, + {file = "six-1.15.0.tar.gz", hash = "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259"}, +] +urllib3 = [ + {file = "urllib3-1.25.11-py2.py3-none-any.whl", hash = "sha256:f5321fbe4bf3fefa0efd0bfe7fb14e90909eb62a48ccda331726b4319897dd5e"}, + {file = "urllib3-1.25.11.tar.gz", hash = "sha256:8d7eaa5a82a1cac232164990f04874c594c9453ec55eef02eab885aa02fc17a2"}, +] diff --git a/pyproject.toml b/pyproject.toml index 1d4c0cd..559bbff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "sondehub" -version = "0.1.5" +version = "0.1.7" description = "SDK to access SondeHub open data" authors = ["Michaela "] readme = "README.md" @@ -12,6 +12,7 @@ license = "GPL-3.0-or-later" [tool.poetry.dependencies] python = "^3.6" paho-mqtt = "^1.5.1" +boto3 = "^1.14.44" [tool.poetry.dev-dependencies] diff --git a/sondehub/__init__.py b/sondehub/__init__.py index 8c04439..f76fd29 100644 --- a/sondehub/__init__.py +++ b/sondehub/__init__.py @@ -2,6 +2,14 @@ import paho.mqtt.client as mqtt from urllib.parse import urlparse import http.client import json +import boto3 +import sys +import threading +from queue import Queue +import queue + + +S3_BUCKET = "sondehub-open-data" class Stream: @@ -73,3 +81,58 @@ class Stream: def disconnect(self): self.mqttc.disconnect() + + +class Downloader(threading.Thread): + def __init__( + self, tasks_to_accomplish, tasks_that_are_done, debug=False, *args, **kwargs + ): + self.tasks_to_accomplish = tasks_to_accomplish + self.tasks_that_are_done = tasks_that_are_done + self.debug = debug + super().__init__(*args, **kwargs) + + def run(self): + s3 = boto3.client("s3") + while True: + try: + task = self.tasks_to_accomplish.get_nowait() + except queue.Empty: + return + data = s3.get_object(Bucket=task[0], Key=task[1]) + response = json.loads(data["Body"].read()) + if self.debug: + print(response) + self.tasks_that_are_done.put(response) + self.tasks_to_accomplish.task_done() + + +def download(serial=None, datetime_prefix=None, debug=False): + if serial: + prefix_filter = f"serial/{serial}/" + elif serial and datetime_prefix: + prefix_filter = f"serial/{serial}/{datetime_prefix}" + elif datetime_prefix: + prefix_filter = f"date/{datetime_prefix}" + else: + prefix_filter = "date/" + + s3 = boto3.resource("s3") + bucket = s3.Bucket(S3_BUCKET) + data = [] + + number_of_processes = 50 + tasks_to_accomplish = Queue() + tasks_that_are_done = Queue() + + for s3_object in bucket.objects.filter(Prefix=prefix_filter): + tasks_to_accomplish.put((s3_object.bucket_name, s3_object.key)) + + for _ in range(number_of_processes): + Downloader(tasks_to_accomplish, tasks_that_are_done, debug).start() + tasks_to_accomplish.join() + + while not tasks_that_are_done.empty(): + data.append(tasks_that_are_done.get()) + + return data