Feature/refactoring (#556)

* github workflow naming for ubuntu 22.04
* refactored Hardware.py
* use contextlib instead of try
* use dataclass instead of userdict
* simplyfied sweep worker
* fixed calibration data loading
* explicit import of scipy functions - may fix #555
pull/557/head
Holger Müller 2022-10-06 18:15:59 +02:00 zatwierdzone przez GitHub
rodzic 114b815c72
commit 7b9dd5ab0a
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
19 zmienionych plików z 356 dodań i 339 usunięć

Wyświetl plik

@ -1,4 +1,4 @@
name: Linux Release
name: Modern Linux Release
on:
push:
@ -33,5 +33,5 @@ jobs:
- name: Archive production artifacts
uses: actions/upload-artifact@v1
with:
name: NanoVNASaver.linux
name: NanoVNASaver.linux_modern
path: dist/nanovna-saver

Wyświetl plik

@ -19,8 +19,8 @@
import logging
from PyQt5 import QtWidgets
import scipy
import numpy as np
from scipy.signal import find_peaks, peak_prominences
from NanoVNASaver.Analysis.Base import QHLine
from NanoVNASaver.Analysis.SimplePeakSearchAnalysis import (
@ -60,17 +60,17 @@ class PeakSearchAnalysis(SimplePeakSearchAnalysis):
inverted = False
if self.button['peak_l'].isChecked():
inverted = True
peaks, _ = scipy.signal.find_peaks(
peaks, _ = find_peaks(
-np.array(data), width=3, distance=3, prominence=1)
else:
self.button['peak_h'].setChecked(True)
peaks, _ = scipy.signal.find_peaks(
peaks, _ = find_peaks(
data, width=3, distance=3, prominence=1)
# Having found the peaks, get the prominence data
for i, p in np.ndenumerate(peaks):
logger.debug("Peak %i at %d", i, p)
prominences = scipy.signal.peak_prominences(data, peaks)[0]
prominences = peak_prominences(data, peaks)[0]
logger.debug("%d prominences", len(prominences))
# Find the peaks with the most extreme values

Wyświetl plik

@ -21,7 +21,7 @@ import math
from typing import Callable, List, Tuple
import numpy as np
import scipy
from scipy.signal import find_peaks
from NanoVNASaver.RFTools import Datapoint
@ -60,7 +60,7 @@ def maxima(data: List[float], threshold: float = 0.0) -> List[int]:
Returns:
List[int]: indices of maxima
"""
peaks = scipy.signal.find_peaks(
peaks = find_peaks(
data, width=2, distance=3, prominence=1)[0].tolist()
return [
i for i in peaks if data[i] > threshold
@ -76,7 +76,7 @@ def minima(data: List[float], threshold: float = 0.0) -> List[int]:
Returns:
List[int]: indices of minima
"""
bottoms = scipy.signal.find_peaks(
bottoms = find_peaks(
-np.array(data), width=2, distance=3, prominence=1)[0].tolist()
return [
i for i in bottoms if data[i] < threshold

Wyświetl plik

@ -22,21 +22,51 @@ import math
import os
import re
from collections import defaultdict, UserDict
from dataclasses import dataclass
from typing import List
from scipy.interpolate import interp1d
from NanoVNASaver.RFTools import Datapoint
RXP_CAL_LINE = re.compile(r"""^\s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)(?: \s
(?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+) \s+
(?P<thrureflr>[-0-9Ee.]+) \s+ (?P<thrurefli>[-0-9Ee.]+) \s+
(?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+)
)?
IDEAL_SHORT = complex(-1, 0)
IDEAL_OPEN = complex(1, 0)
IDEAL_LOAD = complex(0, 0)
IDEAL_THROUGH = complex(1, 0)
RXP_CAL_LINE = {
"short": re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)
( \s+ # optional for backword compatibility
(?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+) \s+
(?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+)
)? \s* $
""", re.VERBOSE),
"long": re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+) \s+
(?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+) \s+
(?P<thrureflr>[-0-9Ee.]+) \s+ (?P<thrurefli>[-0-9Ee.]+) \s+
(?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+)
\s* $
""", re.VERBOSE),
}
RXP_CAL_HEADER = re.compile(r"""
^ \# \s+ Hz \s+
ShortR \s+ ShortI \s+ OpenR \s+ OpenI \s+
LoadR \s+ LoadI \s+ ThroughR \s+ ThroughI \s+
(?P<t_refl>ThrureflR \s+ ThrureflI \s+)? IsolationR \s+ IsolationI \s*
$
""", re.VERBOSE)
logger = logging.getLogger(__name__)
@ -49,57 +79,84 @@ def correct_delay(d: Datapoint, delay: float, reflect: bool = False):
return Datapoint(d.freq, corr_data.real, corr_data.imag)
class CalData(UserDict):
def __init__(self):
data = {
"short": None,
"open": None,
"load": None,
"through": None,
"thrurefl": None,
"isolation": None,
# the frequence
"freq": 0,
# 1 Port
"e00": 0.0, # Directivity
"e11": 0.0, # Port1 match
"delta_e": 0.0, # Tracking
"e10e01": 0.0, # Forward Reflection Tracking
# 2 port
"e30": 0.0, # Forward isolation
"e22": 0.0, # Port2 match
"e10e32": 0.0, # Forward transmission
}
super().__init__(data)
@dataclass
class CalData:
# pylint: disable=too-many-instance-attributes
short: complex = complex(0.0, 0.0)
open: complex = complex(0.0, 0.0)
load: complex = complex(0.0, 0.0)
through: complex = complex(0.0, 0.0)
thrurefl: complex = complex(0.0, 0.0)
isolation: complex = complex(0.0, 0.0)
freq: int = 0
e00: float = 0.0 # Directivity
e11: float = 0.0 # Port1 match
delta_e: float = 0.0 # Tracking
e10e01: float = 0.0 # Forward Reflection Tracking
# 2 port
e30: float = 0.0 # Forward isolation
e22: float = 0.0 # Port2 match
e10e32: float = 0.0 # Forward transmission
def __str__(self):
d = self.data
s = (f'{d["freq"]}'
f' {d["short"].re} {d["short"].im}'
f' {d["open"].re} {d["open"].im}'
f' {d["load"].re} {d["load"].im}')
if d["through"] is not None:
s += (f' {d["through"].re} {d["through"].im}'
f' {d["thrurefl"].re} {d["thrurefl"].im}'
f' {d["isolation"].re} {d["isolation"].im}')
return s
return (
f'{self.freq}'
f' {self.short.real} {self.short.imag}'
f' {self.open.real} {self.open.imag}'
f' {self.load.real} {self.load.imag}' + (
f' {self.through.real} {self.through.imag}'
f' {self.thrurefl.real} {self.thrurefl.imag}'
f' {self.isolation.real} {self.isolation.imag}'
if self.through else ''
)
)
class CalDataSet:
@dataclass
class CalElement:
# pylint: disable=too-many-instance-attributes
short_is_ideal: bool = True
short_l0: float = 5.7e-12
short_l1: float = -8.96e-20
short_l2: float = -1.1e-29
short_l3: float = -4.12e-37
short_length: float = -34.2 # ps
open_is_ideal: bool = True
open_c0: float = 2.1e-14
open_c1: float = 5.67e-23
open_c2: float = -2.39e-31
open_c3: float = 2.0e-40
open_length: float = 0.0
load_is_ideal: bool = True
load_r: float = 50.0
load_l: float = 0.0
load_c: float = 0.0
load_length: float = 0.0
through_is_ideal: bool = True
through_length: float = 0.0
class CalDataSet(UserDict):
def __init__(self):
self.data = defaultdict(CalData)
super().__init__()
self.data: defaultdict[int, CalData] = defaultdict(CalData)
def insert(self, name: str, dp: Datapoint):
if name not in self.data[dp.freq]:
if name not in {'short', 'open', 'load',
'through', 'thrurefl', 'isolation'}:
raise KeyError(name)
self.data[dp.freq]["freq"] = dp.freq
self.data[dp.freq][name] = dp
freq = dp.freq
setattr(self.data[freq], name, (dp.z))
self.data[freq].freq = freq
def frequencies(self) -> List[int]:
return sorted(self.data.keys())
def get(self, freq: int) -> CalData:
return self.data[freq]
def get(self, key: int, default: CalData = None) -> CalData:
return self.data.get(key, default)
def items(self):
yield from self.data.items()
@ -109,63 +166,32 @@ class CalDataSet:
yield self.get(freq)
def size_of(self, name: str) -> int:
return len([v for v in self.data.values() if v[name] is not None])
return len(
[True for val in self.data.values() if getattr(val, name)]
)
def complete1port(self) -> bool:
for val in self.data.values():
for name in ("short", "open", "load"):
if val[name] is None:
return False
if not all((val.short, val.open, val.load)):
return False
return any(self.data)
def complete2port(self) -> bool:
if not self.complete1port():
return False
for val in self.data.values():
for name in ("short", "open", "load", "through", "thrurefl",
"isolation"):
if val[name] is None:
return False
if not all((val.through, val.thrurefl, val.isolation)):
return False
return any(self.data)
class Calibration:
CAL_NAMES = ("short", "open", "load", "through", "thrurefl", "isolation",)
IDEAL_SHORT = complex(-1, 0)
IDEAL_OPEN = complex(1, 0)
IDEAL_LOAD = complex(0, 0)
def __init__(self):
self.notes = []
self.dataset = CalDataSet()
self.cal_element = CalElement()
self.interp = {}
self.useIdealShort = True
self.shortL0 = 5.7 * 10E-12
self.shortL1 = -8960 * 10E-24
self.shortL2 = -1100 * 10E-33
self.shortL3 = -41200 * 10E-42
self.shortLength = -34.2 # Picoseconfrequenciesds
# These numbers look very large, considering what Keysight
# suggests their numbers are.
self.useIdealOpen = True
# Subtract 50fF for the nanoVNA calibration if nanoVNA is
# calibrated?
self.openC0 = 2.1 * 10E-14
self.openC1 = 5.67 * 10E-23
self.openC2 = -2.39 * 10E-31
self.openC3 = 2.0 * 10E-40
self.openLength = 0
self.useIdealLoad = True
self.loadR = 25
self.loadL = 0
self.loadC = 0
self.loadLength = 0
self.useIdealThrough = True
self.throughLength = 0
self.isCalculated = False
self.source = "Manual"
@ -191,37 +217,37 @@ class Calibration:
g2 = self.gamma_open(freq)
g3 = self.gamma_load(freq)
gm1 = cal["short"].z
gm2 = cal["open"].z
gm3 = cal["load"].z
gm1 = cal.short
gm2 = cal.open
gm3 = cal.load
denominator = (g1 * (g2 - g3) * gm1 +
g2 * g3 * gm2 - g2 * g3 * gm3 -
(g2 * gm2 - g3 * gm3) * g1)
cal["e00"] = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 -
(g2 * g3 * gm2 - g2 * g3 * gm3 -
cal.e00 = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 -
(g2 * g3 * gm2 - g2 * g3 * gm3 -
(g3 * gm2 - g2 * gm3) * g1) * gm1
) / denominator
cal["e11"] = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) +
g3 * gm2 - g2 * gm3) / denominator
cal["delta_e"] = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 *
gm3) * gm1 + (g2 * gm3 - g3 * gm3) *
gm2) / denominator
) / denominator
cal.e11 = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) +
g3 * gm2 - g2 * gm3) / denominator
cal.delta_e = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 *
gm3) * gm1 + (g2 * gm3 - g3 * gm3) *
gm2) / denominator
def _calc_port_2(self, freq: int, cal: CalData):
gt = self.gamma_through(freq)
gm4 = cal["through"].z
gm5 = cal["thrurefl"].z
gm6 = cal["isolation"].z
gm7 = gm5 - cal["e00"]
gm4 = cal.through
gm5 = cal.thrurefl
gm6 = cal.isolation
gm7 = gm5 - cal.e00
cal["e30"] = cal["isolation"].z
cal["e10e01"] = cal["e00"] * cal["e11"] - cal["delta_e"]
cal["e22"] = gm7 / (
gm7 * cal["e11"] * gt ** 2 + cal["e10e01"] * gt ** 2)
cal["e10e32"] = (gm4 - gm6) * (
1 - cal["e11"] * cal["e22"] * gt ** 2) / gt
cal.e30 = cal.isolation
cal.e10e01 = cal.e00 * cal.e11 - cal.delta_e
cal.e22 = gm7 / (
gm7 * cal.e11 * gt ** 2 + cal.e10e01 * gt ** 2)
cal.e10e32 = (gm4 - gm6) * (
1 - cal.e11 * cal.e22 * gt ** 2) / gt
def calc_corrections(self):
if not self.isValid1Port():
@ -251,69 +277,57 @@ class Calibration:
logger.debug("Calibration correctly calculated.")
def gamma_short(self, freq: int) -> complex:
g = Calibration.IDEAL_SHORT
if not self.useIdealShort:
logger.debug("Using short calibration set values.")
Zsp = complex(0, 2 * math.pi * freq * (
self.shortL0 + self.shortL1 * freq +
self.shortL2 * freq ** 2 + self.shortL3 * freq ** 3))
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
g = (Zsp / 50 - 1) / (Zsp / 50 + 1) * cmath.exp(
complex(0, 2 * math.pi * 2 * freq * self.shortLength * -1))
return g
if self.cal_element.short_is_ideal:
return IDEAL_SHORT
logger.debug("Using short calibration set values.")
cal_element = self.cal_element
Zsp = complex(0.0, 2.0 * math.pi * freq * (
cal_element.short_l0 + cal_element.short_l1 * freq +
cal_element.short_l2 * freq**2 + cal_element.short_l3 * freq**3))
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
return (Zsp / 50.0 - 1.0) / (Zsp / 50.0 + 1.0) * cmath.exp(
complex(0.0,
-4.0 * math.pi * freq * cal_element.short_length))
def gamma_open(self, freq: int) -> complex:
g = Calibration.IDEAL_OPEN
if not self.useIdealOpen:
logger.debug("Using open calibration set values.")
Zop = complex(0, 2 * math.pi * freq * (
self.openC0 + self.openC1 * freq +
self.openC2 * freq ** 2 + self.openC3 * freq ** 3))
g = ((1 - 50 * Zop) / (1 + 50 * Zop)) * cmath.exp(
complex(0, 2 * math.pi * 2 * freq * self.openLength * -1))
return g
if self.cal_element.open_is_ideal:
return IDEAL_OPEN
logger.debug("Using open calibration set values.")
cal_element = self.cal_element
Zop = complex(0.0, 2.0 * math.pi * freq * (
cal_element.open_c0 + cal_element.open_c1 * freq +
cal_element.open_c2 * freq**2 + cal_element.open_c3 * freq**3))
return ((1.0 - 50.0 * Zop) / (1.0 + 50.0 * Zop)) * cmath.exp(
complex(0.0,
-4.0 * math.pi * freq * cal_element.open_length))
def gamma_load(self, freq: int) -> complex:
g = Calibration.IDEAL_LOAD
if not self.useIdealLoad:
logger.debug("Using load calibration set values.")
Zl = complex(self.loadR, 0)
if self.loadC > 0:
Zl = self.loadR / \
complex(1, 2 * self.loadR * math.pi * freq * self.loadC)
if self.loadL > 0:
Zl = Zl + complex(0, 2 * math.pi * freq * self.loadL)
g = (Zl / 50 - 1) / (Zl / 50 + 1) * cmath.exp(
complex(0, 2 * math.pi * 2 * freq * self.loadLength * -1))
return g
if self.cal_element.load_is_ideal:
return IDEAL_LOAD
logger.debug("Using load calibration set values.")
cal_element = self.cal_element
Zl = complex(cal_element.load_r, 0.0)
if cal_element.load_c > 0.0:
Zl = cal_element.load_r / complex(
1.0,
2.0 * cal_element.load_r * math.pi * freq * cal_element.load_c)
if cal_element.load_l > 0.0:
Zl = Zl + complex(0.0, 2 * math.pi * freq * cal_element.load_l)
return (Zl / 50.0 - 1.0) / (Zl / 50.0 + 1.0) * cmath.exp(
complex(0.0, -4 * math.pi * freq * cal_element.load_length))
def gamma_through(self, freq: int) -> complex:
g = complex(1, 0)
if not self.useIdealThrough:
logger.debug("Using through calibration set values.")
g = cmath.exp(complex(0, 1) * 2 * math.pi *
self.throughLength * freq * -1)
return g
if self.cal_element.through_is_ideal:
return IDEAL_THROUGH
logger.debug("Using through calibration set values.")
cal_element = self.cal_element
return cmath.exp(
complex(0.0, -2.0 * math.pi * cal_element.through_length * freq))
def gen_interpolation(self):
freq = []
e00 = []
e11 = []
delta_e = []
e10e01 = []
e30 = []
e22 = []
e10e32 = []
for caldata in self.dataset.values():
freq.append(caldata["freq"])
e00.append(caldata["e00"])
e11.append(caldata["e11"])
delta_e.append(caldata["delta_e"])
e10e01.append(caldata["e10e01"])
e30.append(caldata["e30"])
e22.append(caldata["e22"])
e10e32.append(caldata["e10e32"])
(freq, e00, e11, delta_e, e10e01, e30, e22, e10e32) = zip(*[
(c.freq, c.e00, c.e11, c.delta_e, c.e10e01, c.e30, c.e22, c.e10e32)
for c in self.dataset.values()])
self.interp = {
"e00": interp1d(freq, e00,
@ -349,14 +363,14 @@ class Calibration:
i = self.interp
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
s21 = s21 * (i["e10e01"](dp.freq) / (i["e11"](dp.freq)
* dp11.z - i["delta_e"](dp.freq)))
* dp11.z - i["delta_e"](dp.freq)))
return Datapoint(dp.freq, s21.real, s21.imag)
# TODO: implement tests
def save(self, filename: str):
# Save the calibration data to file
if not self.isValid1Port():
raise ValueError("Not a valid 1-Port calibration")
raise ValueError("Not a valid calibration")
with open(filename, mode="w", encoding='utf-8') as calfile:
calfile.write("# Calibration data for NanoVNA-Saver\n")
for note in self.notes:
@ -369,13 +383,21 @@ class Calibration:
calfile.write(f"{self.dataset.get(freq)}\n")
# TODO: implement tests
# TODO: Exception should be catched by caller
def load(self, filename):
self.source = os.path.basename(filename)
self.dataset = CalDataSet()
self.notes = []
parsed_header = False
header = ""
cols = {
"": (),
"sol": ("short", "open", "load"),
"short": ("short", "open", "load",
"through", "isolation"),
"long": ("short", "open", "load",
"through", "thrurefl", "isolation"),
}
with open(filename, encoding='utf-8') as calfile:
for i, line in enumerate(calfile):
line = line.strip()
@ -383,26 +405,29 @@ class Calibration:
note = line[2:]
self.notes.append(note)
continue
if line.startswith("#"):
if not parsed_header and line == (
"# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
" ThroughR ThroughI ThrureflR ThrureflI"
" IsolationR IsolationI"):
parsed_header = True
if m := RXP_CAL_HEADER.search(line):
header = "long" if m.group(1) else "short"
columns = cols[header]
logger.debug("found %s header type", header)
continue
if not parsed_header:
if line.startswith("#"):
continue
if not header:
logger.warning(
"Warning: Read line without having read header: %s",
line)
continue
m = RXP_CAL_LINE.search(line)
m = RXP_CAL_LINE[header].search(line)
if not m:
logger.warning("Illegal data in cal file. Line %i", i)
logger.warning("Illegal data in cal file. Line %i", i + 1)
continue
if (header == "short" and not m.group(8) and
columns != cols["sol"]):
logger.debug("only SOL cal data")
columns = cols["sol"]
cal = m.groupdict()
nr_cals = 6 if cal["throughr"] else 3
for name in Calibration.CAL_NAMES[:nr_cals]:
for name in columns:
self.dataset.insert(
name,
Datapoint(int(cal["freq"]),

Wyświetl plik

@ -26,7 +26,7 @@ from PyQt5.QtCore import pyqtSignal
from NanoVNASaver import Defaults
from NanoVNASaver.RFTools import Datapoint
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
logger = logging.getLogger(__name__)

Wyświetl plik

@ -22,7 +22,7 @@ from typing import List
from PyQt5 import QtGui
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
from NanoVNASaver.RFTools import Datapoint
from NanoVNASaver.SITools import Format, Value
from NanoVNASaver.Charts.Chart import Chart

Wyświetl plik

@ -23,7 +23,7 @@ from typing import List, Optional
from PyQt5 import QtWidgets, QtGui
from NanoVNASaver.Formatting import format_frequency_chart
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
from NanoVNASaver.RFTools import Datapoint
from NanoVNASaver.SITools import Format, Value

Wyświetl plik

@ -22,7 +22,7 @@ from PyQt5 import QtWidgets, QtCore
from PyQt5.QtWidgets import QCheckBox
from NanoVNASaver import Defaults
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
from NanoVNASaver.Controls.Control import Control
logger = logging.getLogger(__name__)

Wyświetl plik

@ -24,6 +24,7 @@ from typing import List
import serial
from serial.tools import list_ports
from serial.tools.list_ports_common import ListPortInfo
from NanoVNASaver.Hardware.VNA import VNA
from NanoVNASaver.Hardware.AVNA import AVNA
@ -74,29 +75,50 @@ def _fix_v2_hwinfo(dev):
return dev
def usb_typename(device: ListPortInfo) -> str:
return next((t.name for t in USBDEVICETYPES if
device.vid == t.vid and device.pid == t.pid),
"")
# Get list of interfaces with VNAs connected
def get_interfaces() -> List[Interface]:
interfaces = []
# serial like usb interfaces
for d in list_ports.comports():
if platform.system() == 'Windows' and d.vid is None:
d = _fix_v2_hwinfo(d)
for t in USBDEVICETYPES:
if d.vid != t.vid or d.pid != t.pid:
continue
logger.debug("Found %s USB:(%04x:%04x) on port %s",
t.name, d.vid, d.pid, d.device)
iface = Interface('serial', t.name)
iface.port = d.device
iface.open()
iface.comment = get_comment(iface)
iface.close()
interfaces.append(iface)
if not (typename := usb_typename(d)):
continue
logger.debug("Found %s USB:(%04x:%04x) on port %s",
typename, d.vid, d.pid, d.device)
iface = Interface('serial', typename)
iface.port = d.device
iface.open()
iface.comment = get_comment(iface)
iface.close()
interfaces.append(iface)
logger.debug("Interfaces: %s", interfaces)
return interfaces
def get_portinfos() -> List[str]:
portinfos = []
# serial like usb interfaces
for d in list_ports.comports():
logger.debug("Found USB:(%04x:%04x) on port %s",
d.vid, d.pid, d.device)
iface = Interface('serial', "DEBUG")
iface.port = d.device
iface.open()
version = detect_version(iface)
iface.close()
portinfos.append(version)
return portinfos
def get_VNA(iface: Interface) -> VNA:
# serial_port.timeout = TIMEOUT
return NAME2DEVICE[iface.comment](iface)

Wyświetl plik

@ -35,7 +35,7 @@ from NanoVNASaver.Formatting import (
format_wavelength,
)
from .Widget import Marker
from NanoVNASaver.Marker.Widget import Marker
class DeltaMarker(Marker):

Wyświetl plik

@ -1,3 +0,0 @@
from .Widget import Marker
from .Delta import DeltaMarker
from .Values import Value, default_label_ids

Wyświetl plik

@ -16,6 +16,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import contextlib
import logging
import sys
import threading
@ -46,9 +47,11 @@ from .Charts import (
SmithChart, SParameterChart, TDRChart,
)
from .Calibration import Calibration
from .Marker import Marker, DeltaMarker
from .Marker.Widget import Marker
from .Marker.Delta import DeltaMarker
from .SweepWorker import SweepWorker
from .Settings import BandsModel, Sweep
from .Settings.Bands import BandsModel
from .Settings.Sweep import Sweep
from .Touchstone import Touchstone
from .About import VERSION
@ -100,10 +103,7 @@ class NanoVNASaver(QtWidgets.QWidget):
self.bands = BandsModel()
self.interface = Interface("serial", "None")
try:
self.vna = VNA(self.interface)
except IOError as exc:
self.showError(f"{exc}\n\nPlease try reconnect")
self.vna = VNA(self.interface)
self.dataLock = threading.Lock()
self.data = Touchstone()
@ -490,10 +490,8 @@ class NanoVNASaver(QtWidgets.QWidget):
else:
self.delta_marker.set_markers(m1, m2)
self.delta_marker.resetLabels()
try:
with contextlib.suppress(IndexError):
self.delta_marker.updateLabels()
except IndexError:
pass
def dataUpdated(self):
with self.dataLock:
@ -571,11 +569,7 @@ class NanoVNASaver(QtWidgets.QWidget):
self.btnResetReference.setDisabled(False)
if source is not None:
# Save the reference source info
self.referenceSource = source
else:
self.referenceSource = self.sweepSource
self.referenceSource = source or self.sweepSource
self.updateTitle()
def updateTitle(self):
@ -589,7 +583,7 @@ class NanoVNASaver(QtWidgets.QWidget):
f"Reference: {self.referenceSource} @"
f" {len(self.ref_data.s11)} points")
insert += ")"
title = f"{self.baseTitle} {insert if insert else ''}"
title = f"{self.baseTitle} {insert or ''}"
self.setWindowTitle(title)
def resetReference(self):
@ -612,11 +606,9 @@ class NanoVNASaver(QtWidgets.QWidget):
def showSweepError(self):
self.showError(self.worker.error_message)
try:
with contextlib.suppress(IOError):
self.vna.flushSerialBuffers() # Remove any left-over data
self.vna.reconnect() # try reconnection
except IOError:
pass
self.vna.reconnect() # try reconnection
self.sweepFinished()
def popoutChart(self, chart: Chart):

Wyświetl plik

@ -84,7 +84,7 @@ class Value:
def __str__(self) -> str:
fmt = self.fmt
if math.isnan(self._value):
return (f"-{fmt.space_str}{self._unit}")
return f"-{fmt.space_str}{self._unit}"
if (fmt.assume_infinity and
abs(self._value) >= 10 ** ((fmt.max_offset + 1) * 3)):
return (("-" if self._value < 0 else "") +

Wyświetl plik

@ -1,2 +0,0 @@
from .Bands import BandsModel
from .Sweep import Sweep

Wyświetl plik

@ -74,57 +74,34 @@ class SweepWorker(QtCore.QRunnable):
self.offsetDelay = 0
@pyqtSlot()
def run(self):
def run(self) -> None:
try:
self._run()
except BaseException as exc: # pylint: disable=broad-except
logger.exception("%s", exc)
self.gui_error(f"ERROR during sweep\n\nStopped\n\n{exc}")
return
# raise exc
if logger.isEnabledFor(logging.DEBUG):
raise exc
def _run(self):
def _run(self) -> None:
logger.info("Initializing SweepWorker")
self.running = True
self.percentage = 0
if not self.app.vna.connected():
logger.debug(
"Attempted to run without being connected to the NanoVNA")
self.running = False
return
self.running = True
self.percentage = 0
with self.app.sweep.lock:
sweep = self.app.sweep.copy()
averages = 1
if sweep.properties.mode == SweepMode.AVERAGE:
averages = sweep.properties.averages[0]
logger.info("%d averages", averages)
if sweep != self.sweep: # parameters changed
self.sweep = sweep
self.init_data()
while True:
for i in range(sweep.segments):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start, stop = sweep.get_index_range(i)
try:
freq, values11, values21 = self.readAveragedSegment(
start, stop, averages)
self.percentage = (i + 1) * 100 / sweep.segments
self.updateData(freq, values11, values21, i)
except ValueError as e:
self.gui_error(str(e))
else:
if sweep.properties.mode == SweepMode.CONTINOUS:
continue
break
self._run_loop()
if sweep.segments > 1:
start = sweep.start
@ -138,6 +115,28 @@ class SweepWorker(QtCore.QRunnable):
self.signals.finished.emit()
self.running = False
def _run_loop(self) -> None:
sweep = self.sweep
averages = (sweep.properties.averages[0]
if sweep.properties.mode == SweepMode.AVERAGE
else 1)
logger.info("%d averages", averages)
while True:
for i in range(sweep.segments):
logger.debug("Sweep segment no %d", i)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start, stop = sweep.get_index_range(i)
freq, values11, values21 = self.readAveragedSegment(
start, stop, averages)
self.percentage = (i + 1) * 100 / sweep.segments
self.updateData(freq, values11, values21, i)
if sweep.properties.mode != SweepMode.CONTINOUS:
break
def init_data(self):
self.data11 = []
self.data21 = []
@ -156,16 +155,11 @@ class SweepWorker(QtCore.QRunnable):
"Calculating data and inserting in existing data at index %d",
index)
offset = self.sweep.points * index
v11 = values11[:]
v21 = values21[:]
raw_data11 = []
raw_data21 = []
for freq in frequencies:
real11, imag11 = v11.pop(0)
real21, imag21 = v21.pop(0)
raw_data11.append(Datapoint(freq, real11, imag11))
raw_data21.append(Datapoint(freq, real21, imag21))
raw_data11 = [Datapoint(freq, values11[i][0], values11[i][1])
for i, freq in enumerate(frequencies)]
raw_data21 = [Datapoint(freq, values21[i][0], values21[i][1])
for i, freq in enumerate(frequencies)]
data11, data21 = self.applyCalibration(raw_data11, raw_data21)
logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset)

Wyświetl plik

@ -70,7 +70,8 @@ class CalibrationWindow(QtWidgets.QWidget):
calibration_control_group)
cal_btn = {}
self.cal_label = {}
for label_name in Calibration.CAL_NAMES:
for label_name in ("short", "open", "load",
"through", "thrurefl", "isolation"):
self.cal_label[label_name] = QtWidgets.QLabel("Uncalibrated")
cal_btn[label_name] = QtWidgets.QPushButton(
label_name.capitalize())
@ -497,77 +498,60 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.signals.updated.emit()
def calculate(self):
def _warn_ideal(cal_type: str) -> str:
return (
'Invalid data for "{cal_type}" calibration standard.'
' Using ideal values.')
cal_element = self.app.calibration.cal_element
if self.app.sweep_control.btn_stop.isEnabled():
# Currently sweeping
self.app.showError(
"Unable to apply calibration while a sweep is running."
" Please stop the sweep and try again.")
return
if self.use_ideal_values.isChecked():
self.app.calibration.useIdealShort = True
self.app.calibration.useIdealOpen = True
self.app.calibration.useIdealLoad = True
self.app.calibration.useIdealThrough = True
else:
cal_element.short_is_ideal = True
cal_element.open_is_ideal = True
cal_element.load_is_ideal = True
cal_element.throuh_is_ideal = True
# TODO: all ideal or not?
if not self.use_ideal_values.isChecked():
cal_element.short_is_ideal = False
cal_element.open_is_ideal = False
cal_element.load_is_ideal = False
cal_element.throuh_is_ideal = False
# We are using custom calibration standards
try:
self.app.calibration.shortL0 = self.getFloatValue(
self.short_l0_input.text()) / 10 ** 12
self.app.calibration.shortL1 = self.getFloatValue(
self.short_l1_input.text()) / 10 ** 24
self.app.calibration.shortL2 = self.getFloatValue(
self.short_l2_input.text()) / 10 ** 33
self.app.calibration.shortL3 = self.getFloatValue(
self.short_l3_input.text()) / 10 ** 42
self.app.calibration.shortLength = self.getFloatValue(
self.short_length.text()) / 10 ** 12
self.app.calibration.useIdealShort = False
except ValueError:
self.app.calibration.useIdealShort = True
logger.warning(_warn_ideal("short"))
try:
self.app.calibration.openC0 = self.getFloatValue(
self.open_c0_input.text()) / 10 ** 15
self.app.calibration.openC1 = self.getFloatValue(
self.open_c1_input.text()) / 10 ** 27
self.app.calibration.openC2 = self.getFloatValue(
self.open_c2_input.text()) / 10 ** 36
self.app.calibration.openC3 = self.getFloatValue(
self.open_c3_input.text()) / 10 ** 45
self.app.calibration.openLength = self.getFloatValue(
self.open_length.text()) / 10 ** 12
self.app.calibration.useIdealOpen = False
except ValueError:
self.app.calibration.useIdealOpen = True
logger.warning(_warn_ideal("open"))
cal_element.short_l0 = self.getFloatValue(
self.short_l0_input.text()) / 1.0e12
cal_element.short_l1 = self.getFloatValue(
self.short_l1_input.text()) / 1.0e24
cal_element.short_l2 = self.getFloatValue(
self.short_l2_input.text()) / 1.0e33
cal_element.short_l3 = self.getFloatValue(
self.short_l3_input.text()) / 1.0e42
cal_element.short_length = self.getFloatValue(
self.short_length.text()) / 1.0e12
try:
self.app.calibration.loadR = self.getFloatValue(
self.load_resistance.text())
self.app.calibration.loadL = self.getFloatValue(
self.load_inductance.text()) / 10 ** 12
self.app.calibration.loadC = self.getFloatValue(
self.load_capacitance.text()) / 10 ** 15
self.app.calibration.loadLength = self.getFloatValue(
self.load_length.text()) / 10 ** 12
self.app.calibration.useIdealLoad = False
except ValueError:
self.app.calibration.useIdealLoad = True
logger.warning(_warn_ideal("load"))
cal_element.open_c0 = self.getFloatValue(
self.open_c0_input.text()) / 1.e15
cal_element.open_c1 = self.getFloatValue(
self.open_c1_input.text()) / 1.e27
cal_element.open_c2 = self.getFloatValue(
self.open_c2_input.text()) / 1.0e36
cal_element.open_c3 = self.getFloatValue(
self.open_c3_input.text()) / 1.0e45
cal_element.openLength = self.getFloatValue(
self.open_length.text()) / 1.0e12
try:
self.app.calibration.throughLength = self.getFloatValue(
self.through_length.text()) / 10 ** 12
self.app.calibration.useIdealThrough = False
except ValueError:
self.app.calibration.useIdealThrough = True
logger.warning(_warn_ideal("through"))
cal_element.load_r = self.getFloatValue(
self.load_resistance.text())
cal_element.load_l = self.getFloatValue(
self.load_inductance.text()) / 1.0e12
cal_element.load_c = self.getFloatValue(
self.load_capacitance.text()) / 1.0e15
cal_element.load_length = self.getFloatValue(
self.load_length.text()) / 1.0e12
cal_element.through_length = self.getFloatValue(
self.through_length.text()) / 1.0e12
logger.debug("Attempting calibration calculation.")
try:
@ -594,6 +578,8 @@ class CalibrationWindow(QtWidgets.QWidget):
self.app.worker.data21, self.app.sweepSource)
self.app.worker.signals.updated.emit()
except ValueError as e:
if logger.isEnabledFor(logging.DEBUG):
raise
# showError here hides the calibration window,
# so we need to pop up our own
QtWidgets.QMessageBox.warning(
@ -604,7 +590,10 @@ class CalibrationWindow(QtWidgets.QWidget):
@staticmethod
def getFloatValue(text: str) -> float:
return float(text) if text else 0.0
try:
return float(text)
except (TypeError, ValueError):
return 0.0
def loadCalibration(self):
filename, _ = QtWidgets.QFileDialog.getOpenFileName(

Wyświetl plik

@ -26,7 +26,7 @@ from NanoVNASaver.Charts.Chart import (
Chart, ChartColors)
from NanoVNASaver.Windows.Bands import BandsWindow
from NanoVNASaver.Windows.MarkerSettings import MarkerSettingsWindow
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
logger = logging.getLogger(__name__)

Wyświetl plik

@ -21,7 +21,7 @@ import logging
from PyQt5 import QtWidgets, QtCore, QtGui
from NanoVNASaver.RFTools import Datapoint
from NanoVNASaver.Marker import Marker
from NanoVNASaver.Marker.Widget import Marker
from NanoVNASaver.Marker.Values import TYPES, default_label_ids
logger = logging.getLogger(__name__)

Wyświetl plik

@ -20,7 +20,7 @@ import logging
import math
import numpy as np
import scipy
from scipy.signal import convolve
from scipy.constants import speed_of_light
from PyQt5 import QtWidgets, QtCore
@ -137,7 +137,7 @@ class TDRWindow(QtWidgets.QWidget):
windowed_s11 = window * s11
self.td = np.abs(np.fft.ifft(windowed_s11, FFT_POINTS))
step = np.ones(FFT_POINTS)
step_response = scipy.signal.convolve(self.td, step)
step_response = convolve(self.td, step)
self.step_response_Z = 50 * (
1 + step_response) / (1 - step_response)