kopia lustrzana https://github.com/projecthorus/horusdemodlib
460 wiersze
11 KiB
Python
460 wiersze
11 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Perform automated Eb/N0 testing of the C-implementation of fsk_mod / fsk_demod
|
|
#
|
|
# Based on the analysis performed here: https://github.com/projecthorus/radiosonde_auto_rx/blob/master/auto_rx/test/notes/2019-03-03_generate_lowsnr_validation.md
|
|
#
|
|
# Copyright (C) 2020 Mark Jessop <vk5qi@rfhead.net>
|
|
# Released under GNU GPL v3 or later
|
|
#
|
|
# Requirements:
|
|
# - csdr must be installed and available on the path. https://github.com/ha7ilm/csdr
|
|
# - The following utilities from codec2 need to be built:
|
|
# - fsk_get_test_bits, fsk_put_test_bits
|
|
# - fsk_mod, fsk_demod
|
|
# - Create the directories: 'samples' and 'generated' in this directory (octave)
|
|
#
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import traceback
|
|
import subprocess
|
|
import sys
|
|
import argparse
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import scipy.signal
|
|
import scipy.interpolate
|
|
|
|
|
|
# Variables you will want to adjust:
|
|
|
|
# Eb/N0 Range to test:
|
|
# Default: 0 through 5 dB in 0.5 db steps, then up to 20 db in 1db steps.
|
|
EBNO_RANGE = np.append(np.arange(0, 5, 0.5), np.arange(5, 20.5, 1))
|
|
|
|
# Modes to test.
|
|
MODES = ['binary']
|
|
|
|
# Baud rates to test:
|
|
BAUD_RATE = 100
|
|
|
|
# Test Length (frames)
|
|
TEST_LENGTH = 1000
|
|
|
|
# Allow the loss of N frames, at the start or end of the recording.
|
|
FRAME_IGNORE = 1
|
|
|
|
# IF sample rate
|
|
SAMPLE_RATE = 48000
|
|
|
|
# Frequency estimator limits
|
|
ESTIMATOR_LOWER_LIMIT = 100
|
|
ESTIMATOR_UPPER_LIMIT = int(SAMPLE_RATE/2 - 1000)
|
|
|
|
# Frequency of the low tone (Hz)
|
|
LOW_TONE = 1000
|
|
|
|
# Tone spacing (Hz)
|
|
TONE_SPACING = 270
|
|
|
|
# Mask Estimator
|
|
MASK_ESTIMATOR = False
|
|
|
|
# Halt simulation for a particular baud rate when the FER drops below this level.
|
|
FER_BREAK_POINT = 0.01
|
|
|
|
STATS_OUTPUT = True
|
|
|
|
# Where to place the initial test samples.
|
|
SAMPLE_DIR = "./samples"
|
|
|
|
# Where to place the generated low-SNR samples.
|
|
GENERATED_DIR = "./generated"
|
|
|
|
# Location of the horus utils
|
|
HORUS_UTILS = "../build/src"
|
|
|
|
# Definitions of Horus modes.
|
|
MODE_TYPES = {
|
|
'binary': {
|
|
'id': 0,
|
|
'nfsk': 4,
|
|
'bits_per_frame': 22*8 # UNCODED bits per frame.
|
|
},
|
|
'128bit': {
|
|
'id': 1,
|
|
'nfsk': 2, # Convert back to 4FSK once 4FSK SD/LLRs are working.
|
|
'bits_per_frame': 128
|
|
},
|
|
'256bit': {
|
|
'id': 1,
|
|
'nfsk': 2, # Convert back to 4FSK once 4FSK SD/LLRs are working.
|
|
'bits_per_frame': 256
|
|
}
|
|
}
|
|
|
|
|
|
THEORY_EBNO = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
|
THEORY_BER_4 = [
|
|
0.22934,
|
|
0.18475,
|
|
0.13987,
|
|
0.09772,
|
|
0.06156,
|
|
0.03395,
|
|
0.01579,
|
|
0.00591,
|
|
0.00168,
|
|
3.39e-4,
|
|
4.44e-5,
|
|
# 3.38e-6,
|
|
# 1.30e-7,
|
|
# 2.16e-9,
|
|
# 1.23e-11,
|
|
# 1.85e-14,
|
|
# 5.13e-18,
|
|
# 1.71e-22
|
|
]
|
|
THEORY_BER_2 = [
|
|
0.30327,
|
|
0.26644,
|
|
0.22637,
|
|
0.18438,
|
|
0.14240,
|
|
0.10287,
|
|
0.06831,
|
|
0.04080,
|
|
0.02132,
|
|
0.00942,
|
|
0.00337,
|
|
]
|
|
|
|
#
|
|
# Functions to read files and add noise.
|
|
#
|
|
|
|
|
|
def load_sample(filename, loadreal=True):
|
|
# If loading real samples (which is what fsk_mod outputs), apply a hilbert transform to get an analytic signal.
|
|
if loadreal:
|
|
return scipy.signal.hilbert(np.fromfile(filename, dtype="f4"))
|
|
else:
|
|
return np.fromfile(filename, dtype="c8")
|
|
|
|
|
|
def save_sample(data, filename):
|
|
# We have to make sure to convert to complex64..
|
|
data.astype(dtype="c8").tofile(filename)
|
|
|
|
# TODO: Allow saving as complex s16 - see view solution here: https://stackoverflow.com/questions/47086134/how-to-convert-a-numpy-complex-array-to-a-two-element-float-array
|
|
|
|
|
|
def calculate_variance(data, threshold=-100.0):
|
|
# Calculate the variance of a set of radiosonde samples.
|
|
# Optionally use a threshold to limit the sample the variance
|
|
# is calculated over to ones that actually have sonde packets in them.
|
|
|
|
_data_log = 20 * np.log10(np.abs(data))
|
|
|
|
# MSE is better than variance as a power estimate, as it counts DC
|
|
data_thresh = data[_data_log > threshold]
|
|
return np.mean(data_thresh * np.conj(data_thresh))
|
|
|
|
|
|
def add_noise(
|
|
data,
|
|
variance,
|
|
baud_rate,
|
|
ebno,
|
|
fs=96000,
|
|
bitspersymbol=1.0,
|
|
normalise=True,
|
|
real=False,
|
|
):
|
|
# Add calibrated noise to a sample.
|
|
|
|
# Calculate Eb/No in linear units.
|
|
_ebno = 10.0 ** ((ebno) / 10.0)
|
|
|
|
# Calculate the noise variance we need to add
|
|
_noise_variance = variance * fs / (baud_rate * _ebno * bitspersymbol)
|
|
|
|
# If we are working with real samples, we need to halve the noise contribution.
|
|
if real:
|
|
_noise_variance = _noise_variance * 0.5
|
|
|
|
# Generate complex random samples
|
|
np.random.seed(42)
|
|
_rand_i = np.sqrt(_noise_variance / 2.0) * np.random.randn(len(data))
|
|
_rand_q = np.sqrt(_noise_variance / 2.0) * np.random.randn(len(data))
|
|
|
|
_noisy = data + (_rand_i + 1j * _rand_q)
|
|
|
|
if normalise:
|
|
# print("Normalised to 1.0")
|
|
return _noisy / np.max(np.abs(_noisy))
|
|
else:
|
|
return _noisy
|
|
|
|
|
|
def generate_lowsnr(sample, outfile, fs, baud, ebno, order):
|
|
""" Generate a low SNR test file """
|
|
|
|
if order == 2:
|
|
_bits_per_symbol = 1
|
|
else:
|
|
_bits_per_symbol = 2
|
|
|
|
_var = calculate_variance(sample)
|
|
|
|
_noisy = add_noise(sample, _var, baud, ebno, fs, _bits_per_symbol)
|
|
|
|
save_sample(_noisy, outfile)
|
|
|
|
return outfile
|
|
|
|
|
|
#
|
|
# Functions to deal with horuslib utils
|
|
#
|
|
|
|
|
|
def generate_packets(mode):
|
|
""" Generate a set of FSK data """
|
|
|
|
|
|
_filename = f"{SAMPLE_DIR}/horus_{mode}_{SAMPLE_RATE}_{BAUD_RATE}_f.bin"
|
|
|
|
_mode_id = MODE_TYPES[mode]['id']
|
|
_order = MODE_TYPES[mode]['nfsk']
|
|
|
|
# Generate the command we need to make:
|
|
|
|
_cmd = f"{HORUS_UTILS}/horus_gen_test_bits {_mode_id} {TEST_LENGTH} | "\
|
|
f"{HORUS_UTILS}/fsk_mod {_order} {SAMPLE_RATE} {BAUD_RATE} {LOW_TONE} {TONE_SPACING} - - |"\
|
|
f"csdr convert_s16_f > {_filename}"
|
|
|
|
print(_cmd)
|
|
|
|
print(f"Generating test signal: {mode}, {BAUD_RATE} baud.")
|
|
|
|
# Run the command.
|
|
try:
|
|
_start = time.time()
|
|
_output = subprocess.check_output(_cmd, shell=True, stderr=None)
|
|
_output = _output.decode()
|
|
except:
|
|
# traceback.print_exc()
|
|
_output = "error"
|
|
|
|
_runtime = time.time() - _start
|
|
|
|
print("Finished generating test signal.")
|
|
|
|
return _filename
|
|
|
|
|
|
def process_packets(
|
|
filename,mode, complex_samples=True, override_frames=None, stats=False, statsfile=""
|
|
):
|
|
""" Run a generated file through horus_demod """
|
|
|
|
_estim_limits = "-b %d -u %d " % (ESTIMATOR_LOWER_LIMIT, ESTIMATOR_UPPER_LIMIT)
|
|
|
|
if MASK_ESTIMATOR:
|
|
_mask = "--tonespacing=%d " % TONE_SPACING
|
|
else:
|
|
_mask = ""
|
|
|
|
if complex_samples:
|
|
_cpx = "-q "
|
|
else:
|
|
_cpx = ""
|
|
|
|
if stats:
|
|
_stats_file = GENERATED_DIR + "/" + statsfile + ".stats"
|
|
_stats = "--stats=50 "
|
|
else:
|
|
_stats = ""
|
|
_stats_file = None
|
|
|
|
|
|
_cmd = f"cat {filename} | csdr convert_f_s16 |"\
|
|
f"{HORUS_UTILS}/horus_demod {_mask}{_cpx}{_stats}--rate={BAUD_RATE} -c -m {mode} - - "\
|
|
|
|
|
|
if stats:
|
|
_cmd += "2> %s" % _stats_file
|
|
|
|
_cmd += f"| grep OK | wc -l"
|
|
|
|
# print("Processing %s" % filename)
|
|
|
|
print(_cmd)
|
|
|
|
# Run the command.
|
|
try:
|
|
_start = time.time()
|
|
_output = subprocess.check_output(_cmd, shell=True)
|
|
_output = _output.decode()
|
|
except subprocess.CalledProcessError as e:
|
|
_output = e.output.decode()
|
|
except:
|
|
traceback.print_exc()
|
|
_output = "error"
|
|
print("Run failed!")
|
|
return (-1, _stats_file)
|
|
|
|
_runtime = time.time() - _start
|
|
|
|
# Try to grab last line of the stderr outout
|
|
|
|
try:
|
|
_packets = int(_output.strip())
|
|
|
|
if _packets > _override_frames:
|
|
_fer = 0.0
|
|
else:
|
|
_fer = 1 - (_packets/_override_frames)
|
|
except Exception as e:
|
|
print("Error parsing output: %s" % str(e))
|
|
_fer = 1.0
|
|
|
|
return (_fer,_stats_file)
|
|
|
|
|
|
def read_stats(filename, sps = 50):
|
|
""" Read in a statistics file, and re-organise it for easier calculations """
|
|
|
|
_output = {
|
|
'ebno': [],
|
|
'f1_est': [],
|
|
'f2_est': [],
|
|
'f3_est': [],
|
|
'f4_est': [],
|
|
'ppm': [],
|
|
'time': []
|
|
}
|
|
|
|
with open(filename, 'r') as _f:
|
|
for _line in _f:
|
|
if _line[0] != '{':
|
|
continue
|
|
|
|
try:
|
|
_data = json.loads(_line)
|
|
except Exception as e:
|
|
#print("Line parsing error: %s" % str(e))
|
|
continue
|
|
|
|
_output['ebno'].append(_data['EbNodB'])
|
|
_output['f1_est'].append(_data['f1_est'])
|
|
_output['f2_est'].append(_data['f2_est'])
|
|
|
|
if 'f3_est' in _data:
|
|
_output['f3_est'].append(_data['f3_est'])
|
|
_output['f4_est'].append(_data['f4_est'])
|
|
|
|
_output['ppm'].append(_data['ppm'])
|
|
|
|
if _output['time'] == []:
|
|
_output['time'] = [0]
|
|
else:
|
|
_output['time'].append(_output['time'][-1]+1.0/sps)
|
|
|
|
return _output
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parser = argparse.ArgumentParser(description="Horus modem FER simulations")
|
|
parser.add_argument("--test", action="store_true", help="run automated test")
|
|
args = parser.parse_args()
|
|
|
|
plot_data = {}
|
|
|
|
for mode in MODES:
|
|
_file = generate_packets(mode)
|
|
|
|
print("Loading file and converting to complex.")
|
|
_sample = load_sample(_file)
|
|
|
|
_override_frames = TEST_LENGTH - FRAME_IGNORE
|
|
|
|
_temp_file = "%s/temp.bin" % GENERATED_DIR
|
|
|
|
_ebnos = []
|
|
_fers = []
|
|
_ber_ests = []
|
|
_fest_err = []
|
|
|
|
for _ebno in EBNO_RANGE:
|
|
generate_lowsnr(_sample, _temp_file, SAMPLE_RATE, BAUD_RATE, _ebno, MODE_TYPES[mode]['nfsk'])
|
|
|
|
_fer, _stats_file = process_packets(
|
|
_temp_file,
|
|
mode,
|
|
override_frames=_override_frames,
|
|
stats=STATS_OUTPUT,
|
|
statsfile="fsk_%s_%.1f" % (mode, _ebno),
|
|
)
|
|
|
|
print("%.1f, %.8f" % (_ebno, _fer))
|
|
|
|
_ebnos.append(_ebno)
|
|
_fers.append(_fer)
|
|
|
|
# Calculate an estimate of the bit-error rate.
|
|
_ber_est = 1 - (1-_fer)**(1/MODE_TYPES[mode]['bits_per_frame'])
|
|
_ber_ests.append(_ber_est)
|
|
|
|
# Halt the simulation if the BER drops below our break point.
|
|
if _fer < FER_BREAK_POINT:
|
|
break
|
|
|
|
plot_data[mode] = {"mode":mode, "ebno": _ebnos, "fer": _fers, "ber_est": _ber_ests, "fest_err":_fest_err}
|
|
|
|
|
|
plt.figure()
|
|
|
|
print(plot_data)
|
|
|
|
for _b in plot_data:
|
|
plt.plot(
|
|
plot_data[_b]["ebno"], plot_data[_b]["fer"], label="Simulated - Mode %s" % _b
|
|
)
|
|
|
|
# Plot FER
|
|
|
|
plt.xlabel("Eb/N0 (dB)")
|
|
plt.ylabel("FER")
|
|
|
|
# Crop plot to reasonable limits
|
|
plt.ylim(0, 1)
|
|
plt.xlim(0, 15)
|
|
|
|
plt.title("horus_demod FER Performance")
|
|
plt.grid()
|
|
plt.legend()
|
|
|
|
# Plot BER Estimate, based on frame size.
|
|
plt.figure()
|
|
for _b in plot_data:
|
|
plt.semilogy(plot_data[_b]["ebno"], plot_data[_b]["ber_est"], label="Simulated - Mode %s" % _b)
|
|
|
|
if MODE_TYPES[mode]['nfsk'] == 2:
|
|
plt.semilogy(THEORY_EBNO, THEORY_BER_2, label="Theory")
|
|
else:
|
|
plt.semilogy(THEORY_EBNO, THEORY_BER_4, label="Theory")
|
|
|
|
# Crop plot to reasonable limits
|
|
plt.ylim(1e-5, 1)
|
|
plt.xlim(0, 15)
|
|
|
|
plt.title("horus_demod Estimated BER Performance")
|
|
plt.grid()
|
|
plt.legend()
|
|
|
|
plt.show() |