Rewrite code so that it can be used headless. This should enable the entire calibration process from before.

pull/697/head
tbergkvist 2024-04-10 11:52:13 +02:00
rodzic 9bbf4180fd
commit 1264a35d11
5 zmienionych plików z 820 dodań i 76 usunięć

Wyświetl plik

@ -18,40 +18,15 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from .Calibration import Calibration
from .Settings.Sweep import SweepMode
from .Sweep import SweepMode
class CalibrationHelpers():# renamed from CalibrationWindow since it is no longer a window.
class CalibrationGuide():# renamed from CalibrationWindow since it is no longer a window.
nextStep = -1
def __init__(self, calibration, settings, touchstone):
def __init__(self, calibration, touchstone, worker):
self.calibration = calibration
self.settings = settings
self.data = touchstone
self.listCalibrationStandards() # The only thing in the init that was worth saving.
def checkExpertUser(self):
if not self.settings.value("ExpertCalibrationUser", False, bool):
response = input(
"""Are you sure? \n
Use of the manual calibration buttons is non-intuitive
and primarily suited for users with very specialized
needs. The buttons do not sweep for you nor do\n
they interact with the NanoVNA calibration.\n\n
If you are trying to do a calibration of the NanoVNA,\n
do so on the device itself instead. If you are trying to\n
do a calibration with NanoVNA-Saver, use the Calibration assistant\n
if possible.\n\n
Are you certain you know what you are doing? (Y/n)
""")
if response.upper() == "Y" or response.upper() == "YES":
self.settings.setValue("ExpertCalibrationUser", True)
return True
return False
return True
self.worker = worker
def cal_save(self, name: str):
if name in {"through", "isolation"}:
@ -60,16 +35,7 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
self.calibration.insert(name, self.data.s11)
def manual_save(self, name: str):
if self.checkExpertUser():
self.cal_save(name)
def listCalibrationStandards(self):
num_standards = self.settings.beginReadArray("CalibrationStandards")
for i in range(num_standards):
self.settings.setArrayIndex(i)
name = self.settings.value("Name", defaultValue="INVALID NAME")
print(name)
self.settings.endArray()
self.cal_save(name)
def reset(self):
self.calibration = Calibration()
@ -83,7 +49,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
self.worker.rawData21,
self.sweepSource,
)
self.worker.signals.updated.emit()
def setOffsetDelay(self, value: float):
if self.verbose:
@ -106,7 +71,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
self.worker.data21,
self.sweepSource,
)
self.worker.signals.updated.emit()
def calculate(self):
cal_element = self.calibration.cal_element
@ -144,7 +108,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
self.worker.data21,
self.sweepSource,
)
self.worker.signals.updated.emit()
except ValueError as e:
raise Exception(f"Error applying calibration: {str(e)}\nApplying calibration failed.")
@ -161,7 +124,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if i == 2 and not self.calibration.isValid2Port():
break
self.calculate()
self.settings.setValue("CalibrationFile", filename)
def saveCalibration(self, filename):
if not self.calibration.isCalculated:
@ -169,7 +131,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
try:
self.calibration.save(filename)
self.settings.setValue("CalibrationFile", filename)
return True
except Exception as e:
print("Save failed: ", e)
@ -185,6 +146,7 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
Before starting, ensure you have Open, Short and Load\n
standards available and the cables you wish to have\n
calibrated connected to the device.<br><br>\n
Make sure sweep is NOT in continuous mode.\n
If you want a 2-port calibration, also have a through\n
connector on hand.<br><br>\n
<b>The best results are achieved by having the NanoVNA\n
@ -198,9 +160,9 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if not self.vna.connected():
print("NanoVNA not connected.\n\nPlease ensure the NanoVNA is connected before attempting calibration.")
return
if self.sweep.properties.mode == SweepMode.CONTINOUS:
print("Continuous sweep enabled.\n\nPlease disable continuous sweeping before attempting calibration.")
if self.worker.sweep.properties.mode == SweepMode.CONTINOUS:
print("Please disable continuous sweeping before attempting calibration.")
return
response = input("Calibrate short.\n\nPlease connect the short standard to port 0 of the NanoVNA.\n\n Press enter when you are ready to continue. (q to quit).")
@ -211,15 +173,11 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
self.reset()
self.calibration.source = "Calibration assistant"
self.nextStep = 0
self.worker.signals.finished.connect(self.automaticCalibrationStep)
self.sweep_start()
return
def automaticCalibrationStep(self):
if self.nextStep == -1:
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
if self.nextStep == 0:
@ -235,9 +193,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
self.sweep_start()
return
@ -252,9 +207,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
self.sweep_start()
return
@ -271,15 +223,9 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.calculate()
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
if response.lower() == 'y' or response.lower() == "yes":
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
response = input("""Calibrate isolation\n
@ -289,9 +235,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
self.sweep_start()
return
@ -308,9 +251,6 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.btn_automatic.setDisabled(False)
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
self.sweep_start()
return
@ -326,15 +266,9 @@ class CalibrationHelpers():# renamed from CalibrationWindow since it is no longe
if response.lower() == 'q':
self.btn_automatic.setDisabled(False)
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return
self.calculate()
self.btn_automatic.setDisabled(False)
self.nextStep = -1
self.worker.signals.finished.disconnect(
self.automaticCalibrationStep
)
return

Wyświetl plik

@ -16,7 +16,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import math
from decimal import Context, Decimal, InvalidOperation
from typing import NamedTuple

168
src/Sweep.py 100644
Wyświetl plik

@ -0,0 +1,168 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020,2021 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from enum import Enum
from math import log
from threading import Lock
from typing import Iterator, NamedTuple
class SweepMode(Enum):
SINGLE = 0
CONTINOUS = 1
AVERAGE = 2
class Properties(NamedTuple):
name: str = ""
mode: "SweepMode" = SweepMode.SINGLE
averages: tuple[int, int] = (3, 0)
logarithmic: bool = False
class Sweep:
def __init__(self,
start: int = 3600000,
end: int = 30000000,
points: int = 101,
segments: int = 1,
properties: "Properties" = Properties(),
verbose=False
):
self._start = start
self._end = end
self._points = points
self._segments = segments
self._properties = properties
self._lock = Lock()
self.check()
self.verbose = verbose
if self.verbose:
print("%s", self)
def __repr__(self) -> str:
return 'Sweep(' + ', '.join(map(str, (
self.start, self.end, self.points, self.segments, self.properties
))) + ')'
def __eq__(self, other) -> bool:
return (self.start == other.start
and self.end == other.end
and self.points == other.points
and self.segments == other.segments
and self.properties == other.properties)
def copy(self) -> "Sweep":
with self._lock:
return Sweep(self.start, self.end, self.points, self.segments,
self._properties)
# Getters for attributes, either private or computed.
@property
def start(self) -> int:
return self._start
@property
def end(self) -> int:
return self._end
@property
def points(self) -> int:
return self._points
@property
def segments(self) -> int:
return self._segments
# Properties are immutable, this does not circumvent the accessors.
@property
def properties(self) -> "Properties":
return self._properties
@property
def span(self) -> int:
return self.end - self.start
@property
def stepsize(self) -> int:
return round(self.span / (self.points * self.segments - 1))
# Setters
def set_points(self, points: int) -> None:
with self._lock:
self._points = points
self.check()
def update(self, start: int, end: int, segments: int, points: int) -> None:
with self._lock:
self._start = max(start, 1)
self._end = max(end, start)
self._segments = max(segments, 1)
self._points = max(points, 1)
self.check()
def set_name(self, name: str) -> None:
with self._lock:
self._properties = self.properties._replace(name=name)
def set_mode(self, mode: "SweepMode") -> None:
with self._lock:
self._properties = self.properties._replace(mode=mode)
def set_averages(self, amount: int, truncates: int) -> None:
with self._lock:
self._properties = self.properties._replace(averages=(amount, truncates))
def set_logarithmic(self, logarithmic: bool) -> None:
with self._lock:
self._properties = self.properties._replace(logarithmic=logarithmic)
def check(self):
if (
self.segments < 1
or self.points < 1
or self.start < 1
or self.end < self.start
or self.stepsize < 0
):
raise ValueError(f"Illegal sweep settings: {self}")
def _exp_factor(self, index: int) -> float:
return 1 - log(self.segments + 1 - index) / log(self.segments + 1)
def get_index_range(self, index: int) -> tuple[int, int]:
if self.properties.logarithmic:
start = round(self.start + self.span * self._exp_factor(index))
end = round(self.start + self.span * self._exp_factor(index + 1))
else:
start = self.start + index * self.points * self.stepsize
end = start + (self.points - 1) * self.stepsize
if self.verbose:
print("get_index_range(%s) -> (%s, %s)", index, start, end)
return start, end
def get_frequencies(self) -> Iterator[int]:
for i in range(self.segments):
start, stop = self.get_index_range(i)
step = (stop - start) / self.points
freq = start
for _ in range(self.points):
yield round(freq)
freq += step

330
src/SweepWorker.py 100644
Wyświetl plik

@ -0,0 +1,330 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020,2021 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
######################### TODO Rewrite this to be used headless.
from time import sleep
import numpy as np
from. Calibration import correct_delay
from .Sweep import Sweep, SweepMode
from RFTools import Datapoint
def truncate(values: list[list[tuple]], count: int, verbose=False) -> list[list[tuple]]:
"""truncate drops extrema from data list if averaging is active"""
keep = len(values) - count
if verbose:
print("Truncating from %d values to %d", len(values), keep)
if count < 1 or keep < 1:
if verbose:
print("Not doing illegal truncate")
return values
truncated = []
for valueset in np.swapaxes(values, 0, 1).tolist():
avg = complex(*np.average(valueset, 0))
truncated.append(
sorted(valueset, key=lambda v, a=avg: abs(a - complex(*v)))[:keep]
)
return np.swapaxes(truncated, 0, 1).tolist()
class SweepWorker():
def __init__(self, vna, verbose=False):
if verbose:
print("Initializing SweepWorker")
self.sweep = Sweep()
self.setAutoDelete(False)
self.percentage = 0
self.data11: list[Datapoint] = []
self.data21: list[Datapoint] = []
self.rawData11: list[Datapoint] = []
self.rawData21: list[Datapoint] = []
self.init_data()
self.stopped = False
self.running = False
self.error_message = ""
self.offsetDelay = 0
self.verbose = verbose
self.vna = vna
def run(self) -> None:
try:
self._run()
except BaseException as exc: # pylint: disable=broad-except
print("%s", exc)
print(f"ERROR during sweep\n\nStopped\n\n{exc}")
if self.verbose:
raise exc
def _run(self) -> None:
if self.verbose:
print("Initializing SweepWorker")
if not self.vna.connected():
if self.verbose:
print("Attempted to run without being connected to the NanoVNA")
self.running = False
return
self.running = True
self.percentage = 0
sweep = self.sweep.copy()
if sweep != self.sweep: # parameters changed
self.sweep = sweep
self.init_data()
self._run_loop()
if sweep.segments > 1:
start = sweep.start
end = sweep.end
if self.verbose:
print("Resetting NanoVNA sweep to full range: %d to %d", start, end)
self.vna.resetSweep(start, end)
self.percentage = 100
if self.verbose:
print('Sending "finished" signal')
self.running = False
def _run_loop(self) -> None:
sweep = self.sweep
averages = (
sweep.properties.averages[0]
if sweep.properties.mode == SweepMode.AVERAGE
else 1
)
if self.verbose:
print("%d averages", averages)
while True:
for i in range(sweep.segments):
if self.verbose:
print("Sweep segment no %d", i)
if self.stopped:
if self.verbose:
print("Stopping sweeping as signalled")
break
start, stop = sweep.get_index_range(i)
freq, values11, values21 = self.readAveragedSegment(
start, stop, averages
)
self.percentage = (i + 1) * 100 / sweep.segments
self.updateData(freq, values11, values21, i)
if sweep.properties.mode != SweepMode.CONTINOUS or self.stopped:
break
def init_data(self):
self.data11 = []
self.data21 = []
self.rawData11 = []
self.rawData21 = []
for freq in self.sweep.get_frequencies():
self.data11.append(Datapoint(freq, 0.0, 0.0))
self.data21.append(Datapoint(freq, 0.0, 0.0))
self.rawData11.append(Datapoint(freq, 0.0, 0.0))
self.rawData21.append(Datapoint(freq, 0.0, 0.0))
if self.verbose:
print("Init data length: %s", len(self.data11))
def updateData(self, frequencies, values11, values21, index):
# Update the data from (i*101) to (i+1)*101
if self.verbose:
print("Calculating data and inserting in existing data at index %d", index)
offset = self.sweep.points * index
raw_data11 = [
Datapoint(freq, values11[i][0], values11[i][1])
for i, freq in enumerate(frequencies)
]
raw_data21 = [
Datapoint(freq, values21[i][0], values21[i][1])
for i, freq in enumerate(frequencies)
]
data11, data21 = self.applyCalibration(raw_data11, raw_data21)
if self.verbose:
print("update Freqs: %s, Offset: %s", len(frequencies), offset)
for i in range(len(frequencies)):
self.data11[offset + i] = data11[i]
self.data21[offset + i] = data21[i]
self.rawData11[offset + i] = raw_data11[i]
self.rawData21[offset + i] = raw_data21[i]
if self.verbose:
print("Saving data to application (%d and %d points)", len(self.data11), len(self.data21))
self.saveData(self.data11, self.data21)
if self.verbose:
print('Sending "updated" signal')
def applyCalibration(
self, raw_data11: list[Datapoint], raw_data21: list[Datapoint]
) -> tuple[list[Datapoint], list[Datapoint]]:
data11: list[Datapoint] = []
data21: list[Datapoint] = []
if not self.calibration.isCalculated:
data11 = raw_data11.copy()
data21 = raw_data21.copy()
elif self.calibration.isValid1Port():
data11.extend(
self.calibration.correct11(dp) for dp in raw_data11
)
else:
data11 = raw_data11.copy()
if self.calibration.isValid2Port():
for counter, dp in enumerate(raw_data21):
dp11 = raw_data11[counter]
data21.append(self.calibration.correct21(dp, dp11))
else:
data21 = raw_data21
if self.offsetDelay != 0:
data11 = [
correct_delay(dp, self.offsetDelay, reflect=True)
for dp in data11
]
data21 = [correct_delay(dp, self.offsetDelay) for dp in data21]
return data11, data21
def readAveragedSegment(self, start, stop, averages=1):
values11 = []
values21 = []
freq = []
if self.verbose:
print("Reading from %d to %d. Averaging %d values", start, stop, averages)
for i in range(averages):
if self.stopped:
if self.verbose:
print("Stopping averaging as signalled.")
if averages == 1:
break
if self.verbose:
print("Stop during average. Discarding sweep result.")
return [], [], []
if self.verbose:
print("Reading average no %d / %d", i + 1, averages)
retry = 0
tmp11 = []
tmp21 = []
while not tmp11 and retry < 5:
sleep(0.5 * retry)
retry += 1
freq, tmp11, tmp21 = self.readSegment(start, stop)
if retry > 1:
if self.verbose:
print("retry %s readSegment(%s,%s)", retry, start, stop)
sleep(0.5)
values11.append(tmp11)
values21.append(tmp21)
self.percentage += 100 / (self.sweep.segments * averages)
if not values11:
raise IOError("Invalid data during swwep")
truncates = self.sweep.properties.averages[1]
if truncates > 0 and averages > 1:
if self.verbose:
print("Truncating %d values by %d", len(values11), truncates)
values11 = truncate(values11, truncates)
values21 = truncate(values21, truncates)
if self.verbose:
print("Averaging %d values", len(values11))
values11 = np.average(values11, 0).tolist()
values21 = np.average(values21, 0).tolist()
return freq, values11, values21
def readSegment(self, start, stop):
if self.verbose:
print("Setting sweep range to %d to %d", start, stop)
self.vna.setSweep(start, stop)
frequencies = self.vna.readFrequencies()
if self.verbose:
print("Read %s frequencies", len(frequencies))
values11 = self.readData("data 0")
values21 = self.readData("data 1")
if not len(frequencies) == len(values11) == len(values21):
if self.verbose:
print("No valid data during this run")
return [], [], []
return frequencies, values11, values21
def readData(self, data):
if self.verbose:
print("Reading %s", data)
done = False
returndata = []
count = 0
while not done:
done = True
returndata = []
tmpdata = self.vna.readValues(data)
if self.verbose:
print("Read %d values", len(tmpdata))
for d in tmpdata:
a, b = d.split(" ")
try:
if self.vna.validateInput and (
abs(float(a)) > 9.5 or abs(float(b)) > 9.5
):
if self.verbose:
print("Got a non plausible data value: (%s)", d)
done = False
break
returndata.append((float(a), float(b)))
except ValueError as exc:
print("An exception occurred reading %s: %s", data, exc)
done = False
if not done:
if self.verbose:
print("Re-reading %s", data)
sleep(0.2)
count += 1
if count == 5:
if self.verbose:
print("Tried and failed to read %s %d times.", data, count)
if self.verbose:
print("trying to reconnect")
self.vna.reconnect()
if count >= 10:
print("Tried and failed to read %s %d times. Giving up.", data, count)
raise IOError(
f"Failed reading {data} {count} times.\n"
f"Data outside expected valid ranges,"
f" or in an unexpected format.\n\n"
f"You can disable data validation on the"
f"device settings screen."
)
return returndata
def gui_error(self, message: str):
self.error_message = message
self.stopped = True
self.running = False

313
src/Touchstone.py 100644
Wyświetl plik

@ -0,0 +1,313 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020,2021 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
######################### TODO Rewrite this to be used headless.
import logging
import math
import cmath
import io
from operator import attrgetter
from scipy.interpolate import interp1d
from NanoVNASaver.RFTools import Datapoint
logger = logging.getLogger(__name__)
class Options:
# Fun fact: In Touchstone 1.1 spec all params are optional unordered.
# Just the line has to start with "#"
UNIT_TO_FACTOR = {
"ghz": 10**9,
"mhz": 10**6,
"khz": 10**3,
"hz": 10**0,
}
VALID_UNITS = UNIT_TO_FACTOR.keys()
VALID_PARAMETERS = "syzgh"
VALID_FORMATS = ("ma", "db", "ri")
def __init__(
self,
unit: str = "GHZ",
parameter: str = "S",
t_format: str = "ma",
resistance: int = 50,
):
# set defaults
assert unit.lower() in Options.VALID_UNITS
assert parameter.lower() in Options.VALID_PARAMETERS
assert t_format.lower() in Options.VALID_FORMATS
assert resistance > 0
self.unit = unit.lower()
self.parameter = parameter.lower()
self.format = t_format.lower()
self.resistance = resistance
@property
def factor(self) -> int:
return Options.UNIT_TO_FACTOR[self.unit]
def __str__(self) -> str:
return (
f"# {self.unit} {self.parameter}"
f" {self.format} r {self.resistance}"
).upper()
def parse(self, line: str):
if not line.startswith("#"):
raise TypeError(f"Not an option line: {line}")
punit = pparam = pformat = presist = False
params = iter(line[1:].lower().split())
for p in params:
if p in Options.VALID_UNITS and not punit:
self.unit = p
punit = True
elif p in Options.VALID_PARAMETERS and not pparam:
self.parameter = p
pparam = True
elif p in Options.VALID_FORMATS and not pformat:
self.format = p
pformat = True
elif p == "r" and not presist:
rstr = next(params)
try:
self.resistance = int(rstr)
except ValueError:
logger.warning("Non integer resistance value: %s", rstr)
self.resistance = int(float(rstr))
else:
raise TypeError(f"Illegal option line: {line}")
class Touchstone:
FIELD_ORDER = ("11", "21", "12", "22")
def __init__(self, filename: str = ""):
self.filename = filename
self.sdata = [[], [], [], []] # at max 4 data pairs
self.comments = []
self.opts = Options()
self._interp = {}
@property
def s11(self) -> list[Datapoint]:
return self.s("11")
@s11.setter
def s11(self, value: list[Datapoint]):
self.sdata[0] = value
@property
def s12(self) -> list[Datapoint]:
return self.s("12")
@s12.setter
def s12(self, value: list[Datapoint]):
self.sdata[2] = value
@property
def s21(self) -> list[Datapoint]:
return self.s("21")
@s21.setter
def s21(self, value: list[Datapoint]):
self.sdata[1] = value
@property
def s22(self) -> list[Datapoint]:
return self.s("22")
@s22.setter
def s22(self, value: list[Datapoint]):
self.sdata[3] = value
@property
def r(self) -> int:
return self.opts.resistance
def s(self, name: str) -> list[Datapoint]:
return self.sdata[Touchstone.FIELD_ORDER.index(name)]
def s_freq(self, name: str, freq: int) -> Datapoint:
return Datapoint(
freq,
float(self._interp[name]["real"](freq)),
float(self._interp[name]["imag"](freq)),
)
def swap(self):
self.sdata = [self.s22, self.s12, self.s21, self.s11]
def min_freq(self) -> int:
return self.s("11")[0].freq
def max_freq(self) -> int:
return self.s("11")[-1].freq
def gen_interpolation(self):
for i in Touchstone.FIELD_ORDER:
freq = []
real = []
imag = []
for dp in self.s(i):
freq.append(dp.freq)
real.append(dp.re)
imag.append(dp.im)
self._interp[i] = {
"real": interp1d(
freq,
real,
kind="slinear",
bounds_error=False,
fill_value=(real[0], real[-1]),
),
"imag": interp1d(
freq,
imag,
kind="slinear",
bounds_error=False,
fill_value=(imag[0], imag[-1]),
),
}
def _parse_comments(self, fp) -> str:
for line in fp:
line = line.strip()
if line.startswith("!"):
logger.info(line)
self.comments.append(line)
continue
return line
def _append_line_data(self, freq: int, data: list):
data_list = iter(self.sdata)
vals = iter(data)
for v in vals:
if self.opts.format == "ri":
next(data_list).append(
Datapoint(freq, float(v), float(next(vals)))
)
if self.opts.format == "ma":
z = cmath.rect(float(v), math.radians(float(next(vals))))
next(data_list).append(Datapoint(freq, z.real, z.imag))
if self.opts.format == "db":
z = cmath.rect(
10 ** (float(v) / 20), math.radians(float(next(vals)))
)
next(data_list).append(Datapoint(freq, z.real, z.imag))
def load(self):
logger.info("Attempting to open file %s", self.filename)
try:
with open(self.filename, encoding="utf-8") as infile:
self.loads(infile.read())
except IOError as e:
logger.exception("Failed to open %s: %s", self.filename, e)
def loads(self, s: str):
"""Parse touchstone 1.1 string input
appends to existing sdata if Touchstone object exists
"""
try:
self._loads(s)
except TypeError as e:
logger.exception("Failed to parse %s: %s", self.filename, e)
def _loads(self, s: str):
need_reorder = False
with io.StringIO(s) as file:
opts_line = self._parse_comments(file)
self.opts.parse(opts_line)
prev_freq = 0.0
prev_len = 0
for line in file:
line = line.strip()
# ignore empty lines (even if not specified)
if line == "":
continue
# accept comment lines after header
if line.startswith("!"):
logger.warning("Comment after header: %s", line)
self.comments.append(line)
continue
# ignore comments at data end
data = line.split("!")[0]
data = data.split()
freq, data = round(float(data[0]) * self.opts.factor), data[1:]
data_len = len(data)
if data_len % 2 != 0:
raise TypeError("Data values aren't pairs: " + line)
# consistency checks
if freq <= prev_freq:
logger.warning("Frequency not ascending: %s", line)
need_reorder = True
prev_freq = freq
if prev_len == 0:
prev_len = data_len
elif data_len != prev_len:
raise TypeError(f"Inconsistent number of pairs: {line}")
self._append_line_data(freq, data)
if need_reorder:
logger.warning("Reordering data")
for datalist in self.sdata:
datalist.sort(key=attrgetter("freq"))
def save(self, nr_params: int = 1):
"""Save touchstone data to file.
Args:
nr_params: Number of s-parameters. 2 for s1p, 4 for s2p
"""
logger.info("Attempting to open file %s for writing", self.filename)
with open(self.filename, "w", encoding="utf-8") as outfile:
outfile.write(self.saves(nr_params))
def saves(self, nr_params: int = 1) -> str:
"""Returns touchstone data as string.
Args:
nr_params: Number of s-parameters. 1 for s1p, 4 for s2p
"""
assert nr_params in {1, 4}
ts_str = "# HZ S RI R 50\n"
for i, dp_s11 in enumerate(self.s11):
ts_str += f"{dp_s11.freq} {dp_s11.re} {dp_s11.im}"
for j in range(1, nr_params):
dp = self.sdata[j][i]
if dp.freq != dp_s11.freq:
raise LookupError("Frequencies of sdata not correlated")
ts_str += f" {dp.re} {dp.im}"
ts_str += "\n"
return ts_str