chasemapper/chasemapper/tawhiri.py

150 wiersze
4.0 KiB
Python

#!/usr/bin/env python
#
# Project Horus - Browser-Based Chase Mapper - Tawhiri Interface
#
# Grab predictions from the Tawhiri Predictions API
# Refer here for documentation on Tawhiri: https://tawhiri.readthedocs.io/en/latest/api.html
#
# Copyright (C) 2020 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import datetime
import logging
import pytz
import requests
import subprocess
from dateutil.parser import parse
from threading import Thread
TAWHIRI_API_URL = "http://api.v2.sondehub.org/tawhiri"
def get_tawhiri_prediction(
launch_datetime,
launch_latitude,
launch_longitude,
launch_altitude=0,
ascent_rate=5.0,
burst_altitude=30000.0,
descent_rate=5.0,
profile="standard_profile",
dataset=None,
timeout=10,
):
""" Request a Prediction from the Tawhiri Predictor API """
# Localise supplied time to UTC if not already done
if launch_datetime.tzinfo is None:
launch_datetime = pytz.utc.localize(launch_datetime)
# Create RFC3339-compliant timestamp
_dt_rfc3339 = launch_datetime.isoformat()
# Normalise longitude to range 0 to 360
if launch_longitude < 0:
launch_longitude += 360
_params = {
"launch_latitude": launch_latitude,
"launch_longitude": launch_longitude,
"launch_altitude": launch_altitude,
"launch_datetime": _dt_rfc3339,
"ascent_rate": ascent_rate,
"descent_rate": descent_rate,
"burst_altitude": burst_altitude,
"profile": profile,
}
if dataset:
_params["dataset"] = dataset
logging.debug("Tawhiri - Requesting prediction using parameters: %s" % str(_params))
try:
_r = requests.get(TAWHIRI_API_URL, params=_params, timeout=timeout)
_json = _r.json()
if "error" in _json:
# The Tawhiri API has returned an error
_error = "%s: %s" % (_json["error"]["type"], _json["error"]["description"])
logging.error("Tawhiri - %s" % _error)
return None
else:
return parse_tawhiri_data(_json)
except Exception as e:
logging.error("Tawhiri - Error running prediction: %s" % str(e))
return None
def parse_tawhiri_data(data):
""" Parse a returned flight trajectory from Tawhiri, and convert it to a cusf_predictor_wrapper compatible format """
_epoch = pytz.utc.localize(datetime.datetime(1970, 1, 1))
# Extract dataset information
_dataset = parse(data["request"]["dataset"])
_dataset = _dataset.strftime("%Y%m%d%Hz")
_path = []
for _stage in data["prediction"]:
_trajectory = _stage["trajectory"]
for _point in _trajectory:
# Normalise longitude to range -180 to 180
if _point["longitude"] > 180:
_point["longitude"] -= 360
# Create UTC timestamp without using datetime.timestamp(), for Python 2.7 backwards compatibility.
_dt = parse(_point["datetime"])
_dt_timestamp = (_dt - _epoch).total_seconds()
_path.append(
[
_dt_timestamp,
_point["latitude"],
_point["longitude"],
_point["altitude"],
]
)
_output = {"dataset": _dataset, "path": _path}
return _output
if __name__ == "__main__":
import datetime
import pprint
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=logging.INFO
)
_now = datetime.datetime.utcnow()
# Regular complete-flightpath prediction
_data = get_tawhiri_prediction(
launch_datetime=_now,
launch_latitude=-34.9499,
launch_longitude=138.5194,
launch_altitude=0,
)
pprint.pprint(_data)
# Descent prediction
_data = get_tawhiri_prediction(
launch_datetime=_now,
launch_latitude=-34.9499,
launch_longitude=138.5194,
launch_altitude=10000,
burst_altitude=10001,
descent_rate=abs(-6.0),
)
pprint.pprint(_data)