cusf-standalone-predictor/predict.py

656 wiersze
24 KiB
Python
Czysty Zwykły widok Historia

#!/usr/bin/env python
# Path to predictor binary
pred_binary = './pred_src/pred'
# Modules from the Python standard library.
import datetime
2010-05-30 22:55:47 +00:00
import time as timelib
import math
import sys
import os
import socket
import logging
import traceback
import calendar
import optparse
import subprocess
2012-07-12 23:28:24 +00:00
import statsd
import tempfile
import shutil
import bisect
2010-05-29 17:51:16 +00:00
import simplejson as json
2012-07-12 23:28:24 +00:00
statsd.init_statsd({'STATSD_BUCKET_PREFIX': 'habhub.predictor'})
# We use Pydap from http://pydap.org/.
import pydap.exceptions, pydap.client, pydap.lib
2010-07-21 16:49:21 +00:00
pydap.lib.CACHE = "/tmp/pydap-cache/"
# horrid, horrid monkey patching to force
# otherwise broken caching from dods server
# this is really, really hacky
import pydap.util.http
def fresh(response_headers, request_headers):
cc = pydap.util.http.httplib2._parse_cache_control(response_headers)
if cc.has_key('no-cache'):
return 'STALE'
return 'FRESH'
pydap.util.http.httplib2._entry_disposition = fresh
# Output logger format
log = logging.getLogger('main')
log_formatter = logging.Formatter('%(levelname)s: %(message)s')
console = logging.StreamHandler()
console.setFormatter(log_formatter)
log.addHandler(console)
progress_f = ''
progress = {
'run_time': '',
'gfs_percent': 0,
'gfs_timeremaining': '',
'gfs_complete': False,
'gfs_timestamp': '',
'pred_running': False,
'pred_complete': False,
'warnings': False,
'pred_output': [],
'error': '',
}
def update_progress(**kwargs):
global progress_f
global progress
for arg in kwargs:
progress[arg] = kwargs[arg]
try:
progress_f.truncate(0)
progress_f.seek(0)
progress_f.write(json.dumps(progress))
progress_f.flush()
os.fsync(progress_f.fileno())
except IOError:
global log
log.error('Could not update progress file')
2012-07-12 23:28:24 +00:00
@statsd.StatsdTimer.wrap('time')
def main():
"""
The main program routine.
"""
2012-07-12 23:28:24 +00:00
statsd.increment('run')
# Set up our command line options
parser = optparse.OptionParser()
parser.add_option('-d', '--cd', dest='directory',
help='change to, and run in, directory DIR',
metavar='DIR')
parser.add_option('--fork', dest='fork', action="store_true",
help='detach the process and run in the background')
parser.add_option('--alarm', dest='alarm', action="store_true",
help='setup an alarm for 10 minutes time to prevent hung processes')
parser.add_option('--redirect', dest='redirect', default='/dev/null',
help='if forking, file to send stdout/stderr to', metavar='FILE')
parser.add_option('-t', '--timestamp', dest='timestamp',
help='search for dataset covering the POSIX timestamp TIME \t[default: now]',
metavar='TIME', type='int',
default=calendar.timegm(datetime.datetime.utcnow().timetuple()))
parser.add_option('-v', '--verbose', action='count', dest='verbose',
help='be verbose. The more times this is specified the more verbose.', default=False)
parser.add_option('-p', '--past', dest='past',
help='window of time to save data is at most HOURS hours in past [default: %default]',
metavar='HOURS',
type='int', default=3)
parser.add_option('-f', '--future', dest='future',
help='window of time to save data is at most HOURS hours in future [default: %default]',
metavar='HOURS',
type='int', default=9)
parser.add_option('--hd', dest='hd', action="store_true",
help='use higher definition GFS data (default: no)')
parser.add_option('--preds', dest='preds_path',
help='path that contains uuid folders for predictions [default: %default]',
default='./predict/preds/', metavar='PATH')
group = optparse.OptionGroup(parser, "Location specifiers",
"Use these options to specify a particular tile of data to download.")
group.add_option('--lat', dest='lat',
help='tile centre latitude in range (-90,90) degrees north [default: %default]',
metavar='DEGREES',
type='float', default=52)
group.add_option('--lon', dest='lon',
help='tile centre longitude in degrees east [default: %default]',
metavar='DEGREES',
type='float', default=0)
group.add_option('--latdelta', dest='latdelta',
help='tile radius in latitude in degrees [default: %default]',
metavar='DEGREES',
type='float', default=5)
group.add_option('--londelta', dest='londelta',
help='tile radius in longitude in degrees [default: %default]',
metavar='DEGREES',
type='float', default=5)
parser.add_option_group(group)
#group = optparse.OptionGroup(parser, "Tile specifiers",
#"Use these options to specify how many tiles to download.")
#group.add_option('--lattiles', dest='lattiles',
#metavar='TILES',
#help='number of tiles along latitude to download [default: %default]',
#type='int', default=1)
#group.add_option('--lontiles', dest='lontiles',
#metavar='TILES',
#help='number of tiles along longitude to download [default: %default]',
#type='int', default=1)
#parser.add_option_group(group)
(options, args) = parser.parse_args()
# Check we got a UUID in the arguments
if len(args) != 1:
log.error('Exactly one positional argument should be supplied (uuid).')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
if options.directory:
os.chdir(options.directory)
if options.fork:
detach_process(options.redirect)
if options.alarm:
setup_alarm()
uuid = args[0]
uuid_path = options.preds_path + "/" + uuid + "/"
# Check we're not already running with this UUID
for line in os.popen('ps xa'):
process = " ".join(line.split()[4:])
if process.find(uuid) > 0:
pid = int(line.split()[0])
if pid != os.getpid():
2012-07-12 23:28:24 +00:00
statsd.increment('duplicate')
log.error('A process is already running for this UUID, quitting.')
sys.exit(1)
# Make the UUID directory if non existant
if not os.path.exists(uuid_path):
os.mkdir(uuid_path, 0770)
# Open the progress.json file for writing, creating it and closing again to flush
global progress_f
global progress
try:
progress_f = open(uuid_path+"progress.json", "w")
update_progress(
gfs_percent=0,
gfs_timeremaining="Please wait...",
run_time=str(int(timelib.time())))
except IOError:
log.error('Error opening progress.json file')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
# Check the predictor binary exists
2010-05-29 17:51:16 +00:00
if not os.path.exists(pred_binary):
log.error('Predictor binary does not exist.')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
# Check the latitude is in the right range.
if (options.lat < -90) | (options.lat > 90):
log.error('Latitude %s is outside of the range (-90,90).')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
# Check the delta sizes are valid.
if (options.latdelta <= 0.5) | (options.londelta <= 0.5):
log.error('Latitiude and longitude deltas must be at least 0.5 degrees.')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
if options.londelta > 180:
log.error('Longitude window sizes greater than 180 degrees are meaningless.')
2012-07-12 23:28:24 +00:00
statsd.increment('error')
sys.exit(1)
# We need to wrap the longitude into the right range.
options.lon = canonicalise_longitude(options.lon)
# How verbose are we being?
if options.verbose > 0:
log.setLevel(logging.INFO)
if options.verbose > 1:
log.setLevel(logging.DEBUG)
if options.verbose > 2:
logging.basicConfig(level=logging.INFO)
if options.verbose > 3:
logging.basicConfig(level=logging.DEBUG)
log.debug('Using cache directory: %s' % pydap.lib.CACHE)
timestamp_to_find = options.timestamp
time_to_find = datetime.datetime.utcfromtimestamp(timestamp_to_find)
log.info('Looking for latest dataset which covers %s' % time_to_find.ctime())
try:
dataset = dataset_for_time(time_to_find, options.hd)
except:
log.error('Could not locate a dataset for the requested time.')
2012-07-12 23:28:24 +00:00
statsd.increment('no_dataset')
statsd.increment('error')
sys.exit(1)
dataset_times = map(timestamp_to_datetime, dataset.time)
dataset_timestamps = map(datetime_to_posix, dataset_times)
log.info('Found appropriate dataset:')
log.info(' Start time: %s (POSIX %s)' % \
(dataset_times[0].ctime(), dataset_timestamps[0]))
log.info(' End time: %s (POSIX %s)' % \
(dataset_times[-1].ctime(), dataset_timestamps[-1]))
log.info(' Latitude: %s -> %s' % (min(dataset.lat), max(dataset.lat)))
log.info(' Longitude: %s -> %s' % (min(dataset.lon), max(dataset.lon)))
# for dlat in range(0,options.lattiles):
# for dlon in range(0,options.lontiles):
window = ( \
options.lat, options.latdelta, \
options.lon, options.londelta)
gfs_dir = "/var/www/cusf-standalone-predictor/gfs/"
gfs_dir = tempfile.mkdtemp(dir=gfs_dir)
gfs_filename = "gfs_%(time)_%(lat)_%(lon)_%(latdelta)_%(londelta).dat"
output_format = os.path.join(gfs_dir, gfs_filename)
write_file(output_format, dataset, \
window, \
time_to_find - datetime.timedelta(hours=options.past), \
time_to_find + datetime.timedelta(hours=options.future))
2010-07-21 16:49:21 +00:00
#purge_cache()
update_progress(gfs_percent=100, gfs_timeremaining='Done', gfs_complete=True, pred_running=True)
if options.alarm:
alarm_flags = ["-a120"]
else:
alarm_flags = []
command = [pred_binary, '-i' + gfs_dir, '-v', '-o'+uuid_path+'flight_path.csv', uuid_path+'scenario.ini'] + alarm_flags
2012-12-25 00:28:42 +00:00
pred_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pred_output = []
while True:
line = pred_process.stdout.readline()
if line == '':
break
# pass through
sys.stdout.write(line)
2012-12-25 00:28:42 +00:00
if "ERROR: Do not have wind data" in line:
2012-12-25 23:24:18 +00:00
pred_output = ["One of the latitude, longitude or time deltas ({0}, {1}, {2}) was too small."
.format(options.latdelta, options.londelta, options.future),
"Please adjust the settings accordingly and re-run your prediction.",
""] + pred_output
2012-12-25 00:28:42 +00:00
if ("WARN" in line or "ERROR" in line) and len(pred_output) < 10:
2012-12-25 00:28:42 +00:00
pred_output.append(line.strip())
exit_code = pred_process.wait()
shutil.rmtree(gfs_dir)
2012-12-25 00:28:42 +00:00
if exit_code == 1:
# Hard error from the predictor. Tell the javascript it completed, so that it will show the trace,
# but pop up a 'warnings' window with the error messages
update_progress(pred_running=False, pred_complete=True, warnings=True, pred_output=pred_output)
statsd.increment('success_serious_warnings')
elif pred_output:
# Soft error (altitude too low error, typically): pred_output being set forces the debug
# window open with the messages in
update_progress(pred_running=False, pred_complete=True, pred_output=pred_output)
statsd.increment('success_minor_warnings')
else:
assert exit_code == 0
update_progress(pred_running=False, pred_complete=True)
statsd.increment('success')
def purge_cache():
"""
Purge the pydap cache (if set).
"""
if pydap.lib.CACHE is None:
return
log.info('Purging PyDAP cache.')
for file in os.listdir(pydap.lib.CACHE):
log.debug(' Deleting %s.' % file)
os.remove(pydap.lib.CACHE + file)
def write_file(output_format, data, window, mintime, maxtime):
log.info('Downloading data in window (lat, lon) = (%s +/- %s, %s +/- %s).' % window)
# Firstly, get the hgtprs variable to extract the times we're going to use.
hgtprs_global = data['hgtprs']
# Check the dimensions are what we expect.
assert(hgtprs_global.dimensions == ('time', 'lev', 'lat', 'lon'))
# Work out what times we want to download
times = sorted(map(timestamp_to_datetime, hgtprs_global.maps['time']))
times_first = max(0, bisect.bisect_right(times, mintime) - 1)
times_last = min(len(times), bisect.bisect_left(times, maxtime) + 1)
times = times[times_first:times_last]
num_times = len(times)
current_time = 0
start_time = min(times)
end_time = max(times)
log.info('Downloading from %s to %s.' % (start_time.ctime(), end_time.ctime()))
# Filter the longitudes we're actually going to use.
longitudes = filter(lambda x: longitude_distance(x[1], window[2]) <= window[3] ,
enumerate(hgtprs_global.maps['lon']))
# Filter the latitudes we're actually going to use.
latitudes = filter(lambda x: math.fabs(x[1] - window[0]) <= window[1] ,
enumerate(hgtprs_global.maps['lat']))
update_progress(gfs_percent=10, gfs_timeremaining="Please wait...")
starttime = datetime.datetime.now()
# Write one file for each time index.
for timeidx, time in enumerate(hgtprs_global.maps['time']):
timestamp = datetime_to_posix(timestamp_to_datetime(time))
if (timestamp < datetime_to_posix(start_time)) | (timestamp > datetime_to_posix(end_time)):
continue
current_time += 1
log.info('Downloading data for %s.' % (timestamp_to_datetime(time).ctime()))
downloaded_data = { }
current_var = 0
time_per_var = datetime.timedelta()
for var in ('hgtprs', 'ugrdprs', 'vgrdprs'):
current_var += 1
grid = data[var]
log.info('Processing variable \'%s\' with shape %s...' % (var, grid.shape))
# Check the dimensions are what we expect.
assert(grid.dimensions == ('time', 'lev', 'lat', 'lon'))
# See if the longitude ragion wraps...
if (longitudes[0][0] == 0) & (longitudes[-1][0] == hgtprs_global.maps['lat'].shape[0]-1):
# Download the data. Unfortunately, OpeNDAP only supports remote access of
# contiguous regions. Since the longitude data wraps, we may require two
# windows. The 'right' way to fix this is to download a 'left' and 'right' window
# and munge them together. In terms of download speed, however, the overhead of
# downloading is so great that it is quicker to download all the longitude
# data in our slice and do the munging afterwards.
selection = grid[\
timeidx,
:, \
latitudes[0][0]:(latitudes[-1][0]+1),
: ]
else:
selection = grid[\
timeidx,
:, \
latitudes[0][0]:(latitudes[-1][0]+1),
longitudes[0][0]:(longitudes[-1][0]+1) ]
# Cache the downloaded data for later
downloaded_data[var] = selection
log.info(' Downloaded data has shape %s...', selection.shape)
assert len(selection.shape) == 3
now = datetime.datetime.now()
time_elapsed = now - starttime
num_vars = (current_time - 1)*3 + current_var
time_per_var = time_elapsed / num_vars
total_time = num_times * 3 * time_per_var
time_left = total_time - time_elapsed
2010-05-30 22:55:47 +00:00
time_left = timelib.strftime('%M:%S', timelib.gmtime(time_left.seconds))
update_progress(gfs_percent=int(
10 +
(((current_time - 1) * 90) / num_times) +
((current_var * 90) / (3 * num_times))
), gfs_timeremaining=time_left)
# Check all the downloaded data has the same shape
target_shape = downloaded_data['hgtprs']
assert( all( map( lambda x: x == target_shape,
filter( lambda x: x.shape, downloaded_data.itervalues() ) ) ) )
log.info('Writing output...')
hgtprs = downloaded_data['hgtprs']
ugrdprs = downloaded_data['ugrdprs']
vgrdprs = downloaded_data['vgrdprs']
log.debug('Using longitudes: %s' % (map(lambda x: x[1], longitudes),))
output_filename = output_format
output_filename = output_filename.replace('%(time)', str(timestamp))
output_filename = output_filename.replace('%(lat)', str(window[0]))
output_filename = output_filename.replace('%(latdelta)', str(window[1]))
output_filename = output_filename.replace('%(lon)', str(window[2]))
output_filename = output_filename.replace('%(londelta)', str(window[3]))
log.info(' Writing \'%s\'...' % output_filename)
output = open(output_filename, 'w')
# Write the header.
output.write('# window centre latitude, window latitude radius, window centre longitude, window longitude radius, POSIX timestamp\n')
header = window + (timestamp,)
output.write(','.join(map(str,header)) + '\n')
# Write the axis count.
output.write('# num_axes\n')
output.write('3\n') # FIXME: HARDCODED!
# Write each axis, a record showing the size and then one with the values.
output.write('# axis 1: pressures\n')
output.write(str(hgtprs.maps['lev'].shape[0]) + '\n')
output.write(','.join(map(str,hgtprs.maps['lev'][:])) + '\n')
output.write('# axis 2: latitudes\n')
output.write(str(len(latitudes)) + '\n')
output.write(','.join(map(lambda x: str(x[1]), latitudes)) + '\n')
output.write('# axis 3: longitudes\n')
output.write(str(len(longitudes)) + '\n')
output.write(','.join(map(lambda x: str(x[1]), longitudes)) + '\n')
# Write the number of lines of data.
output.write('# number of lines of data\n')
output.write('%s\n' % (hgtprs.shape[0] * len(latitudes) * len(longitudes)))
# Write the number of components in each data line.
output.write('# data line component count\n')
output.write('3\n') # FIXME: HARDCODED!
# Write the data itself.
output.write('# now the data in axis 3 major order\n')
output.write('# data is: '
'geopotential height [gpm], u-component wind [m/s], '
'v-component wind [m/s]\n')
for pressureidx, pressure in enumerate(hgtprs.maps['lev']):
for latidx, latitude in enumerate(hgtprs.maps['lat']):
for lonidx, longitude in enumerate(hgtprs.maps['lon']):
if longitude_distance(longitude, window[2]) > window[3]:
continue
record = ( hgtprs.array[pressureidx,latidx,lonidx], \
ugrdprs.array[pressureidx,latidx,lonidx], \
vgrdprs.array[pressureidx,latidx,lonidx] )
output.write(','.join(map(str,record)) + '\n')
def canonicalise_longitude(lon):
"""
The GFS model has all longitudes in the range 0.0 -> 359.5. Canonicalise
a longitude so that it fits in this range and return it.
"""
lon = math.fmod(lon, 360)
if lon < 0.0:
lon += 360.0
assert((lon >= 0.0) & (lon < 360.0))
return lon
def longitude_distance(lona, lonb):
"""
Return the shortest distance in degrees between longitudes lona and lonb.
"""
distances = ( \
math.fabs(lona - lonb), # Straightforward distance
360 - math.fabs(lona - lonb), # Other way 'round.
)
return min(distances)
def datetime_to_posix(time):
"""
Convert a datetime object to a POSIX timestamp.
"""
return calendar.timegm(time.timetuple())
def timestamp_to_datetime(timestamp):
"""
Convert a GFS fractional timestamp into a datetime object representing
that time.
"""
# The GFS timestmp is a floating point number fo days from the epoch,
# day '0' appears to be January 1st 1 AD.
(fractional_day, integer_day) = math.modf(timestamp)
# Unfortunately, the datetime module uses a different epoch.
ordinal_day = int(integer_day - 1)
# Convert the integer day to a time and add the fractional day.
return datetime.datetime.fromordinal(ordinal_day) + \
datetime.timedelta(days = fractional_day)
def possible_urls(time, hd):
"""
Given a datetime object representing a date and time, return a list of
possible data URLs which could cover that period.
The list is ordered from latest URL (i.e. most likely to be correct) to
earliest.
We assume that a particular data set covers a period of P days and
hence the earliest data set corresponds to time T - P and the latest
available corresponds to time T given target time T.
"""
period = datetime.timedelta(days = 7.5)
earliest = time - period
latest = time
if hd:
url_format = 'http://{host}:9090/dods/gfs_hd/gfs_hd%i%02i%02i/gfs_hd_%02iz'
else:
url_format = 'http://{host}:9090/dods/gfs/gfs%i%02i%02i/gfs_%02iz'
# Often we have issues where one IP address (the DNS resolves to 2 or more)
# will have a dataset and the other one won't yet.
# This causes "blah is not an available dataset" errors since predict.py
# thinks it's OK to use a recent one, and then by chance we end up talking
# to a server on a later request that doesn't have it.
selected_ip = socket.gethostbyname("nomads.ncep.noaa.gov")
log.info("Picked IP: {0}".format(selected_ip))
url_format = url_format.format(host=selected_ip)
# Start from the latest, work to the earliest
proposed = latest
possible_urls = []
while proposed >= earliest:
for offset in ( 18, 12, 6, 0 ):
possible_urls.append(url_format % \
(proposed.year, proposed.month, proposed.day, offset))
proposed -= datetime.timedelta(days = 1)
return possible_urls
def dataset_for_time(time, hd):
"""
Given a datetime object, attempt to find the latest dataset which covers that
time and return pydap dataset object for it.
"""
url_list = possible_urls(time, hd)
for url in url_list:
try:
log.debug('Trying dataset at %s.' % url)
dataset = pydap.client.open_url(url)
start_time = timestamp_to_datetime(dataset.time[0])
end_time = timestamp_to_datetime(dataset.time[-1])
if start_time <= time and end_time >= time:
log.info('Found good dataset at %s.' % url)
dataset_id = url.split("/")[5] + "_" + url.split("/")[6].split("_")[1]
update_progress(gfs_timestamp=dataset_id)
return dataset
except pydap.exceptions.ServerError:
# Skip server error.
pass
raise RuntimeError('Could not find appropriate dataset.')
def detach_process(redirect):
# Fork
if os.fork() > 0:
os._exit(0)
# Detach
os.setsid()
null_fd = os.open(os.devnull, os.O_RDONLY)
out_fd = os.open(redirect, os.O_WRONLY | os.O_APPEND)
os.dup2(null_fd, sys.stdin.fileno())
for s in [sys.stdout, sys.stderr]:
os.dup2(out_fd, s.fileno())
# Fork
if os.fork() > 0:
os._exit(0)
def setup_alarm():
# Prevent hung download:
import signal
signal.alarm(600)
# If this is being run from the interpreter, run the main function.
if __name__ == '__main__':
try:
main()
except SystemExit as e:
log.debug("Exit: " + repr(e))
if e.code != 0 and progress_f:
update_progress(error="Unknown error exit")
2012-07-12 23:31:08 +00:00
statsd.increment("unknown_error_exit")
raise
except Exception as e:
2012-07-12 23:31:08 +00:00
statsd.increment("uncaught_exception")
log.exception("Uncaught exception")
info = traceback.format_exc()
if progress_f:
update_progress(error="Unhandled exception: " + info)
raise