horusdemodlib/src/run_fer_test.py

460 wiersze
11 KiB
Python

#!/usr/bin/env python
#
# Perform automated Eb/N0 testing of the C-implementation of fsk_mod / fsk_demod
#
# Based on the analysis performed here: https://github.com/projecthorus/radiosonde_auto_rx/blob/master/auto_rx/test/notes/2019-03-03_generate_lowsnr_validation.md
#
# Copyright (C) 2020 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
# Requirements:
# - csdr must be installed and available on the path. https://github.com/ha7ilm/csdr
# - The following utilities from codec2 need to be built:
# - fsk_get_test_bits, fsk_put_test_bits
# - fsk_mod, fsk_demod
# - Create the directories: 'samples' and 'generated' in this directory (octave)
#
import json
import logging
import os
import time
import traceback
import subprocess
import sys
import argparse
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal
import scipy.interpolate
# Variables you will want to adjust:
# Eb/N0 Range to test:
# Default: 0 through 5 dB in 0.5 db steps, then up to 20 db in 1db steps.
EBNO_RANGE = np.append(np.arange(0, 5, 0.5), np.arange(5, 20.5, 1))
# Modes to test.
MODES = ['binary']
# Baud rates to test:
BAUD_RATE = 100
# Test Length (frames)
TEST_LENGTH = 1000
# Allow the loss of N frames, at the start or end of the recording.
FRAME_IGNORE = 1
# IF sample rate
SAMPLE_RATE = 48000
# Frequency estimator limits
ESTIMATOR_LOWER_LIMIT = 100
ESTIMATOR_UPPER_LIMIT = int(SAMPLE_RATE/2 - 1000)
# Frequency of the low tone (Hz)
LOW_TONE = 1000
# Tone spacing (Hz)
TONE_SPACING = 270
# Mask Estimator
MASK_ESTIMATOR = False
# Halt simulation for a particular baud rate when the FER drops below this level.
FER_BREAK_POINT = 0.01
STATS_OUTPUT = True
# Where to place the initial test samples.
SAMPLE_DIR = "./samples"
# Where to place the generated low-SNR samples.
GENERATED_DIR = "./generated"
# Location of the horus utils
HORUS_UTILS = "../build/src"
# Definitions of Horus modes.
MODE_TYPES = {
'binary': {
'id': 0,
'nfsk': 4,
'bits_per_frame': 22*8 # UNCODED bits per frame.
},
'128bit': {
'id': 1,
'nfsk': 2, # Convert back to 4FSK once 4FSK SD/LLRs are working.
'bits_per_frame': 128
},
'256bit': {
'id': 1,
'nfsk': 2, # Convert back to 4FSK once 4FSK SD/LLRs are working.
'bits_per_frame': 256
}
}
THEORY_EBNO = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
THEORY_BER_4 = [
0.22934,
0.18475,
0.13987,
0.09772,
0.06156,
0.03395,
0.01579,
0.00591,
0.00168,
3.39e-4,
4.44e-5,
# 3.38e-6,
# 1.30e-7,
# 2.16e-9,
# 1.23e-11,
# 1.85e-14,
# 5.13e-18,
# 1.71e-22
]
THEORY_BER_2 = [
0.30327,
0.26644,
0.22637,
0.18438,
0.14240,
0.10287,
0.06831,
0.04080,
0.02132,
0.00942,
0.00337,
]
#
# Functions to read files and add noise.
#
def load_sample(filename, loadreal=True):
# If loading real samples (which is what fsk_mod outputs), apply a hilbert transform to get an analytic signal.
if loadreal:
return scipy.signal.hilbert(np.fromfile(filename, dtype="f4"))
else:
return np.fromfile(filename, dtype="c8")
def save_sample(data, filename):
# We have to make sure to convert to complex64..
data.astype(dtype="c8").tofile(filename)
# TODO: Allow saving as complex s16 - see view solution here: https://stackoverflow.com/questions/47086134/how-to-convert-a-numpy-complex-array-to-a-two-element-float-array
def calculate_variance(data, threshold=-100.0):
# Calculate the variance of a set of radiosonde samples.
# Optionally use a threshold to limit the sample the variance
# is calculated over to ones that actually have sonde packets in them.
_data_log = 20 * np.log10(np.abs(data))
# MSE is better than variance as a power estimate, as it counts DC
data_thresh = data[_data_log > threshold]
return np.mean(data_thresh * np.conj(data_thresh))
def add_noise(
data,
variance,
baud_rate,
ebno,
fs=96000,
bitspersymbol=1.0,
normalise=True,
real=False,
):
# Add calibrated noise to a sample.
# Calculate Eb/No in linear units.
_ebno = 10.0 ** ((ebno) / 10.0)
# Calculate the noise variance we need to add
_noise_variance = variance * fs / (baud_rate * _ebno * bitspersymbol)
# If we are working with real samples, we need to halve the noise contribution.
if real:
_noise_variance = _noise_variance * 0.5
# Generate complex random samples
np.random.seed(42)
_rand_i = np.sqrt(_noise_variance / 2.0) * np.random.randn(len(data))
_rand_q = np.sqrt(_noise_variance / 2.0) * np.random.randn(len(data))
_noisy = data + (_rand_i + 1j * _rand_q)
if normalise:
# print("Normalised to 1.0")
return _noisy / np.max(np.abs(_noisy))
else:
return _noisy
def generate_lowsnr(sample, outfile, fs, baud, ebno, order):
""" Generate a low SNR test file """
if order == 2:
_bits_per_symbol = 1
else:
_bits_per_symbol = 2
_var = calculate_variance(sample)
_noisy = add_noise(sample, _var, baud, ebno, fs, _bits_per_symbol)
save_sample(_noisy, outfile)
return outfile
#
# Functions to deal with horuslib utils
#
def generate_packets(mode):
""" Generate a set of FSK data """
_filename = f"{SAMPLE_DIR}/horus_{mode}_{SAMPLE_RATE}_{BAUD_RATE}_f.bin"
_mode_id = MODE_TYPES[mode]['id']
_order = MODE_TYPES[mode]['nfsk']
# Generate the command we need to make:
_cmd = f"{HORUS_UTILS}/horus_gen_test_bits {_mode_id} {TEST_LENGTH} | "\
f"{HORUS_UTILS}/fsk_mod {_order} {SAMPLE_RATE} {BAUD_RATE} {LOW_TONE} {TONE_SPACING} - - |"\
f"csdr convert_s16_f > {_filename}"
print(_cmd)
print(f"Generating test signal: {mode}, {BAUD_RATE} baud.")
# Run the command.
try:
_start = time.time()
_output = subprocess.check_output(_cmd, shell=True, stderr=None)
_output = _output.decode()
except:
# traceback.print_exc()
_output = "error"
_runtime = time.time() - _start
print("Finished generating test signal.")
return _filename
def process_packets(
filename,mode, complex_samples=True, override_frames=None, stats=False, statsfile=""
):
""" Run a generated file through horus_demod """
_estim_limits = "-b %d -u %d " % (ESTIMATOR_LOWER_LIMIT, ESTIMATOR_UPPER_LIMIT)
if MASK_ESTIMATOR:
_mask = "--tonespacing=%d " % TONE_SPACING
else:
_mask = ""
if complex_samples:
_cpx = "-q "
else:
_cpx = ""
if stats:
_stats_file = GENERATED_DIR + "/" + statsfile + ".stats"
_stats = "--stats=50 "
else:
_stats = ""
_stats_file = None
_cmd = f"cat {filename} | csdr convert_f_s16 |"\
f"{HORUS_UTILS}/horus_demod {_mask}{_cpx}{_stats}--rate={BAUD_RATE} -c -m {mode} - - "\
if stats:
_cmd += "2> %s" % _stats_file
_cmd += f"| grep OK | wc -l"
# print("Processing %s" % filename)
print(_cmd)
# Run the command.
try:
_start = time.time()
_output = subprocess.check_output(_cmd, shell=True)
_output = _output.decode()
except subprocess.CalledProcessError as e:
_output = e.output.decode()
except:
traceback.print_exc()
_output = "error"
print("Run failed!")
return (-1, _stats_file)
_runtime = time.time() - _start
# Try to grab last line of the stderr outout
try:
_packets = int(_output.strip())
if _packets > _override_frames:
_fer = 0.0
else:
_fer = 1 - (_packets/_override_frames)
except Exception as e:
print("Error parsing output: %s" % str(e))
_fer = 1.0
return (_fer,_stats_file)
def read_stats(filename, sps = 50):
""" Read in a statistics file, and re-organise it for easier calculations """
_output = {
'ebno': [],
'f1_est': [],
'f2_est': [],
'f3_est': [],
'f4_est': [],
'ppm': [],
'time': []
}
with open(filename, 'r') as _f:
for _line in _f:
if _line[0] != '{':
continue
try:
_data = json.loads(_line)
except Exception as e:
#print("Line parsing error: %s" % str(e))
continue
_output['ebno'].append(_data['EbNodB'])
_output['f1_est'].append(_data['f1_est'])
_output['f2_est'].append(_data['f2_est'])
if 'f3_est' in _data:
_output['f3_est'].append(_data['f3_est'])
_output['f4_est'].append(_data['f4_est'])
_output['ppm'].append(_data['ppm'])
if _output['time'] == []:
_output['time'] = [0]
else:
_output['time'].append(_output['time'][-1]+1.0/sps)
return _output
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Horus modem FER simulations")
parser.add_argument("--test", action="store_true", help="run automated test")
args = parser.parse_args()
plot_data = {}
for mode in MODES:
_file = generate_packets(mode)
print("Loading file and converting to complex.")
_sample = load_sample(_file)
_override_frames = TEST_LENGTH - FRAME_IGNORE
_temp_file = "%s/temp.bin" % GENERATED_DIR
_ebnos = []
_fers = []
_ber_ests = []
_fest_err = []
for _ebno in EBNO_RANGE:
generate_lowsnr(_sample, _temp_file, SAMPLE_RATE, BAUD_RATE, _ebno, MODE_TYPES[mode]['nfsk'])
_fer, _stats_file = process_packets(
_temp_file,
mode,
override_frames=_override_frames,
stats=STATS_OUTPUT,
statsfile="fsk_%s_%.1f" % (mode, _ebno),
)
print("%.1f, %.8f" % (_ebno, _fer))
_ebnos.append(_ebno)
_fers.append(_fer)
# Calculate an estimate of the bit-error rate.
_ber_est = 1 - (1-_fer)**(1/MODE_TYPES[mode]['bits_per_frame'])
_ber_ests.append(_ber_est)
# Halt the simulation if the BER drops below our break point.
if _fer < FER_BREAK_POINT:
break
plot_data[mode] = {"mode":mode, "ebno": _ebnos, "fer": _fers, "ber_est": _ber_ests, "fest_err":_fest_err}
plt.figure()
print(plot_data)
for _b in plot_data:
plt.plot(
plot_data[_b]["ebno"], plot_data[_b]["fer"], label="Simulated - Mode %s" % _b
)
# Plot FER
plt.xlabel("Eb/N0 (dB)")
plt.ylabel("FER")
# Crop plot to reasonable limits
plt.ylim(0, 1)
plt.xlim(0, 15)
plt.title("horus_demod FER Performance")
plt.grid()
plt.legend()
# Plot BER Estimate, based on frame size.
plt.figure()
for _b in plot_data:
plt.semilogy(plot_data[_b]["ebno"], plot_data[_b]["ber_est"], label="Simulated - Mode %s" % _b)
if MODE_TYPES[mode]['nfsk'] == 2:
plt.semilogy(THEORY_EBNO, THEORY_BER_2, label="Theory")
else:
plt.semilogy(THEORY_EBNO, THEORY_BER_4, label="Theory")
# Crop plot to reasonable limits
plt.ylim(1e-5, 1)
plt.xlim(0, 15)
plt.title("horus_demod Estimated BER Performance")
plt.grid()
plt.legend()
plt.show()