Added superscanner.py script and dependencies from v5

pull/714/head
f4exb 2020-11-12 19:46:18 +01:00
rodzic 8ccb2e80c1
commit 0d6e629436
5 zmienionych plików z 831 dodań i 2 usunięć

Wyświetl plik

@ -3,6 +3,7 @@
These scripts are designed to work in Python 3 preferably with version 3.6 or higher. Dependencies are installed with pip in a virtual environment. The sequence of operations is the following:
```
sudo apt-get install virtualenv gcc g++ gfortran python3-dev
virtualenv -p /usr/bin/python3 venv # Create virtual environment
. ./venv/bin/activate # Activate virtual environment
pip install -r requirements.txt # Install requirements
@ -188,4 +189,104 @@ If you have presets defined you may also use presets instead of having to set up
"msg": "Start device on deviceset R0"
}
]
```
```
<h2>superscanner.py</h2>
Connects to spectrum server to monitor PSD and detect local PSD hotspots to pilot channel(s). Thus channels can follow band activity. This effectively implements a "scanner" feature with parallel tracking of any number of channels. It is FFT based so can effectively track spectrum hotspots simultaneously. Therefore the "super" superlative.
It requires SDRangel version 5.6 or above. On SDRangel instance baseband spectrum should be set in log mode and the spectrum server activated with an accessible address and a port that matches the port given to `superscanner.py`. Please refer to SDRangel documentation for details.
The script runs in daemon mode and is stopped using `Ctl-C`.
<h3>Options</h3>
- `-a` or `--address` SDRangel web base address. Default: `127.0.0.1`
- `-p` or `--api-port` SDRangel API port. Default: `8091`
- `-w` or `--ws-port` SDRangel websocket spectrum server port. Default: `8887`
- `-c` or `--config-file` JSON configuration file. Mandatory. See next for format details
- `-j` or `--psd-in` JSON file containing PSD floor information previously saved with the `-J` option
- `-J` or `--psd-out` Write PSD floor information to JSON file
- `-n` or `--nb-passes` Number of passes for PSD floor estimation. Default: `10`
- `-f` or `--psd-level` Use a fixed PSD floor value therefore do not perform PSD floor estimaton
- `-X` or `--psd-exclude-higher` Level above which to exclude bin scan during PSD floor estimation
- `-x` or `--psd-exclude-lower` Level below which to exclude bin scan during PSD floor estimation
- `-G` or `--psd-graph` Show PSD floor graphs. Requires `matplotlib`
- `-N` or `--hotspots-noise` Number of hotspots above which detection is considered as noise. Default `8`
- `-m` or `--margin` Margin in dB above PSD floor to detect acivity. Default: `3`
- `-g` or `--group-tolerance` Radius (1D) tolerance in points (bins) for hotspot aggregation. Default `1`
- `-r` or `--freq-round` Frequency rounding value in Hz. Default: `1` (no rounding)
- `-o` or `--freq-offset` Frequency rounding offset in Hz. Default: `0` (no offset)
Command examples:
- `python ./superscanner.py -a 127.0.0.1 -p 8889 -w 8886 -c 446M.json -g 10 -r 12500 -o 6250 -J psd_pmr.json`
- `python ./superscanner.py -a 192.168.0.3 -j psd.json -c 145M.json -g 10 -r 2500`
<h3>Configuration file</h3>
This file drives how channels in the connected SDRangel instance are managed.
```json
{
"deviceset_index": 0, // SDRangel instance deviceset index addressed - required
"freqrange_inclusions": [
[145170000, 145900000] // List of frequency ranges in Hz to include in processing - optional
],
"freqrange_exclusions": [ // List of frequency ranges in Hz to exclude from processing - optional
[145000000, 145170000],
[145290000, 145335000],
[145800000, 146000000]
],
"channel_info": [ // List of controlled channels - required
{ // Channel information - at least one required
"index": 0, // Index of channel in deviceset - required
"fc_pos": "usb", // Center frequency position in hotspot - optional: default center
// lsb: center frequency at end of hotspot (higer frequency)
// usb: center frequency at beginning of hotspot (lower frequency)
// canter: center frequency at mid-point of hotspot (center frequency)
"fc_shift": -300 // Center frequency constant shift from computed frequency - optional
},
{
"index": 2
},
{
"index": 3
}
]
}
```
<h3>Run with supervisord</h3>
Refer to supervisord documentation.
Esample of `superscanner.conf` file to put in your `/etc//etc/supervisor/conf.d/` folder (add it in the `[incude]` section of `/etc/supervisor/supervisord.conf`). Environment variable `PYTHONUNBUFFERED=1` is important for the log tail to work correctly.
```
[program:superscanner]
command = /opt/build/sdrangel/scriptsapi/venv/bin/python /opt/build/sdrangel/scriptsapi/superscanner.py -a 192.168.0.24 -c /home/f4exb/145M_scan.config.json -g 4 -r 3125 -f -65
process_name = superscanner
user = f4exb
stopsignal = INT
autostart = false
autorestart = false
environment =
USER=f4exb,
PATH="/home/f4exb/bin:/home/f4exb/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games",
HOME="/home/f4exb",
PYTHONUNBUFFERED=1
stdout_logfile = /home/f4exb/log/superscanner.log
stdout_logfile_maxbytes = 10MB
stdout_logfile_backups = 3
redirect_stderr=true
```
<h2>sdrangel.py</h2>
Holds constants related to SDRangel software required by other scripts
<h2>Unit tests</h2>
Run as `python <file>` in the virtual environment
- `test_superscanner.py` is testing `superscanner.py`

Wyświetl plik

@ -1,2 +1,6 @@
requests
Flask
Flask
numpy
websockets
websocket-client
mock

Wyświetl plik

@ -0,0 +1,60 @@
"""
Constants that refer to SDRangel software
"""
# Device keys depending on hardware type (deviceHwType)
DEVICE_TYPES = {
"AirspyHF": {
"settings": "airspyHFSettings",
"cf_key": "centerFrequency",
}
}
# Channel keys depending on channel type (id)
CHANNEL_TYPES = {
"AMDemod": {
"settings": "AMDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
},
"BFMDemod": {
"settings": "BFMDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "volume"
},
"ChirpChatDemod": {
"settings": "ChirpChatDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "decodeActive"
},
"DSDDemod": {
"settings": "DSDDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
},
"FreeDVDemod": {
"settings": "FreeDVDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
},
"NFMDemod": {
"settings": "NFMDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
},
"SSBDemod": {
"settings": "SSBDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
},
"UDPSink": {
"settings": "UDPSinkSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "channelMute"
},
"WFMDemod": {
"settings": "WFMDemodSettings",
"df_key": "inputFrequencyOffset",
"mute_key": "audioMute"
}
}

Wyświetl plik

@ -0,0 +1,453 @@
#!/usr/bin/env python3
"""
Connects to spectrum server to monitor PSD and detect local increase to pilot channel(s)
"""
import requests, traceback, sys, json, time
import struct, operator
import math
import numpy as np
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
from datetime import datetime
from optparse import OptionParser
import sdrangel
OPTIONS = None
API_URI = None
WS_URI = None
PASS_INDEX = 0
PSD_FLOOR = []
CONFIG = {}
UNSERVED_FREQUENCIES = []
# ======================================================================
class SuperScannerError(Exception):
def __init__(self, message):
self.message = message
# ======================================================================
class SuperScannerWebsocketError(SuperScannerError):
pass
# ======================================================================
class SuperScannerWebsocketClosed(SuperScannerError):
pass
# ======================================================================
class SuperScannerOptionsError(SuperScannerError):
pass
# ======================================================================
class SuperScannerAPIError(SuperScannerError):
pass
# ======================================================================
def log_with_timestamp(message):
t = datetime.utcnow()
print(f'{t.isoformat()} {message}')
# ======================================================================
def get_input_options(args=None):
if args is None:
args = sys.argv[1:]
parser = OptionParser(usage="usage: %%prog [-t]\n")
parser.add_option("-a", "--address", dest="address", help="SDRangel web base address. Default: 127.0.0.1", metavar="ADDRESS", type="string")
parser.add_option("-p", "--api-port", dest="api_port", help="SDRangel API port. Default: 8091", metavar="PORT", type="int")
parser.add_option("-w", "--ws-port", dest="ws_port", help="SDRangel websocket spectrum server port. Default: 8887", metavar="PORT", type="int")
parser.add_option("-c", "--config-file", dest="config_file", help="JSON configuration file. Mandatory", metavar="FILE", type="string")
parser.add_option("-j", "--psd-in", dest="psd_input_file", help="JSON file containing PSD floor information.", metavar="FILE", type="string")
parser.add_option("-J", "--psd-out", dest="psd_output_file", help="Write PSD floor information to JSON file.", metavar="FILE", type="string")
parser.add_option("-n", "--nb-passes", dest="passes", help="Number of passes for PSD floor estimation. Default: 10", metavar="NUM", type="int")
parser.add_option("-m", "--margin", dest="margin", help="Margin in dB above PSD floor to detect acivity. Default: 3", metavar="DB", type="int")
parser.add_option("-f", "--psd-level", dest="psd_fixed", help="Use a fixed PSD floor value.", metavar="DB", type="float")
parser.add_option("-X", "--psd-exclude-higher", dest="psd_exclude_higher", help="Level above which to exclude bin scan.", metavar="DB", type="float")
parser.add_option("-x", "--psd-exclude-lower", dest="psd_exclude_lower", help="Level below which to exclude bin scan.", metavar="DB", type="float")
parser.add_option("-N", "--hotspots-noise", dest="hotspots_noise", help="Number of hotspots above which detection is considered as noise. Default 8", metavar="NUM", type="int")
parser.add_option("-G", "--psd-graph", dest="psd_graph", help="Show PSD floor graphs. Requires matplotlib", action="store_true")
parser.add_option("-g", "--group-tolerance", dest="group_tolerance", help="Radius (1D) tolerance in points (bins) for hotspots grouping. Default 1.", metavar="NUM", type="int")
parser.add_option("-r", "--freq-round", dest="freq_round", help="Frequency rounding value in Hz. Default: 1 (no rounding)", metavar="NUM", type="int")
parser.add_option("-o", "--freq-offset", dest="freq_offset", help="Frequency rounding offset in Hz. Default: 0 (no offset)", metavar="NUM", type="int")
(options, args) = parser.parse_args(args)
if (options.config_file == None):
raise SuperScannerOptionsError('A configuration file is required. Option -c or --config-file')
if (options.address == None):
options.address = "127.0.0.1"
if (options.api_port == None):
options.api_port = 8091
if (options.ws_port == None):
options.ws_port = 8887
if (options.passes == None):
options.passes = 10
elif options.passes < 1:
options.passes = 1
if (options.margin == None):
options.margin = 3
if (options.hotspots_noise == None):
options.hotspots_noise = 8
if (options.group_tolerance == None):
options.group_tolerance = 1
if (options.freq_round == None):
options.freq_round = 1
if (options.freq_offset == None):
options.freq_offset = 0
return options
# ======================================================================
def on_ws_message(ws, message):
global PASS_INDEX
try:
struct_message = decode_message(message)
if OPTIONS.psd_fixed is not None and OPTIONS.passes > 0:
compute_fixed_floor(struct_message)
OPTIONS.passes = 0 # done
elif OPTIONS.psd_input_file is not None and OPTIONS.passes > 0:
global PSD_FLOOR
with open(OPTIONS.psd_input_file) as json_file:
PSD_FLOOR = json.load(json_file)
OPTIONS.passes = 0 # done
elif OPTIONS.passes > 0:
compute_floor(struct_message)
OPTIONS.passes -= 1
PASS_INDEX += 1
print(f'PSD floor pass no {PASS_INDEX}')
elif OPTIONS.passes == 0:
OPTIONS.passes -= 1
if OPTIONS.psd_output_file:
with open(OPTIONS.psd_output_file, 'w') as outfile:
json.dump(PSD_FLOOR, outfile)
if OPTIONS.psd_graph:
show_floor()
else:
scan(struct_message)
except Exception as ex:
tb = traceback.format_exc()
print(tb, file=sys.stderr)
# ======================================================================
def on_ws_error(ws, error):
raise SuperScannerWebsocketError(f'{error}')
# ======================================================================
def on_ws_close(ws):
raise SuperScannerWebsocketClosed('websocket closed')
# ======================================================================
def on_ws_open(ws):
log_with_timestamp('Web socket opened starting...')
def run(*args):
pass
thread.start_new_thread(run, ())
# ======================================================================
def decode_message(byte_message):
struct_message = {}
struct_message['cf'] = int.from_bytes(byte_message[0:8], byteorder='little', signed=False)
struct_message['elasped'] = int.from_bytes(byte_message[8:16], byteorder='little', signed=False)
struct_message['ts'] = int.from_bytes(byte_message[16:24], byteorder='little', signed=False)
struct_message['fft_size'] = int.from_bytes(byte_message[24:28], byteorder='little', signed=False)
struct_message['fft_bw'] = int.from_bytes(byte_message[28:32], byteorder='little', signed=False)
indicators = int.from_bytes(byte_message[32:36], byteorder='little', signed=False)
struct_message['linear'] = (indicators & 1) == 1
struct_message['ssb'] = ((indicators & 2) >> 1) == 1
struct_message['usb'] = ((indicators & 4) >> 2) == 1
struct_message['samples'] = []
for sample_index in range(struct_message['fft_size']):
psd = struct.unpack('f', byte_message[36 + 4*sample_index: 40 + 4*sample_index])[0]
struct_message['samples'].append(psd)
return struct_message
# ======================================================================
def compute_fixed_floor(struct_message):
global PSD_FLOOR
nb_samples = len(struct_message['samples'])
PSD_FLOOR = [(OPTIONS.psd_fixed, False)] * nb_samples
# ======================================================================
def compute_floor(struct_message):
global PSD_FLOOR
fft_size = struct_message['fft_size']
psd_samples = struct_message['samples']
for psd_index, psd in enumerate(psd_samples):
exclude = False
if OPTIONS.psd_exclude_higher:
exclude = psd > OPTIONS.psd_exclude_higher
if OPTIONS.psd_exclude_lower:
exclude = psd < OPTIONS.psd_exclude_lower
if psd_index < len(PSD_FLOOR):
PSD_FLOOR[psd_index][1] = exclude or PSD_FLOOR[psd_index][1]
if psd > PSD_FLOOR[psd_index][0]:
PSD_FLOOR[psd_index][0] = psd
else:
PSD_FLOOR.append([])
PSD_FLOOR[psd_index].append(psd)
PSD_FLOOR[psd_index].append(exclude)
# ======================================================================
def show_floor():
import matplotlib
import matplotlib.pyplot as plt
print('show_floor')
plt.figure(1)
plt.subplot(211)
plt.plot([x[1] for x in PSD_FLOOR])
plt.ylabel('PSD exclusion')
plt.subplot(212)
plt.plot([x[0] for x in PSD_FLOOR])
plt.ylabel('PSD floor')
plt.show()
# ======================================================================
def freq_rounding(freq, round_freq, round_offset):
shifted_freq = freq - round_offset
return round(shifted_freq/round_freq)*round_freq + round_offset
# ======================================================================
def scan(struct_message):
ts = struct_message['ts']
freq_density = struct_message['fft_bw'] / struct_message['fft_size']
hotspots = []
hotspot ={}
last_hotspot_index = 0
if struct_message['ssb']:
freq_start = struct_message['cf']
freq_stop = struct_message['cf'] + struct_message['fft_bw']
else:
freq_start = struct_message['cf'] - (struct_message['fft_bw'] / 2)
freq_stop = struct_message['cf'] + (struct_message['fft_bw'] / 2)
psd_samples = struct_message['samples']
psd_sum = 0
psd_count = 1
for psd_index, psd in enumerate(psd_samples):
freq = freq_start + psd_index*freq_density
if PSD_FLOOR[psd_index][1]: # exclusion zone
continue
if psd > PSD_FLOOR[psd_index][0] + OPTIONS.margin: # detection
psd_sum += 10**(psd/10)
psd_count += 1
if psd_index > last_hotspot_index + OPTIONS.group_tolerance: # new hotspot
if hotspot.get("begin"): # finalize previous hotspot
hotspot["end"] = hotspot_end
hotspot["power"] = psd_sum / psd_count
hotspots.append(hotspot)
hotspot = {"begin": freq}
psd_sum = 10**(psd/10)
psd_count = 1
hotspot_end = freq
last_hotspot_index = psd_index
if hotspot.get("begin"): # finalize last hotspot
hotspot["end"] = hotspot_end
hotspot["power"] = psd_sum / psd_count
hotspots.append(hotspot)
process_hotspots(hotspots)
# ======================================================================
def allocate_channel():
channels = CONFIG['channel_info']
for channel in channels:
if channel['usage'] == 0:
return channel
return None
# ======================================================================
def freq_in_ranges_check(freq):
freqrange_inclusions = CONFIG.get('freqrange_inclusions', [])
freqrange_exclusions = CONFIG.get('freqrange_exclusions', [])
for freqrange in freqrange_exclusions:
if freqrange[0] <= freq <= freqrange[1]:
return False
for freqrange in freqrange_inclusions:
if freqrange[0] <= freq <= freqrange[1]:
return True
return False
# ======================================================================
def get_hotspot_frequency(channel, hotspot):
fc_pos = channel.get('fc_pos', 'center')
if fc_pos == 'lsb':
channel_frequency = freq_rounding(hotspot['end'], OPTIONS.freq_round, OPTIONS.freq_offset)
elif fc_pos == 'usb':
channel_frequency = freq_rounding(hotspot['begin'], OPTIONS.freq_round, OPTIONS.freq_offset)
else:
channel_frequency = freq_rounding(hotspot['fc'], OPTIONS.freq_round, OPTIONS.freq_offset)
fc_shift = channel.get('fc_shift', 0)
return channel_frequency + fc_shift
# ======================================================================
def process_hotspots(scanned_hotspots):
global CONFIG
global UNSERVED_FREQUENCIES
if len(scanned_hotspots) > OPTIONS.hotspots_noise:
return
# calculate frequency for each hotspot and create list of valid hotspots
hotspots = []
for hotspot in scanned_hotspots:
width = hotspot['end'] - hotspot['begin']
fc = hotspot['begin'] + width/2
if not freq_in_ranges_check(fc):
continue
hotspot['fc'] = fc
hotspot['begin'] = fc - (width/2) # re-center around fc
hotspot['end'] = fc + (width/2)
hotspots.append(hotspot)
# calculate hotspot distances for each used channel and reuse the channel for the closest hotspot
channels = CONFIG['channel_info']
used_channels = [channel for channel in channels if channel['usage'] == 1]
consolidated_distances = []
for channel in used_channels: # loop on used channels
distances = [[abs(channel['frequency'] - get_hotspot_frequency(channel, hotspot)), hotspot] for hotspot in hotspots]
distances = sorted(distances, key=operator.itemgetter(0))
if distances:
consolidated_distances.append([distances[0][0], channel, distances[0][1]]) # [distance, channel, hotspot]
consolidated_distances = sorted(consolidated_distances, key=operator.itemgetter(0)) # get (channel, hotspot) pair with shortest distance first
# reallocate used channels on their closest hotspot
for distance in consolidated_distances:
channel = distance[1]
hotspot = distance[2]
if hotspot in hotspots: # hotspot is not processed yet
channel_frequency = get_hotspot_frequency(channel, hotspot)
channel['usage'] = 2 # mark channel used on this pass
if channel['frequency'] != channel_frequency: # optimization: do not move to same frequency
channel['frequency'] = channel_frequency
channel_index = channel['index']
set_channel_frequency(channel)
log_with_timestamp(f'Moved channel {channel_index} to frequency {channel_frequency} Hz')
hotspots.remove(hotspot) # done with this hotspot
# for remaining hotspots we need to allocate new channels
for hotspot in hotspots:
channel = allocate_channel()
if channel:
channel_index = channel['index']
channel_frequency = get_hotspot_frequency(channel, hotspot)
channel['usage'] = 2 # mark channel used on this pass
channel['frequency'] = channel_frequency
set_channel_frequency(channel)
log_with_timestamp(f'Allocated channel {channel_index} on frequency {channel_frequency} Hz')
else:
fc = hotspot['fc']
if fc not in UNSERVED_FREQUENCIES:
UNSERVED_FREQUENCIES.append(fc)
log_with_timestamp(f'All channels allocated. Cannot process signal at {fc} Hz')
# cleanup
for channel in CONFIG['channel_info']:
if channel['usage'] == 1: # channel unused on this pass
channel['usage'] = 0 # release it
channel_index = channel['index']
fc = channel['frequency']
set_channel_mute(channel)
UNSERVED_FREQUENCIES.clear() # at least one channel is able to serve next time
log_with_timestamp(f'Released channel {channel_index} on frequency {fc} Hz')
elif channel['usage'] == 2: # channel used on this pass
channel['usage'] = 1 # reset usage for next pass
# ======================================================================
def set_channel_frequency(channel):
deviceset_index = CONFIG['deviceset_index']
channel_index = channel['index']
channel_id = channel['id']
df = channel['frequency'] - CONFIG['device_frequency']
url = f'{API_URI}/sdrangel/deviceset/{deviceset_index}/channel/{channel_index}/settings'
payload = {
sdrangel.CHANNEL_TYPES[channel_id]['settings']: {
sdrangel.CHANNEL_TYPES[channel_id]['df_key']: df,
sdrangel.CHANNEL_TYPES[channel_id]['mute_key']: 0
},
'channelType': channel_id,
'direction': 0
}
r = requests.patch(url=url, json=payload)
if r.status_code // 100 != 2:
raise SuperScannerAPIError(f'Set channel {channel_index} frequency failed')
# ======================================================================
def set_channel_mute(channel):
deviceset_index = CONFIG['deviceset_index']
channel_index = channel['index']
channel_id = channel['id']
url = f'{API_URI}/sdrangel/deviceset/{deviceset_index}/channel/{channel_index}/settings'
payload = {
sdrangel.CHANNEL_TYPES[channel_id]['settings']: {
sdrangel.CHANNEL_TYPES[channel_id]['mute_key']: 1
},
'channelType': channel_id,
'direction': 0
}
r = requests.patch(url=url, json=payload)
if r.status_code // 100 != 2:
raise SuperScannerAPIError(f'Set channel {channel_index} mute failed')
# ======================================================================
def get_deviceset_info(deviceset_index):
url = f'{API_URI}/sdrangel/deviceset/{deviceset_index}'
r = requests.get(url=url)
if r.status_code // 100 != 2:
raise SuperScannerAPIError(f'Get deviceset {deviceset_index} info failed')
return r.json()
# ======================================================================
def make_config():
global CONFIG
deviceset_index = CONFIG['deviceset_index']
deviceset_info = get_deviceset_info(deviceset_index)
device_frequency = deviceset_info["samplingDevice"]["centerFrequency"]
CONFIG['device_frequency'] = device_frequency
for channel_info in CONFIG['channel_info']:
channel_index = channel_info['index']
if channel_index < deviceset_info['channelcount']:
channel_offset = deviceset_info['channels'][channel_index]['deltaFrequency']
channel_id = deviceset_info['channels'][channel_index]['id']
channel_info['id'] = channel_id
channel_info['usage'] = 0 # 0: unused 1: used 2: reused in current allocation step (temporary state)
channel_info['frequency'] = device_frequency + channel_offset
else:
raise SuperScannerAPIError(f'There is no channel with index {channel_index} in deviceset {deviceset_index}')
# ======================================================================
def main():
try:
global OPTIONS
global CONFIG
global API_URI
global WS_URI
OPTIONS = get_input_options()
log_with_timestamp(f'Start with options: {OPTIONS}')
with open(OPTIONS.config_file) as json_file: # get base config
CONFIG = json.load(json_file)
log_with_timestamp(f'Initial configuration: {CONFIG}')
API_URI = f'http://{OPTIONS.address}:{OPTIONS.api_port}'
WS_URI = f'ws://{OPTIONS.address}:{OPTIONS.ws_port}'
make_config() # complete config with device set information from SDRangel
ws = websocket.WebSocketApp(WS_URI,
on_message = on_ws_message,
on_error = on_ws_error,
on_close = on_ws_close)
ws.on_open = on_ws_open
ws.run_forever()
except SuperScannerWebsocketError as ex:
print(ex.message)
except SuperScannerWebsocketClosed:
print("Spectrum websocket closed")
except Exception as ex:
tb = traceback.format_exc()
print(tb, file=sys.stderr)
# ======================================================================
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,211 @@
import unittest
import mock
import superscanner
# ======================================================================
def print_hex(bytestring):
print('\\x' + '\\x'.join('{:02x}'.format(x) for x in bytestring))
# ======================================================================
def get_deviceset_info(deviceset_index):
return {
"channelcount": 4,
"channels": [
{
"deltaFrequency": 170000,
"direction": 0,
"id": "NFMDemod",
"index": 0,
"title": "NFM Demodulator",
"uid": 1590355926650308
},
{
"deltaFrequency": -155000,
"direction": 0,
"id": "DSDDemod",
"index": 1,
"title": "DSD Demodulator",
"uid": 1590355926718405
},
{
"deltaFrequency": 170000,
"direction": 0,
"id": "NFMDemod",
"index": 2,
"title": "NFM Demodulator",
"uid": 1590355926939766
},
{
"deltaFrequency": -95000,
"direction": 0,
"id": "NFMDemod",
"index": 3,
"title": "NFM Demodulator",
"uid": 1590355926945674
}
],
"samplingDevice": {
"bandwidth": 768000,
"centerFrequency": 145480000,
"deviceNbStreams": 1,
"deviceStreamIndex": 0,
"direction": 0,
"hwType": "AirspyHF",
"index": 0,
"sequence": 0,
"serial": "c852a98040c73f93",
"state": "running"
}
}
# ======================================================================
def set_channel_frequency(channel):
pass
# ======================================================================
def set_channel_mute(channel):
pass
# ======================================================================
class TestStringMethods(unittest.TestCase):
def test_upper(self):
self.assertEqual('foo'.upper(), 'FOO')
def test_isupper(self):
self.assertTrue('FOO'.isupper())
self.assertFalse('Foo'.isupper())
def test_split(self):
s = 'hello world'
self.assertEqual(s.split(), ['hello', 'world'])
# check that s.split fails when the separator is not a string
with self.assertRaises(TypeError):
s.split(2)
# ======================================================================
class TestSuperScannerOptions(unittest.TestCase):
def test_options_minimal(self):
options = superscanner.get_input_options(["-ctoto"])
self.assertEqual(options.config_file, 'toto')
# ======================================================================
class TestSuperScannerDecode(unittest.TestCase):
def test_decode_bytes(self):
msg_bytes = b'\x40\xd9\xab\x08\x00\x00\x00\x00' + \
b'\xff\x00\x00\x00\x00\x00\x00\x00' + \
b'\x69\x63\xbb\x55\x72\x01\x00\x00' + \
b'\x00\x04\x00\x00' + \
b'\x00\xb8\x0b\x00' + \
b'\x04\x00\x00\x00'
for i in range(1024):
msg_bytes += b'\x00\x00\x00\x00'
msg_struct = superscanner.decode_message(msg_bytes)
self.assertEqual(msg_struct['fft_size'], 1024)
# ======================================================================
class TestSuperScannerProcessHotspots(unittest.TestCase):
@mock.patch('superscanner.get_deviceset_info', side_effect=get_deviceset_info)
def setUp(self, urandom_function):
self.options = type('options', (object,), {})()
self.options.address = '127.0.0.1'
self.options.passes = 10
self.options.api_port = 8091
self.options.ws_port = 8887
self.options.config_file = 'toto'
self.options.psd_input_file = None
self.options.psd_output_file = None
self.options.passes = 10
self.options.margin = 3
self.options.psd_fixed = None
self.options.psd_exclude_higher = None
self.options.psd_exclude_lower = None
self.options.psd_graph = None
self.options.group_tolerance = 1
self.options.freq_round = 1
self.options.freq_offset = 0
superscanner.OPTIONS = self.options
superscanner.CONFIG = {
"deviceset_index": 0,
"freqrange_exclusions": [
[145290000, 145335000],
[145692500, 145707500]
],
"freqrange_inclusions": [
[145170000, 145800000]
],
"channel_info": [
{
"index": 0,
"fc_pos": "center"
},
{
"index": 2
},
{
"index": 3
}
]
}
superscanner.make_config()
def test_make_config(self):
self.assertEqual(superscanner.CONFIG['device_frequency'], 145480000)
@mock.patch('superscanner.set_channel_frequency', side_effect=set_channel_frequency)
@mock.patch('superscanner.set_channel_mute', side_effect=set_channel_mute)
def test_process_hotspot(self, set_channel_frequency, set_channel_mute):
hotspots1 = [
{
'begin': 145550000,
'end': 145550000,
'power': -50
}
]
superscanner.process_hotspots(hotspots1)
channel_info = superscanner.CONFIG['channel_info']
self.assertEqual(channel_info[0]['usage'], 1)
self.assertEqual(channel_info[1]['usage'], 0)
self.assertEqual(channel_info[2]['usage'], 0)
self.assertEqual(channel_info[0]['frequency'], 145550000)
hotspots2 = [
{
'begin': 145200000,
'end': 145200000,
'power': -35
},
{
'begin': 145550000,
'end': 145550000,
'power': -50
}
]
superscanner.process_hotspots(hotspots2)
channel_info = superscanner.CONFIG['channel_info']
self.assertEqual(channel_info[0]['usage'], 1)
self.assertEqual(channel_info[1]['usage'], 1)
self.assertEqual(channel_info[2]['usage'], 0)
self.assertEqual(channel_info[0]['frequency'], 145550000)
self.assertEqual(channel_info[1]['frequency'], 145200000)
hotspots3 = [
{
'begin': 145200000,
'end': 145200000,
'power': -35
}
]
superscanner.process_hotspots(hotspots3)
channel_info = superscanner.CONFIG['channel_info']
self.assertEqual(channel_info[0]['usage'], 0)
self.assertEqual(channel_info[1]['usage'], 1)
self.assertEqual(channel_info[2]['usage'], 0)
self.assertEqual(channel_info[1]['frequency'], 145200000)
# ======================================================================
if __name__ == '__main__':
unittest.main()