#! /usr/bin/env python
from PIL import Image, ImageDraw, ImageFont
import os, sys, gzip, math, argparse, colorsys, datetime
from collections import defaultdict
from itertools import *
urlretrieve = lambda a, b: None
import urllib.request
urlretrieve = urllib.request.urlretrieve
import urllib
urlretrieve = urllib.urlretrieve
# todo:
# matplotlib powered --interactive
# arbitrary freq marker spacing
# ppm
# blue-less marker grid
# fast summary thing
# gain normalization
# check pil version for brokenness
vera_url = ""
vera_path = os.path.join(sys.path[0], "Vera.ttf")
tape_height = 25
tape_pt = 10
if not os.path.isfile(vera_path):
urlretrieve(vera_url, vera_path)
font = ImageFont.truetype(vera_path, 10)
print('Please download the Vera.ttf font and place it in the current directory.')
def build_parser():
parser = argparse.ArgumentParser(description='Convert rtl_power CSV files into graphics.')
parser.add_argument('input_path', metavar='INPUT', type=str,
help='Input CSV file. (may be a .csv.gz)')
parser.add_argument('output_path', metavar='OUTPUT', type=str,
help='Output image. (various extensions supported)')
parser.add_argument('--offset', dest='offset_freq', default=None,
help='Shift the entire frequency range, for up/down converters.')
parser.add_argument('--ytick', dest='time_tick', default=None,
help='Place ticks along the Y axis every N seconds/minutes/hours/days.')
parser.add_argument('--db', dest='db_limit', nargs=2, default=None,
help='Minimum and maximum db values.')
parser.add_argument('--compress', dest='compress', default=0,
help='Apply a gradual asymptotic time compression. Values > 1 are the new target height, values < 1 are a scaling factor.')
slicegroup = parser.add_argument_group('Slicing',
'Efficiently render a portion of the data. (optional) Frequencies can take G/M/k suffixes. Timestamps look like "YYYY-MM-DD HH:MM:SS" Durations take d/h/m/s suffixes.')
slicegroup.add_argument('--low', dest='low_freq', default=None,
help='Minimum frequency for a subrange.')
slicegroup.add_argument('--high', dest='high_freq', default=None,
help='Maximum frequency for a subrange.')
slicegroup.add_argument('--begin', dest='begin_time', default=None,
help='Timestamp to start at.')
slicegroup.add_argument('--end', dest='end_time', default=None,
help='Timestamp to stop at.')
slicegroup.add_argument('--head', dest='head_time', default=None,
help='Duration to use, starting at the beginning.')
slicegroup.add_argument('--tail', dest='tail_time', default=None,
help='Duration to use, stopping at the end.')
parser.add_argument('--palette', dest='palette', default='default',
help='Set Color Palette: default, extended, charolastra, twente')
return parser
def frange(start, stop, step):
i = 0
while (i*step + start <= stop):
yield i*step + start
i += 1
def min_filter(row):
size = 3
result = []
for i in range(size):
here = row[i]
near = row[0:i] + row[i+1:size]
if here > min(near):
for i in range(size-1, len(row)):
here = row[i]
near = row[i-(size-1):i]
if here > min(near):
return result
def floatify(zs):
# nix errors with -inf, windows errors with -1.#J
zs2 = []
previous = 0 # awkward for single-column rows
for z in zs:
z = float(z)
except ValueError:
z = previous
if math.isinf(z):
z = previous
if math.isnan(z):
z = previous
previous = z
return zs2
def freq_parse(s):
suffix = 1
if s.lower().endswith('k'):
suffix = 1e3
if s.lower().endswith('m'):
suffix = 1e6
if s.lower().endswith('g'):
suffix = 1e9
if suffix != 1:
s = s[:-1]
return float(s) * suffix
def duration_parse(s):
suffix = 1
if s.lower().endswith('s'):
suffix = 1
if s.lower().endswith('m'):
suffix = 60
if s.lower().endswith('h'):
suffix = 60 * 60
if s.lower().endswith('d'):
suffix = 24 * 60 * 60
if suffix != 1 or s.lower().endswith('s'):
s = s[:-1]
return float(s) * suffix
def date_parse(s):
if '-' not in s:
return datetime.datetime.fromtimestamp(int(s))
return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
def palette_parse(s):
palettes = {'default': default_palette,
'extended': extended_palette,
'charolastra': charolastra_palette,
'twente': twente_palette,
if s not in palettes:
print('WARNING: %s not a valid palette' % s)
return palettes.get(s, default_palette)
def gzip_wrap(path):
"hides silly CRC errors"
iterator =, 'rb')
running = True
while running:
line = next(iterator)
if type(line) == bytes:
line = line.decode('utf-8')
yield line
except IOError:
running = False
def time_compression(y, decay):
return int(round((1/decay)*math.exp(y*decay) - 1/decay))
def reparse(args, label, fn):
if args.__getattribute__(label) is None:
args.__setattr__(label, fn(args.__getattribute__(label)))
def prepare_args():
# hack,
for i, arg in enumerate(sys.argv):
if (arg[0] == '-') and arg[1].isdigit():
sys.argv[i] = ' ' + arg
parser = build_parser()
args = parser.parse_args()
reparse(args, 'low_freq', freq_parse)
reparse(args, 'high_freq', freq_parse)
reparse(args, 'offset_freq', freq_parse)
if args.offset_freq is None:
args.offset_freq = 0
reparse(args, 'time_tick', duration_parse)
reparse(args, 'begin_time', date_parse)
reparse(args, 'end_time', date_parse)
reparse(args, 'head_time', duration_parse)
reparse(args, 'tail_time', duration_parse)
reparse(args, 'palette', palette_parse)
reparse(args, 'head_time', lambda s: datetime.timedelta(seconds=s))
reparse(args, 'tail_time', lambda s: datetime.timedelta(seconds=s))
args.compress = float(args.compress)
if args.db_limit:
a,b = args.db_limit
args.db_limit = (float(a), float(b))
if args.begin_time and args.tail_time:
print("Can't combine --begin and --tail")
if args.end_time and args.head_time:
print("Can't combine --end and --head")
if args.head_time and args.tail_time:
print("Can't combine --head and --tail")
return args
def open_raw_data(path):
raw_data = lambda: open(path)
if path.endswith('.gz'):
raw_data = lambda: gzip_wrap(path)
return raw_data
def slice_columns(columns, low_freq, high_freq):
start_col = 0
stop_col = len(columns)
if low_freq is not None and low <= low_freq <= high:
start_col = sum(f<low_freq for f in columns)
if high_freq is not None and low <= high_freq <= high:
stop_col = sum(f<=high_freq for f in columns)
return start_col, stop_col-1
def summarize_pass(args):
"pumps a bunch of data back into the args construct"
freqs = set()
f_cache = set()
times = set()
labels = set()
min_z = 0
max_z = -100
start, stop = None, None
for line in raw_data():
line = [s.strip() for s in line.strip().split(',')]
#line = [line[0], line[1]] + [float(s) for s in line[2:] if s]
line = [s for s in line if s]
low = int(line[2]) + args.offset_freq
high = int(line[3]) + args.offset_freq
step = float(line[4])
t = line[0] + ' ' + line[1]
if '-' not in line[0]:
t = line[0]
if args.low_freq is not None and high < args.low_freq:
if args.high_freq is not None and args.high_freq < low:
if args.begin_time is not None and date_parse(t) < args.begin_time:
if args.end_time is not None and date_parse(t) > args.end_time:
columns = list(frange(low, high, step))
start_col, stop_col = slice_columns(columns, args.low_freq, args.high_freq)
f_key = (columns[start_col], columns[stop_col], step)
zs = line[6+start_col:6+stop_col+1]
if not zs:
if f_key not in f_cache:
freq2 = list(frange(*f_key))[:len(zs)]
#freqs.add(f_key[1]) # high
#labels.add(f_key[0]) # low
if not args.db_limit:
zs = floatify(zs)
min_z = min(min_z, min(zs))
max_z = max(max_z, max(zs))
if start is None:
start = date_parse(t)
stop = date_parse(t)
if args.head_time is not None and args.end_time is None:
args.end_time = start + args.head_time
if not args.db_limit:
args.db_limit = (min_z, max_z)
if args.tail_time is not None:
times = [t for t in times if date_parse(t) >= (stop - args.tail_time)]
start = date_parse(min(times))
freqs = list(sorted(list(freqs)))
times = list(sorted(list(times)))
labels = list(sorted(list(labels)))
if len(labels) == 1:
delta = (max(freqs) - min(freqs)) / (len(freqs) / 500.0)
delta = round(delta / 10**int(math.log10(delta))) * 10**int(math.log10(delta))
delta = int(delta)
lower = int(math.ceil(min(freqs) / delta) * delta)
labels = list(range(lower, int(max(freqs)), delta))
height = len(times)
pix_height = height
if args.compress:
if args.compress > height:
args.compress = 0
print("Image too short, disabling time compression")
if 0 < args.compress < 1:
args.compress *= height
if args.compress:
args.compress = -1 / args.compress
pix_height = time_compression(height, args.compress)
print("x: %i, y: %i, z: (%f, %f)" % (len(freqs), pix_height, args.db_limit[0], args.db_limit[1]))
args.freqs = freqs
args.times = times
args.labels = labels
args.pix_height = pix_height
args.start_stop = (start, stop)
args.pixel_bandwidth = step
def default_palette():
return [(i, i, 50) for i in range(256)]
def extended_palette():
p = [(0,0,50)]
for i in range(1, 256):
p.append((i, i-1, 50))
p.append((i-1, i, 50))
p.append((i, i, 50))
return p
def charolastra_palette():
p = []
for i in range(1024):
g = i / 1023.0
c = colorsys.hsv_to_rgb(0.65-(g-0.08), 1, 0.2+g)
p.append((int(c[0]*256), int(c[1]*256), int(c[2]*256)))
return p
def twente_palette():
p = []
for i in range(20, 100, 2):
p.append((0, 0, i))
for i in range(256):
g = i / 255.0
p.append((int(g*255), 0, int(g*155)+100))
for i in range(256):
p.append((255, i, 255))
# intentionally blow out the highs
for i in range(100):
p.append((255, 255, 255))
return p
def rgb_fn(palette, min_z, max_z):
"palette is a list of tuples, returns a function of z"
def rgb_inner(z):
tone = (z - min_z) / (max_z - min_z)
tone_scaled = int(tone * (len(palette)-1))
return palette[tone_scaled]
return rgb_inner
def collate_row(x_size):
# this is more fragile than the old code
# sensitive to timestamps that are out of order
old_t = None
row = [0.0] * x_size
for line in raw_data():
line = [s.strip() for s in line.strip().split(',')]
#line = [line[0], line[1]] + [float(s) for s in line[2:] if s]
line = [s for s in line if s]
t = line[0] + ' ' + line[1]
if '-' not in line[0]:
t = line[0]
if t not in args.times:
continue # happens with live files and time cropping
if old_t is None:
old_t = t
low = int(line[2]) + args.offset_freq
high = int(line[3]) + args.offset_freq
step = float(line[4])
columns = list(frange(low, high, step))
start_col, stop_col = slice_columns(columns, args.low_freq, args.high_freq)
if args.low_freq and columns[stop_col] < args.low_freq:
if args.high_freq and columns[start_col] > args.high_freq:
start_freq = columns[start_col]
if args.low_freq:
start_freq = max(args.low_freq, start_freq)
# sometimes fails? skip or abort?
x_start = args.freqs.index(start_freq)
zs = floatify(line[6+start_col:6+stop_col+1])
if t != old_t:
yield old_t, row
row = [0.0] * x_size
old_t = t
for i in range(len(zs)):
x = x_start + i
if x >= x_size:
row[x] = zs[i]
yield old_t, row
def push_pixels(args):
"returns PIL img"
width = len(args.freqs)
rgb = rgb_fn(args.palette(), args.db_limit[0], args.db_limit[1])
img ="RGB", (width, tape_height + args.pix_height))
pix = img.load()
x_size = img.size[0]
average = [0.0] * width
tally = 0
old_y = None
for t, zs in collate_row(x_size):
y = args.times.index(t)
if not args.compress:
for x in range(len(zs)):
pix[x,y+tape_height] = rgb(zs[x])
# ugh
y = args.pix_height - time_compression(height - y, args.compress)
if old_y is None:
old_y = y
if old_y != y:
for x in range(len(average)):
pix[x,old_y+tape_height] = rgb(average[x]/tally)
tally = 0
average = [0.0] * width
old_y = y
for x in range(len(zs)):
average[x] += zs[x]
tally += 1
return img
def closest_index(n, m_list, interpolate=False):
"assumes sorted m_list, returns two points for interpolate"
i = len(m_list) // 2
jump = len(m_list) // 2
while jump > 1:
i_down = i - jump
i_here = i
i_up = i + jump
if i_down < 0:
i_down = i
if i_up >= len(m_list):
i_up = i
e_down = abs(m_list[i_down] - n)
e_here = abs(m_list[i_here] - n)
e_up = abs(m_list[i_up] - n)
e_best = min([e_down, e_here, e_up])
if e_down == e_best:
i = i_down
if e_up == e_best:
i = i_up
if e_here == e_best:
i = i_here
jump = jump // 2
if not interpolate:
return i
if n < m_list[i] and i > 0:
return i-1, i
if n > m_list[i] and i < len(m_list)-1:
return i, i+1
return i, i
def word_aa(label, pt, fg_color, bg_color):
f = ImageFont.truetype(vera_path, pt*3)
s = f.getsize(label)
s = (s[0], pt*3 + 3) # getsize lies, manually compute
w_img ="RGB", s, bg_color)
w_draw = ImageDraw.Draw(w_img)
w_draw.text((0, 0), label, font=f, fill=fg_color)
return w_img.resize((s[0]//3, s[1]//3), Image.ANTIALIAS)
def blend(percent, c1, c2):
"c1 and c2 are RGB tuples"
# probably isn't gamma correct
r = c1[0] * percent + c2[0] * (1 - percent)
g = c1[1] * percent + c2[1] * (1 - percent)
b = c1[2] * percent + c2[2] * (1 - percent)
c3 = map(int, map(round, [r,g,b]))
return tuple(c3)
def tape_lines(draw, freqs, interval, y1, y2, used=set()):
min_f = min(freqs)
max_f = max(freqs)
"returns the number of lines"
low_f = (min_f // interval) * interval
high_f = (1 + max_f // interval) * interval
hits = 0
blur = lambda p: blend(p, (255, 255, 0), (0, 0, 0))
for i in range(int(low_f), int(high_f), int(interval)):
if not (min_f < i < max_f):
hits += 1
if i in used:
x1,x2 = closest_index(i, args.freqs, interpolate=True)
if x1 == x2:
draw.line([x1,y1,x1,y2], fill='black')
percent = (i - args.freqs[x1]) / float(args.freqs[x2] - args.freqs[x1])
draw.line([x1,y1,x1,y2], fill=blur(percent))
draw.line([x2,y1,x2,y2], fill=blur(1-percent))
return hits
def tape_text(img, freqs, interval, y, used=set()):
min_f = min(freqs)
max_f = max(freqs)
low_f = (min_f // interval) * interval
high_f = (1 + max_f // interval) * interval
for i in range(int(low_f), int(high_f), int(interval)):
if i in used:
if not (min_f < i < max_f):
x = closest_index(i, freqs)
s = str(i)
if interval >= 1e6:
s = '%iM' % (i/1e6)
elif interval > 1000:
s = '%ik' % ((i/1e3) % 1000)
if s.startswith('0'):
s = '%iM' % (i/1e6)
s = '%i' % (i%1000)
if s.startswith('0'):
s = '%ik' % ((i/1e3) % 1000)
if s.startswith('0'):
s = '%iM' % (i/1e6)
w = word_aa(s, tape_pt, 'black', 'yellow')
img.paste(w, (x - w.size[0]//2, y))
def shadow_text(draw, x, y, s, font, fg_color='white', bg_color='black'):
draw.text((x+1, y+1), s, font=font, fill=bg_color)
draw.text((x, y), s, font=font, fill=fg_color)
def create_labels(args, img):
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
pixel_bandwidth = args.pixel_bandwidth
draw.rectangle([0,0,img.size[0],tape_height], fill='yellow')
min_freq = min(args.freqs)
max_freq = max(args.freqs)
delta = max_freq - min_freq
width = len(args.freqs)
label_base = 9
for i in range(label_base, 0, -1):
interval = int(10**i)
low_f = (min_freq // interval) * interval
high_f = (1 + max_freq // interval) * interval
hits = len(range(int(low_f), int(high_f), interval))
if hits >= 4:
label_base = i
label_base = 10**label_base
for scale,y in [(1,10), (5,15), (10,19), (50,22), (100,24), (500, 25)]:
hits = tape_lines(draw, args.freqs, label_base/scale, y, tape_height)
pixels_per_hit = width / hits
if pixels_per_hit > 50:
tape_text(img, args.freqs, label_base/scale, y-tape_pt)
if pixels_per_hit < 10:
if args.time_tick:
label_format = "%H:%M:%S"
if args.time_tick % (60*60*24) == 0:
label_format = "%Y-%m-%d"
elif args.time_tick % 60 == 0:
label_format = "%H:%M"
label_next = datetime.datetime(start.year, start.month,, start.hour)
tick_delta = datetime.timedelta(seconds = args.time_tick)
while label_next < start:
label_next += tick_delta
last_y = -100
for y,t in enumerate(args.times):
label_time = date_parse(t)
if label_time < label_next:
if args.compress:
y = args.full_height - time_compression(height - y, args.compress)
if y - last_y > 15:
shadow_text(draw, 2, y+tape_height, label_next.strftime(label_format), font)
last_y = y
label_next += tick_delta
start, stop = args.start_stop
duration = stop - start
duration = duration.days * 24*60*60 + duration.seconds + 30
pixel_height = duration / len(args.times)
hours = int(duration / 3600)
minutes = int((duration - 3600*hours) / 60)
margin = 2
if args.time_tick:
margin = 60
shadow_text(draw, margin, img.size[1] - 45, 'Duration: %i:%02i' % (hours, minutes), font)
shadow_text(draw, margin, img.size[1] - 35, 'Range: %.2fMHz - %.2fMHz' % (min_freq/1e6, (max_freq+pixel_bandwidth)/1e6), font)
shadow_text(draw, margin, img.size[1] - 25, 'Pixel: %.2fHz x %is' % (pixel_bandwidth, int(round(pixel_height))), font)
shadow_text(draw, margin, img.size[1] - 15, 'Started: {0}'.format(start), font)
# bin size
args = prepare_args()
raw_data = open_raw_data(args.input_path)
img = push_pixels(args)
create_labels(args, img)