kopia lustrzana https://github.com/f4exb/sdrangel
qo100_datv.py: move all logic to a class. Apply pylint
rodzic
d355d0b2ae
commit
b11e7ba4cd
|
@ -3,15 +3,17 @@
|
||||||
Quick control of configuration for receiving QO-100 DATV
|
Quick control of configuration for receiving QO-100 DATV
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import requests, traceback, sys, json, time
|
import traceback
|
||||||
from optparse import OptionParser
|
import sys
|
||||||
|
from optparse import OptionParser # pylint: disable=deprecated-module
|
||||||
|
import requests
|
||||||
import sdrangel
|
import sdrangel
|
||||||
|
|
||||||
base_url_pattern = "http://{0}/sdrangel"
|
BASE_URL_PATTERN = "http://{0}/sdrangel"
|
||||||
base_url = None
|
BASE_URL = None
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def getInputOptions():
|
def get_input_options():
|
||||||
""" Parse options """
|
""" Parse options """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
parser = OptionParser(usage="usage: %%prog [-t]\n")
|
parser = OptionParser(usage="usage: %%prog [-t]\n")
|
||||||
|
@ -37,115 +39,122 @@ def getInputOptions():
|
||||||
|
|
||||||
return options
|
return options
|
||||||
|
|
||||||
# ======================================================================
|
class Controller:
|
||||||
def change_center_frequency(content, new_frequency):
|
""" Controller main class """
|
||||||
""" Change center frequency searching recursively """
|
def __init__(self):
|
||||||
# ----------------------------------------------------------------------
|
self.device_hwid = None
|
||||||
for k in content:
|
|
||||||
if isinstance(content[k], dict):
|
|
||||||
change_center_frequency(content[k], new_frequency)
|
|
||||||
elif k == "centerFrequency":
|
|
||||||
content[k] = new_frequency
|
|
||||||
|
|
||||||
# ======================================================================
|
|
||||||
def change_decim(content, new_decim):
|
|
||||||
""" Change log2 decimation searching recursively """
|
|
||||||
# ----------------------------------------------------------------------
|
|
||||||
for k in content:
|
|
||||||
if isinstance(content[k], dict):
|
|
||||||
change_decim(content[k], new_decim)
|
|
||||||
elif k == "log2Decim":
|
|
||||||
content[k] = new_decim
|
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def change_rfbw(content, new_rfbw):
|
def change_center_frequency(self, content, new_frequency):
|
||||||
""" Change RF bandwidth searching recursively """
|
""" Change center frequency searching recursively """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
for k in content:
|
for k in content:
|
||||||
if isinstance(content[k], dict):
|
if isinstance(content[k], dict):
|
||||||
change_rfbw(content[k], new_rfbw)
|
self.change_center_frequency(content[k], new_frequency)
|
||||||
elif k == "rfBandwidth":
|
elif k == "centerFrequency":
|
||||||
content[k] = new_rfbw
|
content[k] = new_frequency
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def change_symbol_rate(content, new_symbol_rate):
|
def change_decim(self, content, new_decim):
|
||||||
""" Change symbol rate searching recursively """
|
""" Change log2 decimation searching recursively """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
for k in content:
|
for k in content:
|
||||||
if isinstance(content[k], dict):
|
if isinstance(content[k], dict):
|
||||||
change_symbol_rate(content[k], new_symbol_rate)
|
self.change_decim(content[k], new_decim)
|
||||||
elif k == "symbolRate":
|
elif k == "log2Decim":
|
||||||
content[k] = new_symbol_rate
|
content[k] = new_decim
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def get_device_sr_and_decim(hwtype, settings):
|
def change_rfbw(self, content, new_rfbw):
|
||||||
""" Return device sample rate and decimation """
|
""" Change RF bandwidth searching recursively """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
if hwtype == "Airspy" or hwtype == "AirspyHF":
|
for k in content:
|
||||||
sr_index = settings.get("devSampleRateIndex", 0)
|
if isinstance(content[k], dict):
|
||||||
if sr_index == 1:
|
self.change_rfbw(content[k], new_rfbw)
|
||||||
sr = 3000000
|
elif k == "rfBandwidth":
|
||||||
|
content[k] = new_rfbw
|
||||||
|
|
||||||
|
# ======================================================================
|
||||||
|
def change_symbol_rate(self, content, new_symbol_rate):
|
||||||
|
""" Change symbol rate searching recursively """
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
for k in content:
|
||||||
|
if isinstance(content[k], dict):
|
||||||
|
self.change_symbol_rate(content[k], new_symbol_rate)
|
||||||
|
elif k == "symbolRate":
|
||||||
|
content[k] = new_symbol_rate
|
||||||
|
|
||||||
|
# ======================================================================
|
||||||
|
def get_device_sr_and_decim(self, hwtype, settings):
|
||||||
|
""" Return device sample rate and decimation """
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
if hwtype == "Airspy" or hwtype == "AirspyHF":
|
||||||
|
sr_index = settings.get("devSampleRateIndex", 0)
|
||||||
|
if sr_index == 1:
|
||||||
|
sr = 3000000
|
||||||
|
else:
|
||||||
|
sr = 6000000
|
||||||
|
log2_decim = settings.get("log2Decim", 0)
|
||||||
else:
|
else:
|
||||||
sr = 6000000
|
sr = settings.get("devSampleRate", 0)
|
||||||
log2_decim = settings.get("log2Decim", 0)
|
log2_decim = settings.get("log2Decim", 0)
|
||||||
else:
|
return sr, 1<<log2_decim
|
||||||
sr = settings.get("devSampleRate", 0)
|
|
||||||
log2_decim = settings.get("log2Decim", 0)
|
|
||||||
return sr, 1<<log2_decim
|
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def calc_decim(device_sr, symbol_rate):
|
@staticmethod
|
||||||
""" Calculate ideal decimation and return as log2 """
|
def calc_decim(device_sr, symbol_rate):
|
||||||
# ----------------------------------------------------------------------
|
""" Calculate ideal decimation and return as log2 """
|
||||||
quot = device_sr // (2*symbol_rate*1000)
|
# ----------------------------------------------------------------------
|
||||||
for i in range(6):
|
quot = device_sr // (2*symbol_rate*1000)
|
||||||
if quot < 1<<i:
|
for i in range(6):
|
||||||
break
|
if quot < 1<<i:
|
||||||
return 0 if i < 1 else i - 1
|
break
|
||||||
|
return 0 if i < 1 else i - 1
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def set_device(options):
|
def set_device(self, options):
|
||||||
""" Set the device """
|
""" Set the device """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
device_settings_url = f"{base_url}/deviceset/{options.device_index}/device/settings"
|
device_settings_url = f"{BASE_URL}/deviceset/{options.device_index}/device/settings"
|
||||||
r = requests.get(url=device_settings_url)
|
r = requests.get(url=device_settings_url, timeout=10)
|
||||||
if r.status_code // 100 == 2: # OK
|
if r.status_code // 100 == 2: # OK
|
||||||
rj = r.json()
|
rj = r.json()
|
||||||
hwtype = rj.get("deviceHwType", None)
|
hwtype = rj.get("deviceHwType", None)
|
||||||
device_settings = rj.get(sdrangel.DEVICE_TYPES[hwtype]["settings"], None)
|
device_settings = rj.get(sdrangel.DEVICE_TYPES[hwtype]["settings"], None)
|
||||||
device_sr, device_decim = get_device_sr_and_decim(hwtype, device_settings)
|
device_sr, device_decim = self.get_device_sr_and_decim(hwtype, device_settings)
|
||||||
new_decim = calc_decim(device_sr, options.symbol_rate)
|
new_decim = self.calc_decim(device_sr, options.symbol_rate)
|
||||||
print(f"sr: {device_sr} S/s decim: {device_decim} new decim: {1<<new_decim}")
|
print(f"sr: {device_sr} S/s decim: {device_decim} new decim: {1<<new_decim}")
|
||||||
if not options.no_decim:
|
if not options.no_decim:
|
||||||
change_decim(rj, new_decim)
|
self.change_decim(rj, new_decim)
|
||||||
change_center_frequency(rj, options.frequency*1000)
|
self.change_center_frequency(rj, options.frequency*1000)
|
||||||
r = requests.patch(url=device_settings_url, json=rj)
|
r = requests.patch(url=device_settings_url, json=rj, timeout=10)
|
||||||
if r.status_code / 100 == 2:
|
if r.status_code / 100 == 2:
|
||||||
print(f'set_device: changed #{options.device_index}: frequency: {options.frequency} kHz log2Decim: {new_decim} No decim: {options.no_decim}')
|
print(f'set_device: changed #{options.device_index}: frequency: {options.frequency} kHz log2Decim: {new_decim} No decim: {options.no_decim}')
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f'set_device: failed to change device {options.device_index} with error {r.status_code}')
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f'set_device: failed to change device {options.device_index} with error {r.status_code}')
|
raise RuntimeError(f"Cannot get device info for device set index {options.device_index} HTTP return code {r.status_code}")
|
||||||
else:
|
|
||||||
raise RuntimeError(f"Cannot get device info for device set index {options.device_index} HTTP return code {r.status_code}")
|
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
def set_channel(options):
|
def set_channel(self, options):
|
||||||
""" Set the channel - assume DATV demod """
|
""" Set the channel - assume DATV demod """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
channel_settings_url = f"{base_url}/deviceset/{options.device_index}/channel/{options.channel_index}/settings"
|
channel_settings_url = f"{BASE_URL}/deviceset/{options.device_index}/channel/{options.channel_index}/settings"
|
||||||
r = requests.get(url=channel_settings_url)
|
r = requests.get(url=channel_settings_url, timeout=10)
|
||||||
if r.status_code // 100 == 2: # OK
|
if r.status_code // 100 == 2: # OK
|
||||||
rj = r.json()
|
rj = r.json()
|
||||||
rfbw = round(1.5*options.symbol_rate)*1000
|
rfbw = round(1.5*options.symbol_rate)*1000
|
||||||
change_center_frequency(rj, 0)
|
self.change_center_frequency(rj, 0)
|
||||||
change_rfbw(rj, rfbw)
|
self.change_rfbw(rj, rfbw)
|
||||||
change_symbol_rate(rj, options.symbol_rate*1000)
|
self.change_symbol_rate(rj, options.symbol_rate*1000)
|
||||||
r = requests.patch(url=channel_settings_url, json=rj)
|
r = requests.patch(url=channel_settings_url, json=rj, timeout=10)
|
||||||
if r.status_code / 100 == 2:
|
if r.status_code / 100 == 2:
|
||||||
print(f'set_channel: changed {options.device_index}:{options.channel_index}: rfbw: {rfbw} Hz symbol rate: {options.symbol_rate} kS/s')
|
print(f'set_channel: changed {options.device_index}:{options.channel_index}: rfbw: {rfbw} Hz symbol rate: {options.symbol_rate} kS/s')
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f'set_device: failed to change channel {options.device_index}:{options.channel_index} with error {r.status_code}')
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f'set_device: failed to change channel {options.device_index}:{options.channel_index} with error {r.status_code}')
|
raise RuntimeError(f"Cannot get channel info for channel {options.device_index}:{options.channel_index} HTTP return code {r.status_code}")
|
||||||
else:
|
|
||||||
raise RuntimeError(f"Cannot get channel info for channel {options.device_index}:{options.channel_index} HTTP return code {r.status_code}")
|
|
||||||
|
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
|
@ -153,16 +162,18 @@ def main():
|
||||||
""" Main program """
|
""" Main program """
|
||||||
# ----------------------------------------------------------------------
|
# ----------------------------------------------------------------------
|
||||||
try:
|
try:
|
||||||
options = getInputOptions()
|
options = get_input_options()
|
||||||
|
|
||||||
global base_url
|
global BASE_URL # pylint: disable=global-statement
|
||||||
base_url = base_url_pattern.format(options.address)
|
BASE_URL = BASE_URL_PATTERN.format(options.address)
|
||||||
print(f"Frequency: 10{options.frequency/1000} MHz Symbol rate: {options.symbol_rate} kS/s")
|
print(f"Frequency: 10{options.frequency/1000} MHz Symbol rate: {options.symbol_rate} kS/s")
|
||||||
set_device(options)
|
controller = Controller()
|
||||||
set_channel(options)
|
controller.set_device(options)
|
||||||
|
controller.set_channel(options)
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex: # pylint: disable=broad-except
|
||||||
tb = traceback.format_exc()
|
tb = traceback.format_exc()
|
||||||
|
print(f"Exception caught {ex}")
|
||||||
print(tb, file=sys.stderr)
|
print(tb, file=sys.stderr)
|
||||||
|
|
||||||
# ======================================================================
|
# ======================================================================
|
||||||
|
|
Ładowanie…
Reference in New Issue