Fix merge conflict.

pull/697/head
Otto Edgren 2024-04-09 16:17:16 +02:00
commit 0ecbad1833
12 zmienionych plików z 59 dodań i 99 usunięć

Wyświetl plik

@ -42,3 +42,5 @@ Contributors
* Thomas de Lellis <24543390+t52ta6ek@users.noreply.github.com>
* zstadler <zeev.stadler@gmail.com>
* tbergkvist <bergkvist.teo@protonmail.com>
* timmythetank <maans.jacobsson@gmail.com>
* ottoedgren <edgrenotto@gmail.com>

Wyświetl plik

@ -6,4 +6,5 @@ vna = NanoVNASaverHeadless(vna_index=0, verbose=True)
vna.set_sweep(2.9e9, 3.1e9)
vna.calibrate()
vna.stream_data()
# vna.calibrate()
vna.kill()

Wyświetl plik

@ -149,29 +149,22 @@ class CalDataSet(UserDict):
+ "\n"
+ "# Hz ShortR ShortI OpenR OpenI LoadR LoadI"
+ (
" ThroughR ThroughI ThrureflR"
" ThrureflI IsolationR IsolationI\n"
" ThroughR ThroughI ThrureflR" " ThrureflI IsolationR IsolationI\n"
if self.complete2port()
else "\n"
)
+ "\n".join(
[f"{self.data.get(freq)}" for freq in self.frequencies()]
)
+ "\n".join([f"{self.data.get(freq)}" for freq in self.frequencies()])
+ "\n"
)
if self.complete1port()
else ""
)
def _append_match(
self, m: re.Match, header: str, line_nr: int, line: str
) -> None:
def _append_match(self, m: re.Match, header: str, line_nr: int, line: str) -> None:
cal = m.groupdict()
columns = {col[:-1] for col in cal.keys() if cal[col] and col != "freq"}
if "through" in columns and header == "sol":
logger.warning(
"Through data with sol header. %i: %s", line_nr, line
)
logger.warning("Through data with sol header. %i: %s", line_nr, line)
# fix short data (without thrurefl)
if "thrurefl" in columns and "isolation" not in columns:
cal["isolationr"] = cal["thrureflr"]
@ -201,9 +194,7 @@ class CalDataSet(UserDict):
continue
if m := RXP_CAL_HEADER.search(line):
if header:
logger.warning(
"Duplicate header in cal data. %i: %s", i, line
)
logger.warning("Duplicate header in cal data. %i: %s", i, line)
header = "through" if m.group("through") else "sol"
continue
if not line or line.startswith("#"):
@ -214,9 +205,7 @@ class CalDataSet(UserDict):
logger.warning("Illegal caldata. Line %i: %s", i, line)
continue
if not header:
logger.warning(
"Caldata without having read header: %i: %s", i, line
)
logger.warning("Caldata without having read header: %i: %s", i, line)
self._append_match(m, header, line, i)
return self
@ -290,7 +279,7 @@ class Calibration:
def isValid2Port(self) -> bool:
return self.dataset.complete2port()
def _calc_port_1(self, freq: int, cal: CalData):
g1 = self.gamma_short(freq)
g2 = self.gamma_open(freq)
@ -309,8 +298,7 @@ class Calibration:
cal.e00 = (
-(
(g2 * gm3 - g3 * gm3) * g1 * gm2
- (g2 * g3 * gm2 - g2 * g3 * gm3 - (g3 * gm2 - g2 * gm3) * g1)
* gm1
- (g2 * g3 * gm2 - g2 * g3 * gm3 - (g3 * gm2 - g2 * gm3) * g1) * gm1
)
/ denominator
)
@ -388,9 +376,7 @@ class Calibration:
return (
(Zsp / 50.0 - 1.0)
/ (Zsp / 50.0 + 1.0)
* cmath.exp(
complex(0.0, -4.0 * math.pi * freq * cal_element.short_length)
)
* cmath.exp(complex(0.0, -4.0 * math.pi * freq * cal_element.short_length))
)
def gamma_open(self, freq: int) -> complex:
@ -430,9 +416,7 @@ class Calibration:
return (
(Zl / 50.0 - 1.0)
/ (Zl / 50.0 + 1.0)
* cmath.exp(
complex(0.0, -4 * math.pi * freq * cal_element.load_length)
)
* cmath.exp(complex(0.0, -4 * math.pi * freq * cal_element.load_length))
)
def gamma_through(self, freq: int) -> complex:
@ -524,8 +508,7 @@ class Calibration:
i = self.interp
s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq)
s21 = s21 * (
i["e10e01"](dp.freq)
/ (i["e11"](dp.freq) * dp11.z - i["delta_e"](dp.freq))
i["e10e01"](dp.freq) / (i["e11"](dp.freq) * dp11.z - i["delta_e"](dp.freq))
)
return Datapoint(dp.freq, s21.real, s21.imag)

Wyświetl plik

@ -84,11 +84,7 @@ def _fix_v2_hwinfo(dev):
def usb_typename(device: ListPortInfo) -> str:
return next(
(
t.name
for t in USBDEVICETYPES
if device.vid == t.vid and device.pid == t.pid
),
(t.name for t in USBDEVICETYPES if device.vid == t.vid and device.pid == t.pid),
"",
)
@ -135,9 +131,11 @@ def get_portinfos() -> list[str]:
portinfos.append(version)
return portinfos
def get_VNA(iface: Interface) -> VNA:
return NAME2DEVICE[iface.comment](iface)
def get_comment(iface: Interface) -> str:
logger.info("Finding correct VNA type...")
with iface.lock:

Wyświetl plik

@ -62,9 +62,7 @@ class NanoVNA(VNA):
self.serial.write("capture\r".encode("ascii"))
self.serial.readline()
self.serial.timeout = 4
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2
)
image_data = self.serial.read(self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
self.serial.timeout = timeout
return image_data
@ -145,9 +143,7 @@ class NanoVNA(VNA):
f"scan {self.start} {self.stop} {self.datapoints} 0b110"
):
data = line.split()
self._sweepdata.append(
(f"{data[0]} {data[1]}", f"{data[2]} {data[3]}")
)
self._sweepdata.append((f"{data[0]} {data[1]}", f"{data[2]} {data[3]}"))
if value == "data 0":
return [x[0] for x in self._sweepdata]
if value == "data 1":

Wyświetl plik

@ -121,10 +121,7 @@ class NanoVNA_V2(VNA):
self.txPowerRanges = [
(
(140e6, self.sweep_max_freq_Hz),
[
_ADF4350_TXPOWER_DESC_MAP[value]
for value in (3, 2, 1, 0)
],
[_ADF4350_TXPOWER_DESC_MAP[value] for value in (3, 2, 1, 0)],
),
]
@ -172,14 +169,10 @@ class NanoVNA_V2(VNA):
self.serial.write(pack("<Q", 0))
sleep(WRITE_SLEEP)
# cmd: write register 0x30 to clear FIFO
self.serial.write(
pack("<BBB", _CMD_WRITE, _ADDR_VALUES_FIFO, 0)
)
self.serial.write(pack("<BBB", _CMD_WRITE, _ADDR_VALUES_FIFO, 0))
sleep(WRITE_SLEEP)
# clear sweepdata
self._sweepdata = [(complex(), complex())] * (
self.datapoints + s21hack
)
self._sweepdata = [(complex(), complex())] * (self.datapoints + s21hack)
pointstodo = self.datapoints + s21hack
# we read at most 255 values at a time and the time required
# empirically is just over 3 seconds for 101 points or
@ -205,9 +198,7 @@ class NanoVNA_V2(VNA):
# timeout secs
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.warning(
"expected %d bytes, got %d", nBytes, len(arr)
)
logger.warning("expected %d bytes, got %d", nBytes, len(arr))
# the way to retry on timeout is keep the data
# already read then try to read the rest of
# the data into the array
@ -225,9 +216,7 @@ class NanoVNA_V2(VNA):
self._sweepdata = self._sweepdata[1:]
idx = 1 if value == "data 1" else 0
return [
f"{str(x[idx].real)} {str(x[idx].imag)}" for x in self._sweepdata
]
return [f"{str(x[idx].real)} {str(x[idx].imag)}" for x in self._sweepdata]
def resetSweep(self, start: int, stop: int):
self.setSweep(start, stop)
@ -251,9 +240,7 @@ class NanoVNA_V2(VNA):
return result
def read_board_revision(self) -> "Version":
result = self._read_version(
_ADDR_DEVICE_VARIANT, _ADDR_HARDWARE_REVISION
)
result = self._read_version(_ADDR_DEVICE_VARIANT, _ADDR_HARDWARE_REVISION)
logger.debug("read_board_revision: %s", result)
return result
@ -279,12 +266,8 @@ class NanoVNA_V2(VNA):
_ADDR_SWEEP_START,
max(50000, int(self.sweepStartHz - (self.sweepStepHz * s21hack))),
)
cmd += pack(
"<BBQ", _CMD_WRITE8, _ADDR_SWEEP_STEP, int(self.sweepStepHz)
)
cmd += pack(
"<BBH", _CMD_WRITE2, _ADDR_SWEEP_POINTS, self.datapoints + s21hack
)
cmd += pack("<BBQ", _CMD_WRITE8, _ADDR_SWEEP_STEP, int(self.sweepStepHz))
cmd += pack("<BBH", _CMD_WRITE2, _ADDR_SWEEP_POINTS, self.datapoints + s21hack)
cmd += pack("<BBH", _CMD_WRITE2, _ADDR_SWEEP_VALS_PER_FREQ, 1)
with self.serial.lock:
self.serial.write(cmd)

Wyświetl plik

@ -62,9 +62,7 @@ class TinySA(VNA):
self.serial.write("capture\r".encode("ascii"))
self.serial.readline()
self.serial.timeout = 4
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2
)
image_data = self.serial.read(self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
self.serial.timeout = timeout
return image_data
@ -122,8 +120,7 @@ class TinySA(VNA):
logger.debug("Read: %s", value)
if value == "data 0":
self._sweepdata = [
f"{conv2float(line)} 0.0"
for line in self.exec_command("data 0")
f"{conv2float(line)} 0.0" for line in self.exec_command("data 0")
]
return self._sweepdata

Wyświetl plik

@ -43,9 +43,7 @@ WAIT = 0.05
def _max_retries(bandwidth: int, datapoints: int) -> int:
return round(
20
+ 20 * (datapoints / 101)
+ (1000 / bandwidth) ** 1.30 * (datapoints / 101)
20 + 20 * (datapoints / 101) + (1000 / bandwidth) ** 1.30 * (datapoints / 101)
)
@ -149,9 +147,7 @@ class VNA:
]
def set_bandwidth(self, bandwidth: int):
bw_val = (
DISLORD_BW[bandwidth] if self.bw_method == "dislord" else bandwidth
)
bw_val = DISLORD_BW[bandwidth] if self.bw_method == "dislord" else bandwidth
result = " ".join(self.exec_command(f"bandwidth {bw_val}"))
if self.bw_method == "ttrftech" and result:
raise IOError(f"set_bandwith({bandwidth}: {result}")

Wyświetl plik

@ -22,13 +22,16 @@ import typing
logger = logging.getLogger(__name__)
_RXP = re.compile(r"""^
_RXP = re.compile(
r"""^
\D*
(?P<major>\d+)\.
(?P<minor>\d+)\.?
(?P<revision>\d+)?
(?P<note>.*)
$""", re.VERBOSE)
$""",
re.VERBOSE,
)
class _Version(typing.NamedTuple):
@ -38,18 +41,17 @@ class _Version(typing.NamedTuple):
note: str
def __str__(self) -> str:
return (
f'{self.major}.{self.minor}'
f'.{self.revision}{self.note}'
)
return f"{self.major}.{self.minor}" f".{self.revision}{self.note}"
def Version(vstring: str = "0.0.0") -> '_Version':
def Version(vstring: str = "0.0.0") -> "_Version":
if (match := _RXP.search(vstring)) is None:
logger.error("Unable to parse version: %s", vstring)
return _Version(0, 0, 0, '')
return _Version(0, 0, 0, "")
return _Version(int(match.group('major')),
int(match.group('minor')),
int(match.group('revision') or '0'),
match.group('note'))
return _Version(
int(match.group("major")),
int(match.group("minor")),
int(match.group("revision") or "0"),
match.group("note"),
)

Wyświetl plik

@ -5,6 +5,7 @@ from .RFTools import Datapoint
import matplotlib.pyplot as plt
import math
class NanoVNASaverHeadless:
def __init__(self, vna_index=0, verbose=False):
self.verbose = verbose
@ -39,7 +40,14 @@ class NanoVNASaverHeadless:
def set_sweep(self, start, stop):
self.vna.setSweep(start, stop)
print("Sweep set from " + str(self.vna.readFrequencies()[0]/1e9) + "e9" + " to " + str(self.vna.readFrequencies()[-1]/1e9) + "e9")
print(
"Sweep set from "
+ str(self.vna.readFrequencies()[0] / 1e9)
+ "e9"
+ " to "
+ str(self.vna.readFrequencies()[-1] / 1e9)
+ "e9"
)
def stream_data(self):
data = self.get_data()
@ -80,7 +88,7 @@ class NanoVNASaverHeadless:
values = item.split()
real.append(float(values[0]))
imaginary.append(float(values[1]))
#add exception handling
# add exception handling
return real, imaginary
def kill(self):
@ -90,4 +98,4 @@ class NanoVNASaverHeadless:
else:
if self.verbose:
print("Disconnected VNA.")
return
return

Wyświetl plik

@ -76,9 +76,7 @@ class Datapoint(NamedTuple):
return -1 if imp.real == 0.0 else abs(imp.imag / imp.real)
def capacitiveEquivalent(self, ref_impedance: float = 50) -> float:
return impedance_to_capacitance(
self.impedance(ref_impedance), self.freq
)
return impedance_to_capacitance(self.impedance(ref_impedance), self.freq)
def inductiveEquivalent(self, ref_impedance: float = 50) -> float:
return impedance_to_inductance(self.impedance(ref_impedance), self.freq)
@ -125,9 +123,7 @@ def norm_to_impedance(z: complex, ref_impedance: float = 50) -> complex:
def parallel_to_serial(z: complex) -> complex:
"""Convert parallel impedance to serial impedance equivalent"""
z_sq_sum = z.real**2 + z.imag**2 or 10.0e-30
return complex(
z.real * z.imag**2 / z_sq_sum, z.real**2 * z.imag / z_sq_sum
)
return complex(z.real * z.imag**2 / z_sq_sum, z.real**2 * z.imag / z_sq_sum)
def reflection_coefficient(z: complex, ref_impedance: float = 50) -> complex:

Wyświetl plik

@ -116,9 +116,7 @@ class Value:
fmt = self.fmt
if math.isnan(self._value):
return f"-{fmt.space_str}{self._unit}"
if fmt.assume_infinity and abs(self._value) >= 10 ** (
(fmt.max_offset + 1) * 3
):
if fmt.assume_infinity and abs(self._value) >= 10 ** ((fmt.max_offset + 1) * 3):
return (
("-" if self._value < 0 else "")
+ "\N{INFINITY}"