pysondehub/examples/sondehub-athena.ipynb

456 wiersze
44 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"id": "velvet-valley",
"metadata": {},
"source": [
"# Imports\n",
"Next we import the Python libraries we'll be using."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "attractive-korea",
"metadata": {},
"outputs": [],
"source": [
"# AWS SDK\n",
"import boto3\n",
"\n",
"# NumPy, Matplotlib and SciPy help us process and graph some of the data\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import scipy.stats as stats\n",
"\n",
"import datetime"
]
},
{
"cell_type": "markdown",
"id": "nuclear-halifax",
"metadata": {},
"source": [
"# Create Athena Data Catalog and Table\n",
"Athena lets us query large datasets quickly. We need to create a table to query against."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "pacific-content",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'ResponseMetadata': {'RequestId': '2F0CD07F90744DD1',\n",
" 'HostId': '931DypU3uiY5Je0Lncke/D752W7TL+to/vwO7cJCRNADpoSP7fZhPUuZCnaCMh02VhtYnvEZ69M=',\n",
" 'HTTPStatusCode': 200,\n",
" 'HTTPHeaders': {'x-amz-id-2': '931DypU3uiY5Je0Lncke/D752W7TL+to/vwO7cJCRNADpoSP7fZhPUuZCnaCMh02VhtYnvEZ69M=',\n",
" 'x-amz-request-id': '2F0CD07F90744DD1',\n",
" 'date': 'Wed, 10 Feb 2021 09:15:01 GMT',\n",
" 'location': '/sondehub-temp-athena',\n",
" 'content-length': '0',\n",
" 'server': 'AmazonS3'},\n",
" 'RetryAttempts': 0},\n",
" 'Location': '/sondehub-temp-athena'}"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"athena = boto3.client('athena', region_name=\"us-east-1\")\n",
"s3 = boto3.client('s3', region_name=\"us-east-1\")\n",
"\n",
"athena_bucket_name = \"sondehub-temp-athena\" # change this to be a unique name for you\n",
"# Create a bucket where we are going to store the results\n",
"s3.create_bucket(Bucket=athena_bucket_name) \n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "collected-partition",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'ResultSet': {'Rows': [], 'ResultSetMetadata': {'ColumnInfo': []}},\n",
" 'ResponseMetadata': {'RequestId': '28342d38-3090-4d9c-8f89-0df5ce953d1c',\n",
" 'HTTPStatusCode': 200,\n",
" 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',\n",
" 'date': 'Wed, 10 Feb 2021 09:33:21 GMT',\n",
" 'x-amzn-requestid': '28342d38-3090-4d9c-8f89-0df5ce953d1c',\n",
" 'content-length': '108',\n",
" 'connection': 'keep-alive'},\n",
" 'RetryAttempts': 0}}"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create the database.\n",
"# IF NOT EXISTS keeps this cell idempotent (matching the CREATE TABLE cell), so a\n",
"# Restart & Run All does not fail just because the database is already there.\n",
"create_database = athena.start_query_execution(\n",
"    QueryString='CREATE DATABASE IF NOT EXISTS sondehub',\n",
"    ResultConfiguration={\n",
"        'OutputLocation': f\"s3://{athena_bucket_name}/\"\n",
"    }\n",
")\n",
"\n",
"# Wait for the query to finish: this cell treats the appearance of the\n",
"# <QueryExecutionId>.txt object in the results bucket as the completion signal.\n",
"waiter = s3.get_waiter(\"object_exists\")\n",
"waiter.wait(\n",
"    Bucket=athena_bucket_name,\n",
"    Key=f\"{create_database['QueryExecutionId']}.txt\"\n",
")\n",
"\n",
"# Fetch the results of the finished query (displayed as the cell output)\n",
"athena.get_query_results(\n",
"    QueryExecutionId=create_database['QueryExecutionId']\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 48,
"id": "respiratory-harmony",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'ResultSet': {'Rows': [], 'ResultSetMetadata': {'ColumnInfo': []}},\n",
" 'ResponseMetadata': {'RequestId': '65315949-56cc-403a-9815-a5995ab05d85',\n",
" 'HTTPStatusCode': 200,\n",
" 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',\n",
" 'date': 'Wed, 10 Feb 2021 09:50:50 GMT',\n",
" 'x-amzn-requestid': '65315949-56cc-403a-9815-a5995ab05d85',\n",
" 'content-length': '108',\n",
" 'connection': 'keep-alive'},\n",
" 'RetryAttempts': 0}}"
]
},
"execution_count": 48,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create table\n",
"\n",
"create_table_query = \"\"\"CREATE EXTERNAL TABLE IF NOT EXISTS sondehub.S4620525 (\n",
" `datetime` string,\n",
" `software_name` string,\n",
" `software_version` string,\n",
" `uploader_callsign` string,\n",
" `time_received` string,\n",
" `manufacturer` string,\n",
" `type` string,\n",
" `serial` string,\n",
" `frame` int,\n",
" `lat` float,\n",
" `lon` float,\n",
" `alt` float,\n",
" `subtype` string,\n",
" `temp` float,\n",
" `humidity` float,\n",
" `pressure` float,\n",
" `vel_h` float,\n",
" `vel_v` float,\n",
" `heading` float,\n",
" `sats` int,\n",
" `batt` float,\n",
" `burst_timer` int,\n",
" `xdata` string,\n",
" `frequency` float,\n",
" `snr` float,\n",
" `uploader_position` string,\n",
" `uploader_antenna` string \n",
")\n",
"ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'\n",
"WITH SERDEPROPERTIES (\n",
" 'serialization.format' = '1'\n",
") LOCATION 's3://sondehub-open-data/serial/S4620525/'\n",
"TBLPROPERTIES ('has_encrypted_data'='false');\n",
"\"\"\" # To save costs we are just going to look at one particular serial number S4620525\n",
"create_table = athena.start_query_execution(\n",
" QueryString=create_table_query,\n",
" QueryExecutionContext={\n",
" 'Database': 'sondehub' },\n",
" ResultConfiguration={\n",
" 'OutputLocation': f\"s3://{athena_bucket_name}/\"\n",
" }\n",
"\n",
" )\n",
"\n",
"# wait for query to finish\n",
"waiter = s3.get_waiter(\"object_exists\")\n",
"waiter.wait(\n",
" Bucket=athena_bucket_name,\n",
" Key=f\"{create_table['QueryExecutionId']}.txt\"\n",
")\n",
"\n",
"# fetch the results of the CREATE TABLE query\n",
"athena.get_query_results(\n",
" QueryExecutionId=create_table['QueryExecutionId']\n",
")\n",
"\n"
]
},
{
"cell_type": "markdown",
"id": "grave-activation",
"metadata": {},
"source": [
"# Query the Athena database"
]
},
{
"cell_type": "code",
"execution_count": 75,
"id": "hindu-paper",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'QueryExecutionId': 'baddaa58-e67c-49f6-aad5-0deb91d165f8', 'ResponseMetadata': {'RequestId': '57c956de-ec94-4d4b-b29c-8eaa25101940', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Wed, 10 Feb 2021 10:06:00 GMT', 'x-amzn-requestid': '57c956de-ec94-4d4b-b29c-8eaa25101940', 'content-length': '59', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}\n"
]
}
],
"source": [
"# Query the data: select the three columns we want to graph\n",
"data_query = \"\"\"\n",
"SELECT datetime,alt,snr FROM \"sondehub\".\"S4620525\" \n",
"\"\"\"\n",
"data_athena = athena.start_query_execution(\n",
"    QueryString=data_query,\n",
"    QueryExecutionContext={\n",
"        'Database': 'sondehub'\n",
"    },\n",
"    ResultConfiguration={\n",
"        'OutputLocation': f\"s3://{athena_bucket_name}/\"\n",
"    }\n",
")\n",
"print(data_athena)\n",
"\n",
"# Wait for the query to finish: this cell treats the appearance of the\n",
"# <QueryExecutionId>.csv object in the results bucket as the completion signal.\n",
"waiter = s3.get_waiter(\"object_exists\")\n",
"waiter.wait(\n",
"    Bucket=athena_bucket_name,\n",
"    Key=f\"{data_athena['QueryExecutionId']}.csv\"\n",
")\n",
"\n",
"# Collect the rows from every page of results into a single list.\n",
"# The first row holds the column names; later cells skip it with query_output[1:].\n",
"query_output = []\n",
"paginator = athena.get_paginator('get_query_results')\n",
"response_iterator = paginator.paginate(\n",
"    QueryExecutionId=data_athena['QueryExecutionId']\n",
")\n",
"for query_page in response_iterator:\n",
"    # extend in place rather than rebuilding the whole list for every page\n",
"    query_output.extend(query_page[\"ResultSet\"][\"Rows\"])"
]
},
{
"cell_type": "code",
"execution_count": 76,
"id": "western-strap",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'Data': [{'VarCharValue': 'datetime'}, {'VarCharValue': 'alt'}, {'VarCharValue': 'snr'}]}\n",
"{'Data': [{'VarCharValue': '2021-02-09T22:38:45.001000Z'}, {'VarCharValue': '4.09143'}, {'VarCharValue': '13.6'}]}\n"
]
}
],
"source": [
"# explore the data\n",
"print(query_output[0])\n",
"print(query_output[1])"
]
},
{
"cell_type": "markdown",
"id": "addressed-superior",
"metadata": {},
"source": [
"# Parse data\n",
"To make things easier we use numpy to make an array out of the data. This will help us when we go to graph the data.\n",
"\n",
"In this case we are going to graph altitude and SNR against time."
]
},
{
"cell_type": "code",
"execution_count": 77,
"id": "liquid-draft",
"metadata": {},
"outputs": [],
"source": [
"data = np.array(\n",
" [\n",
" [ \n",
" datetime.datetime.strptime(each[\"Data\"][0][\"VarCharValue\"],\"%Y-%m-%dT%H:%M:%S.%fZ\"),\n",
" float(each[\"Data\"][1][\"VarCharValue\"]),\n",
" float(each[\"Data\"][2][\"VarCharValue\"]) if \"VarCharValue\" in each[\"Data\"][2] else None\n",
" ]\n",
" for each in query_output[1:]\n",
" ]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 78,
"id": "polish-heritage",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[datetime.datetime(2021, 2, 9, 22, 30, 16, 1000), -6.80747, 7.8],\n",
" [datetime.datetime(2021, 2, 9, 22, 30, 18, 179618), -4.0, None],\n",
" [datetime.datetime(2021, 2, 9, 22, 30, 28, 1000), -7.21371, 10.3],\n",
" ...,\n",
" [datetime.datetime(2021, 2, 10, 4, 43, 0, 118196), 277.0, None],\n",
" [datetime.datetime(2021, 2, 10, 4, 43, 30, 212890), 277.0, None],\n",
" [datetime.datetime(2021, 2, 10, 4, 44, 0, 188437), 277.0, None]],\n",
" dtype=object)"
]
},
"execution_count": 78,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data = data[np.argsort(data[:, 0])] # order the data by the datetime\n",
"\n",
"data # have a look at the data"
]
},
{
"cell_type": "markdown",
"id": "lesbian-halifax",
"metadata": {},
"source": [
"# Graph the data"
]
},
{
"cell_type": "code",
"execution_count": 79,
"id": "working-membrane",
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"sonde_time = data[:,0] # get the datetime from the ndarray\n",
"alt = data[:,1] # get the altitude from the ndarray\n",
"\n",
"\n",
"plt.plot(sonde_time, alt)\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 80,
"id": "rational-maldives",
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 2 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"# Plot SNR alongside altitude\n",
"\n",
"sonde_time = data[:,0] # get the datetime from the ndarray\n",
"alt = data[:,1] # get the altitude from the ndarray\n",
"snr = data[:,2] # get the SNR from the ndarray\n",
"\n",
"fig,ax = plt.subplots()\n",
"\n",
"ax.plot(sonde_time, alt) # plot altitude on the first axis\n",
"ax.set_ylabel(\"Alt\",fontsize=14)\n",
"\n",
"ax2=ax.twinx() # get a second axis\n",
"ax2.set_ylabel(\"SNR\",fontsize=14)\n",
"ax2.plot(sonde_time, snr, color=\"red\")\n",
"\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "clean-tractor",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.1"
}
},
"nbformat": 4,
"nbformat_minor": 5
}