Merge pull request #225 from NanoVNA-Saver/Development

v0.3.4
pull/280/head v0.3.5
Holger Müller 2020-07-04 14:29:00 +02:00 zatwierdzone przez GitHub
commit 408eda63cf
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
17 zmienionych plików z 596 dodań i 309 usunięć

Wyświetl plik

@ -1,3 +1,14 @@
v0.3.4
======
- Refactored Analysis
- Add Antenna Analysis
- Fixed bug in Through Calibration
- Fixed bug in s2p saving
- Fixed crash when clicking connect with no device connected
- Fixed module error with source installation if
pkg\_resources missing
v0.3.3
======

Wyświetl plik

@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
VERSION = "0.3.3"
VERSION = "0.3.4"
VERSION_URL = (
"https://raw.githubusercontent.com/"
"NanoVNA-Saver/nanovna-saver/master/NanoVNASaver/About.py")

Wyświetl plik

@ -0,0 +1,271 @@
'''
Created on 30 giu 2020
@author: mauro
'''
from PyQt5 import QtWidgets, QtTest
import logging
import math
from NanoVNASaver.Analysis import Analysis
from NanoVNASaver.Hardware import VNA
import numpy as np
from NanoVNASaver.Marker import Marker
from NanoVNASaver import RFTools
from NanoVNASaver.Analysis.VSWRAnalysis import VSWRAnalysis
from NanoVNASaver.Formatting import format_frequency_sweep
logger = logging.getLogger(__name__)
class Antenna(object):
@staticmethod
def group_consecutives(vals, step=1):
"""
https://stackoverflow.com/questions/7352684/how-to-find-the-groups-of-consecutive-elements-from-an-array-in-numpy
Return list of consecutive lists of numbers from vals (number list).
:param vals:
:param step:
"""
run = []
result = [run]
expect = None
for v in vals:
if (v == expect) or (expect is None):
run.append(v)
else:
run = [v]
result.append(run)
expect = v + step
return result
@classmethod
def analyze(cls, frequencies, values, FIELD_NAME, step=5000, vuoto=False):
'''
dati dati, prova a trovare minimi e bands passanti
:param cls:
'''
if FIELD_NAME == "rl":
BAND_THRESHOLD = -7.0
MIN_CALCOLO_Q = -27
elif FIELD_NAME == "vswr":
BAND_THRESHOLD = 2.62
MIN_CALCOLO_Q = 1.1
else:
raise ValueError("unknown threshold for {}".format(FIELD_NAME))
bands_raw = np.where(values < BAND_THRESHOLD)[0]
# raggruppo posizioni in cui il valore è sotto la soglia
bands = cls.group_consecutives(bands_raw)
# print("raggruppate in ", bands)
out = []
# print "bands", bands
banda_dict = None
for band in bands:
if band:
print("band ", band)
fmin = frequencies[band[0]]
fmax = frequencies[band[-1]]
estensione = fmax - fmin
x = np.argmin(values[band[0]:band[-1] + 1])
prog = x + band[0]
min_val = values[prog]
if banda_dict:
salto = fmin - banda_dict["fmax"]
if salto < (10 * step):
logger.warning("unisco band e proseguo")
if min_val < banda_dict["min"]:
logger.debug("aggiusto nuovo minimo, da %s a %s",
banda_dict["min"], min_val)
banda_dict["min"] = min_val
# invalido eventuale Q e band passante ?!?
banda_dict["q"] = None
banda_dict["banda_passante"] = None
banda_dict["fmax"] = fmax
# non servono ulteriori elaborazioni
continue
else:
logger.warning("finalizzo band precedente")
out.append(banda_dict)
banda_dict = None
# se sono qui è nuova
if estensione == 0 and vuoto:
logger.warning("ritorno minima estensione")
banda_dict = {"fmin": fmin - 30 * step,
"fmax": fmin + 30 * step,
"banda_passante": None,
"q": None,
"min": min_val,
"freq": fmin,
"prog": prog,
}
else:
logger.warning("Nuova band")
if min_val <= MIN_CALCOLO_Q:
# FIXME: verificare che ci siano valori >
# BAND_THRESHOLD?!?
q = np.sqrt(fmax * fmin) / (fmax - fmin)
logger.info("Q=%s", q)
else:
logger.info(
"non calcolo Q perchè minimo %s non è abbastanza", min_val)
q = None
banda_dict = {"fmin": fmin,
"fmax": fmax,
"banda_passante": fmax - fmin,
"q": q,
"min": min_val,
"freq": frequencies[prog],
"prog": prog,
}
if banda_dict:
out.append(banda_dict)
return out
class ChartFactory(object):
@classmethod
def NewChart(cls, chart_class, name, app):
from NanoVNASaver.NanoVNASaver import BandsModel
new_chart = chart_class(name)
new_chart.isPopout = True
new_chart.data = app.data
new_chart.bands = BandsModel()
i=0
default_color = app.default_marker_colors[i]
color = app.settings.value("Marker" + str(i+1) + "Color", default_color)
marker = Marker("Marker " + str(i+1), color)
marker.isMouseControlledRadioButton.setChecked(True)
new_chart.setMarkers([marker])
return new_chart
class MinVswrAnalysis(Antenna, Analysis):
def __init__(self, app):
super().__init__(app)
self._widget = QtWidgets.QWidget()
def runAnalysis(self):
self.reset()
if len(self.app.data) == 0:
logger.debug("No data to analyse")
self.result_label.setText("No data to analyse.")
return
frequencies = []
values = []
for p in self.app.data:
frequencies.append(p.freq)
vswr = p.vswr
values.append(vswr)
res = self.analyze(np.array(frequencies),
np.array(values),
"vswr")
marker = 0
for banda in res:
if marker < 3:
self.app.markers[marker].setFrequency(
str(round(banda["freq"])))
marker += 1
print("min {min} a {freq}".format(**banda))
# Charts
progr = 0
for c in self.app.subscribing_charts:
if c.name == "S11 VSWR":
new_chart = c.copy()
new_chart.isPopout = True
new_chart.show()
new_chart.setWindowTitle("%s %s" % (new_chart.name, progr))
vna = self.app.vna
if isinstance(vna, InvalidVNA):
logger.warning("end analysis, non valid vna")
else:
logger.warning("band zoom")
for banda in res:
progr += 1
# scan
self.app.sweepStartInput.setText(str(banda["fmin"]))
self.app.sweepEndInput.setText(str(banda["fmax"]))
self.app.sweep()
while not self.app.btnSweep.isEnabled():
QtTest.QTest.qWait(500)
for c in self.app.subscribing_charts:
if c.name == "S11 VSWR":
new_chart = c.copy()
new_chart.isPopout = True
new_chart.show()
new_chart.setWindowTitle("%s %s" % (new_chart.name, progr))
class ZeroCrossAnalysis(Antenna, Analysis):
def __init__(self, app):
super().__init__(app)
self._widget = QtWidgets.QWidget()
def runAnalysis(self):
self.reset()
if len(self.app.data) == 0:
logger.debug("No data to analyse")
self.result_label.setText("No data to analyse.")
return
frequencies = []
values = []
for p in self.app.data:
frequencies.append(p.freq)
values.append(p.z.imag)
zero_crossings = np.where(np.diff(np.sign(np.array(values))))[0]
marker = 0
for pos in zero_crossings:
freq = round(frequencies[pos])
if marker < 3:
self.app.markers[marker].setFrequency(
str(freq))
marker += 1
print("cross at {}".format(freq))
class MagLoopAnalysis(VSWRAnalysis):
max_dips_shown = 1
vswr_limit_value = 2.56
def runAnalysis(self):
super().runAnalysis()
for m in self.minimums:
start, lowest, end = m
if start != end:
Q = self.app.data[lowest].freq/(self.app.data[end].freq-self.app.data[start].freq)
self.layout.addRow("Q",QtWidgets.QLabel("{}".format(int(Q))))
self.app.sweepStartInput.setText(self.app.data[start].freq)
self.app.sweepEndInput.setText(self.app.data[end].freq)
# self.app.sweepEndInput.textEdited.emit(self.app.sweepEndInput.text())

Wyświetl plik

@ -28,6 +28,9 @@ logger = logging.getLogger(__name__)
class VSWRAnalysis(Analysis):
max_dips_shown = 3
vswr_limit_value = 1.5
class QHLine(QtWidgets.QFrame):
def __init__(self):
super().__init__()
@ -41,7 +44,7 @@ class VSWRAnalysis(Analysis):
self._widget.setLayout(self.layout)
self.input_vswr_limit = QtWidgets.QDoubleSpinBox()
self.input_vswr_limit.setValue(1.5)
self.input_vswr_limit.setValue(self.vswr_limit_value)
self.input_vswr_limit.setSingleStep(0.1)
self.input_vswr_limit.setMinimum(1)
self.input_vswr_limit.setMaximum(25)
@ -56,7 +59,7 @@ class VSWRAnalysis(Analysis):
self.layout.addRow(self.results_label)
def runAnalysis(self):
max_dips_shown = 3
max_dips_shown = self.max_dips_shown
data = []
for d in self.app.data:
data.append(d.vswr)
@ -111,7 +114,7 @@ class VSWRAnalysis(Analysis):
dips.remove(dips[min_idx])
minimums.remove(minimums[min_idx])
minimums = best_dips
self.minimums = minimums
if len(minimums) > 0:
for m in minimums:
start, lowest, end = m

Wyświetl plik

@ -220,12 +220,12 @@ class Calibration:
f" values at frequency {freq}Hz.")
if self.isValid2Port():
gt = self.gamma_through(freq)
gm4 = caldata["through"].z
caldata["e10e32"] = (gm4 / gt - caldata["e30"]
) * (1 - caldata["e11"] ** 2)
caldata["e30"] = caldata["isolation"].z
gt = self.gamma_through(freq)
caldata["e10e32"] = (caldata["through"].z / gt - caldata["e30"]
) * (1 - caldata["e11"]**2)
self.gen_interpolation()
self.isCalculated = True
logger.debug("Calibration correctly calculated.")
@ -238,7 +238,7 @@ class Calibration:
self.shortL0 + self.shortL1 * freq +
self.shortL2 * freq**2 + self.shortL3 * freq**3)
# Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21)
g = ((Zsp / 50 - 1) / (Zsp / 50 + 1)) * cmath.exp(
g = (Zsp / 50 - 1) / (Zsp / 50 + 1) * cmath.exp(
complex(0, 1) * 2 * math.pi * 2 * freq *
self.shortLength * -1)
return g
@ -252,7 +252,7 @@ class Calibration:
self.openC2 * freq**2 + self.openC3 * freq**3))
if divisor != 0:
Zop = complex(0, -1) / divisor
g = ((Zop / 50 - 1) / (Zop / 50+ 1)) * cmath.exp(
g = ((Zop / 50 - 1) / (Zop / 50 + 1)) * cmath.exp(
complex(0, 1) * 2 * math.pi *
2 * freq * self.openLength * -1)
return g
@ -263,7 +263,7 @@ class Calibration:
logger.debug("Using load calibration set values.")
Zl = self.loadR + (complex(0, 1) * 2 *
math.pi * freq * self.loadL)
g = (((Zl / 50) - 1) / ((Zl / 50) + 1)) * cmath.exp(
g = (Zl / 50 - 1) / (Zl / 50 + 1) * cmath.exp(
complex(0, 1) * 2 * math.pi *
2 * freq * self.loadLength * -1)
return g

Wyświetl plik

@ -42,7 +42,7 @@ class AVNA(VNA):
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
if self.app.serialLock.acquire():
with self.app.serialLock:
try:
data = "a"
while data != "":
@ -58,8 +58,6 @@ class AVNA(VNA):
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
finally:
self.app.serialLock.release()
return "Unknown"
def readFrequencies(self) -> List[str]:
@ -73,7 +71,7 @@ class AVNA(VNA):
logger.debug("Reading version info.")
if not self.serial.is_open:
return
if self.app.serialLock.acquire():
with self.app.serialLock:
try:
data = "a"
while data != "":
@ -90,9 +88,7 @@ class AVNA(VNA):
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
finally:
self.app.serialLock.release()
return
return ""
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))

Wyświetl plik

@ -30,6 +30,7 @@ from NanoVNASaver.Hardware.NanoVNA_F import NanoVNA_F
from NanoVNASaver.Hardware.NanoVNA_H import NanoVNA_H, NanoVNA_H4
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.NanoVNA_V2 import NanoVNAV2
from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@ -40,6 +41,8 @@ DEVICETYPES = (
Device(0x16c0, 0x0483, "AVNA"),
Device(0x04b4, 0x0008, "NanaVNA-V2"),
)
RETRIES = 3
TIMEOUT = 0.2
# The USB Driver for NanoVNA V2 seems to deliver an
@ -63,21 +66,18 @@ def get_interfaces() -> List[Tuple[str, str]]:
port = d.device
logger.info("Found %s (%04x %04x) on port %s",
t.name, d.vid, d.pid, d.device)
return_ports.append((port, f"{port}({t.name})"))
return_ports.append((port, f"{port} ({t.name})"))
return return_ports
def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
serial_port.timeout = TIMEOUT
logger.info("Finding correct VNA type...")
with app.serialLock:
vna_version = detect_version(serial_port)
for _ in range(3):
vnaType = detect_version(serial_port)
if vnaType != "unknown":
break
serial_port.timeout = 0.2
if vnaType == 'nanovnav2':
if vna_version == 'v2':
logger.info("Type: NanoVNA-V2")
return NanoVNAV2(app, serial_port)
@ -91,14 +91,14 @@ def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
if firmware.find("NanoVNA-H 4") > 0:
logger.info("Type: NanoVNA-H4")
vna = NanoVNA_H4(app, serial_port)
if firmware.find("sweep_points 201") > 0:
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
return vna
if firmware.find("NanoVNA-H") > 0:
logger.info("Type: NanoVNA-H")
vna = NanoVNA_H(app, serial_port)
if firmware.find("sweep_points 201") > 0:
if vna.readFirmware().find("sweep_points 201") > 0:
logger.info("VNA has 201 datapoints capability")
vna._datapoints = (201, 101)
return vna
@ -112,27 +112,19 @@ def get_VNA(app, serial_port: serial.Serial) -> 'VNA':
return NanoVNA(app, serial_port)
def detect_version(serialPort: serial.Serial) -> str:
serialPort.timeout = 0.1
# drain any outstanding data in the serial incoming buffer
data = "a"
while len(data) != 0:
data = serialPort.read(128)
# send a \r and see what we get
serialPort.write(b"\r")
# will wait up to 0.1 seconds
data = serialPort.readline().decode('ascii')
if data == 'ch> ':
# this is an original nanovna
return 'nanovna'
if data == '2':
# this is a nanovna v2
return 'nanovnav2'
logger.error('Unknown VNA type: hardware responded to CR with: %s', data)
return 'unknown'
def detect_version(serial_port: serial.Serial) -> str:
data = ""
for i in range(RETRIES):
drain_serial(serial_port)
serial_port.write("\r".encode("ascii"))
data = serial_port.read(128).decode("ascii")
if data.startswith("ch> "):
return "v1"
# -H versions
if data.startswith("\r\nch> "):
return "vh"
if data.startswith("2"):
return "v2"
logger.debug("Retry detection: %s", i + 1)
logger.error('No VNA detected. Hardware responded to CR with: %s', data)
return ""

Wyświetl plik

@ -25,6 +25,7 @@ import serial
import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.Serial import drain_serial
from NanoVNASaver.Hardware.VNA import VNA, Version
logger = logging.getLogger(__name__)
@ -61,11 +62,9 @@ class NanoVNA(VNA):
logger.debug("Reading calibration info.")
if not self.serial.is_open:
return "Not connected."
if self.app.serialLock.acquire():
with self.app.serialLock:
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
drain_serial(self.serial)
self.serial.write("cal\r".encode('ascii'))
result = ""
data = ""
@ -77,19 +76,15 @@ class NanoVNA(VNA):
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading calibration info: %s", exc)
finally:
self.app.serialLock.release()
return "Unknown"
def getScreenshot(self) -> QtGui.QPixmap:
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
return QtGui.QPixmap()
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
try:
with self.app.serialLock:
drain_serial(self.serial)
timeout = self.serial.timeout
self.serial.write("capture\r".encode('ascii'))
self.serial.timeout = 4
@ -97,26 +92,24 @@ class NanoVNA(VNA):
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
image = QtGui.QImage(
rgba_array,
self.screenwidth,
self.screenheight,
QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception(
"Exception while capturing screenshot: %s", exc)
finally:
self.app.serialLock.release()
rgb_data = struct.unpack(
f">{self.screenwidth * self.screenheight}H",
image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) +
((rgb_array & 0x07E0) << 5) +
((rgb_array & 0x001F) << 3))
image = QtGui.QImage(
rgba_array,
self.screenwidth,
self.screenheight,
QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception(
"Exception while capturing screenshot: %s", exc)
return QtGui.QPixmap()
def readFrequencies(self) -> List[str]:
@ -130,11 +123,9 @@ class NanoVNA(VNA):
logger.debug("Reading version info.")
if not self.serial.is_open:
return ""
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("version\r".encode('ascii'))
result = ""
data = ""
@ -142,18 +133,16 @@ class NanoVNA(VNA):
while "ch>" not in data:
data = self.serial.readline().decode('ascii')
result += data
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
finally:
self.app.serialLock.release()
values = result.splitlines()
logger.debug("Found version info: %s", values[1])
return values[1]
except serial.SerialException as exc:
logger.exception("Exception while reading firmware version: %s", exc)
return ""
def setSweep(self, start, stop):
if self.useScan:
self.writeSerial("scan " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial(f"scan {start} {stop} {self.datapoints}")
else:
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
sleep(1)

Wyświetl plik

@ -24,6 +24,7 @@ import numpy as np
from PyQt5 import QtGui
from NanoVNASaver.Hardware.NanoVNA import NanoVNA
from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@ -37,11 +38,9 @@ class NanoVNA_F(NanoVNA):
logger.debug("Capturing screenshot...")
if not self.serial.is_open:
return QtGui.QPixmap()
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("capture\r".encode('ascii'))
timeout = self.serial.timeout
self.serial.timeout = 4
@ -49,47 +48,45 @@ class NanoVNA_F(NanoVNA):
image_data = self.serial.read(
self.screenwidth * self.screenheight * 2)
self.serial.timeout = timeout
rgb_data = struct.unpack(
f"<{self.screenwidth * self.screenheight}H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) + # G?!
((rgb_array & 0x07E0) >> 3) + # B
((rgb_array & 0x001F) << 11)) # G
rgb_data = struct.unpack(
f"<{self.screenwidth * self.screenheight}H", image_data)
rgb_array = np.array(rgb_data, dtype=np.uint32)
rgba_array = (0xFF000000 +
((rgb_array & 0xF800) << 8) + # G?!
((rgb_array & 0x07E0) >> 3) + # B
((rgb_array & 0x001F) << 11)) # G
unwrapped_array = np.empty(
self.screenwidth*self.screenheight,
dtype=np.uint32)
for y in range(self.screenheight // 2):
for x in range(self.screenwidth // 2):
unwrapped_array[
2 * x + 2 * y * self.screenwidth
] = rgba_array[x + y * self.screenwidth]
unwrapped_array[
(2 * x) + 1 + 2 * y * self.screenwidth
] = rgba_array[
x + (self.screenheight//2 + y) * self.screenwidth
]
unwrapped_array[
2 * x + (2 * y + 1) * self.screenwidth
] = rgba_array[
x + self.screenwidth // 2 + y * self.screenwidth
]
unwrapped_array[
(2 * x) + 1 + (2 * y + 1) * self.screenwidth
] = rgba_array[
x + self.screenwidth // 2 +
(self.screenheight//2 + y) * self.screenwidth
]
unwrapped_array = np.empty(
self.screenwidth*self.screenheight,
dtype=np.uint32)
for y in range(self.screenheight // 2):
for x in range(self.screenwidth // 2):
unwrapped_array[
2 * x + 2 * y * self.screenwidth
] = rgba_array[x + y * self.screenwidth]
unwrapped_array[
(2 * x) + 1 + 2 * y * self.screenwidth
] = rgba_array[
x + (self.screenheight//2 + y) * self.screenwidth
]
unwrapped_array[
2 * x + (2 * y + 1) * self.screenwidth
] = rgba_array[
x + self.screenwidth // 2 + y * self.screenwidth
]
unwrapped_array[
(2 * x) + 1 + (2 * y + 1) * self.screenwidth
] = rgba_array[
x + self.screenwidth // 2 +
(self.screenheight//2 + y) * self.screenwidth
]
image = QtGui.QImage(
unwrapped_array,
self.screenwidth, self.screenheight,
QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception("Exception while capturing screenshot: %s", exc)
finally:
self.app.serialLock.release()
image = QtGui.QImage(
unwrapped_array,
self.screenwidth, self.screenheight,
QtGui.QImage.Format_ARGB32)
logger.debug("Captured screenshot")
return QtGui.QPixmap(image)
except serial.SerialException as exc:
logger.exception("Exception while capturing screenshot: %s", exc)
return QtGui.QPixmap()

Wyświetl plik

@ -66,7 +66,8 @@ class NanoVNAV2(VNA):
tty.setraw(self.serial.fd)
# reset protocol to known state
self.serial.write(pack("<Q", 0))
with self.app.serialLock:
self.serial.write(pack("<Q", 0))
self.version = self.readVersion()
self.firmware = self.readFirmware()
@ -103,9 +104,9 @@ class NanoVNAV2(VNA):
cmd = pack("<BBBB",
_CMD_READ, _ADDR_FW_MAJOR,
_CMD_READ, _ADDR_FW_MINOR)
self.serial.write(cmd)
resp = self.serial.read(2)
with self.app.serialLock:
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
return None
@ -119,50 +120,51 @@ class NanoVNAV2(VNA):
def readValues(self, value) -> List[str]:
self.checkValid()
self.serial.timeout = 8 # should be enough
# Actually grab the data only when requesting channel 0.
# The hardware will return all channels which we will store.
if value == "data 0":
# reset protocol to known state
self.serial.write(pack("<Q", 0))
with self.app.serialLock:
self.serial.timeout = 8 # should be enough
self.serial.write(pack("<Q", 0))
# cmd: write register 0x30 to clear FIFO
self.serial.write(pack("<BBB",
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
# clear sweepdata
self._sweepdata = [(complex(), complex())] * self.datapoints
pointstodo = self.datapoints
while pointstodo > 0:
logger.info("reading values")
pointstoread = min(255, pointstodo)
# cmd: read FIFO, addr 0x30
self.serial.write(
pack("<BBB",
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
# cmd: write register 0x30 to clear FIFO
self.serial.write(pack("<BBB",
_CMD_WRITE, _ADDR_VALUES_FIFO, 0))
# clear sweepdata
self._sweepdata = [(complex(), complex())] * self.datapoints
pointstodo = self.datapoints
while pointstodo > 0:
logger.info("reading values")
pointstoread = min(255, pointstodo)
# cmd: read FIFO, addr 0x30
self.serial.write(
pack("<BBB",
_CMD_READFIFO, _ADDR_VALUES_FIFO,
pointstoread))
# each value is 32 bytes
nBytes = pointstoread * 32
# each value is 32 bytes
nBytes = pointstoread * 32
# serial .read() will wait for exactly nBytes bytes
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.error("expected %d bytes, got %d",
nBytes, len(arr))
return []
# serial .read() will wait for exactly nBytes bytes
arr = self.serial.read(nBytes)
if nBytes != len(arr):
logger.error("expected %d bytes, got %d",
nBytes, len(arr))
return []
for i in range(pointstoread):
(fwd_real, fwd_imag, rev0_real, rev0_imag, rev1_real,
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
fwd = complex(fwd_real, fwd_imag)
refl = complex(rev0_real, rev0_imag)
thru = complex(rev1_real, rev1_imag)
logger.debug("Freq index: %i", freq_index)
self._sweepdata[freq_index] = (refl / fwd, thru / fwd)
for i in range(pointstoread):
(fwd_real, fwd_imag, rev0_real, rev0_imag, rev1_real,
rev1_imag, freq_index) = unpack_from(
"<iiiiiihxxxxxx", arr, i * 32)
fwd = complex(fwd_real, fwd_imag)
refl = complex(rev0_real, rev0_imag)
thru = complex(rev1_real, rev1_imag)
logger.debug("Freq index: %i", freq_index)
self._sweepdata[freq_index] = (refl / fwd, thru / fwd)
pointstodo = pointstodo - pointstoread
pointstodo = pointstodo - pointstoread
ret = [x[0] for x in self._sweepdata]
ret = [str(x.real) + ' ' + str(x.imag) for x in ret]
@ -181,9 +183,9 @@ class NanoVNAV2(VNA):
def readVersion(self):
# read register 0xf0 (device type), 0xf2 (board revision)
cmd = b"\x10\xf0\x10\xf2"
self.serial.write(cmd)
resp = self.serial.read(2)
with self.app.serialLock:
self.serial.write(cmd)
resp = self.serial.read(2)
if len(resp) != 2:
logger.error("Timeout reading version registers")
return None
@ -210,4 +212,5 @@ class NanoVNAV2(VNA):
_ADDR_SWEEP_POINTS, self.datapoints)
cmd += pack("<BBH", _CMD_WRITE2,
_ADDR_SWEEP_VALS_PER_FREQ, 1)
self.serial.write(cmd)
with self.app.serialLock:
self.serial.write(cmd)

Wyświetl plik

@ -0,0 +1,25 @@
# NanoVNASaver
#
# A python program to view and export Touchstone data from a NanoVNA
# Copyright (C) 2019, 2020 Rune B. Broberg
# Copyright (C) 2020 NanoVNA-Saver Authors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import serial
def drain_serial(serial_port: serial.Serial):
"""drain up to 10k outstanding data in the serial incoming buffer"""
for _ in range(80):
if len(serial_port.read(128)) == 0:
break

Wyświetl plik

@ -24,6 +24,7 @@ import serial
from PyQt5 import QtWidgets, QtGui
from NanoVNASaver.Settings import Version
from NanoVNASaver.Hardware.Serial import drain_serial
logger = logging.getLogger(__name__)
@ -76,21 +77,17 @@ class VNA:
return QtGui.QPixmap()
def flushSerialBuffers(self):
if self.app.serialLock.acquire():
self.serial.write(b"\r\n\r\n")
with self.app.serialLock:
self.serial.write("\r\n\r\n".encode("ascii"))
sleep(0.1)
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
sleep(0.1)
self.app.serialLock.release()
def readFirmware(self) -> str:
if self.app.serialLock.acquire():
result = ""
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write("info\r".encode('ascii'))
result = ""
data = ""
@ -98,65 +95,49 @@ class VNA:
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
except serial.SerialException as exc:
logger.exception(
"Exception while reading firmware data: %s", exc)
finally:
self.app.serialLock.release()
return result
logger.error("Unable to acquire serial lock to read firmware.")
except serial.SerialException as exc:
logger.exception(
"Exception while reading firmware data: %s", exc)
return ""
def readFromCommand(self, command) -> str:
if self.app.serialLock.acquire():
result = ""
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
self.serial.write((command + "\r").encode('ascii'))
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write(f"{command}\r".encode('ascii'))
result = ""
data = ""
sleep(0.01)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", command, exc)
finally:
self.app.serialLock.release()
return result
logger.error("Unable to acquire serial lock to read %s", command)
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", command, exc)
return ""
def readValues(self, value) -> List[str]:
logger.debug("VNA reading %s", value)
if self.app.serialLock.acquire():
try:
data = "a"
while data != "":
data = self.serial.readline().decode('ascii')
# Then send the command to read data
self.serial.write(str(value + "\r").encode('ascii'))
try:
with self.app.serialLock:
drain_serial(self.serial)
self.serial.write(f"{value}\r".encode('ascii'))
result = ""
data = ""
sleep(0.05)
while data != "ch> ":
data = self.serial.readline().decode('ascii')
result += data
values = result.split("\r\n")
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", value, exc)
return []
finally:
self.app.serialLock.release()
values = result.split("\r\n")
logger.debug(
"VNA done reading %s (%d values)",
value, len(values)-2)
return values[1:-1]
logger.error("Unable to acquire serial lock to read %s", value)
except serial.SerialException as exc:
logger.exception(
"Exception while reading %s: %s", value, exc)
return []
def writeSerial(self, command):
@ -164,25 +145,21 @@ class VNA:
logger.warning("Writing without serial port being opened (%s)",
command)
return
if self.app.serialLock.acquire():
with self.app.serialLock:
try:
self.serial.write(str(command + "\r").encode('ascii'))
self.serial.write(f"{command}\r".encode('ascii'))
self.serial.readline()
except serial.SerialException as exc:
logger.exception(
"Exception while writing to serial port (%s): %s",
command, exc)
finally:
self.app.serialLock.release()
return
def setSweep(self, start, stop):
self.writeSerial("sweep " + str(start) + " " + str(stop) + " " + str(self.datapoints))
# TODO: should be dropped and the serial part should be a connection class which handles
# unconnected devices
self.writeSerial(f"sweep {start} {stop} {self.datapoints}")
# TODO: should be dropped and the serial part should be a connection class
# which handles unconnected devices
class InvalidVNA(VNA):
name = "Invalid"
_datapoints = (0, )

Wyświetl plik

@ -1,2 +0,0 @@
from .Hardware import get_interfaces, get_VNA #noqa
from .VNA import InvalidVNA, Version #noqa

Wyświetl plik

@ -36,7 +36,8 @@ from .Formatting import (
format_frequency, format_frequency_short, format_frequency_sweep,
parse_frequency,
)
from .Hardware import get_interfaces, get_VNA, InvalidVNA
from .Hardware.Hardware import get_interfaces, get_VNA
from .Hardware.VNA import InvalidVNA
from .RFTools import Datapoint, corr_att_data
from .Charts.Chart import Chart
from .Charts import (
@ -588,7 +589,7 @@ class NanoVNASaver(QtWidgets.QWidget):
ts = Touchstone(filename)
ts.sdata[0] = self.data
if nr_params > 1:
ts.sdata[1] = self.data
ts.sdata[1] = self.data21
for dp in self.data:
ts.sdata[2].append(Datapoint(dp.freq, 0, 0))
ts.sdata[3].append(Datapoint(dp.freq, 0, 0))
@ -606,7 +607,7 @@ class NanoVNASaver(QtWidgets.QWidget):
return
def startSerial(self):
if self.serialLock.acquire():
with self.serialLock:
self.serialPort = self.serialPortInput.currentData()
if self.serialPort == "":
self.serialPort = self.serialPortInput.currentText()
@ -616,52 +617,52 @@ class NanoVNASaver(QtWidgets.QWidget):
self.serial.timeout = 0.05
except serial.SerialException as exc:
logger.error("Tried to open %s and failed: %s", self.serialPort, exc)
self.serialLock.release()
return
if not self.serial.isOpen() :
logger.error("Unable to open port %s", self.serialPort)
return
self.btnSerialToggle.setText("Disconnect")
self.serialLock.release()
sleep(0.05)
sleep(0.05)
self.vna = get_VNA(self, self.serial)
self.vna.validateInput = self.settings.value("SerialInputValidation", True, bool)
self.worker.setVNA(self.vna)
self.vna = get_VNA(self, self.serial)
self.vna.validateInput = self.settings.value("SerialInputValidation", True, bool)
self.worker.setVNA(self.vna)
logger.info(self.vna.readFirmware())
logger.info(self.vna.readFirmware())
frequencies = self.vna.readFrequencies()
if frequencies:
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[100])
if int(frequencies[0]) == int(frequencies[100]) and (
self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100]) + 100000))
elif (self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100])))
self.sweepStartInput.textEdited.emit(
self.sweepStartInput.text())
self.sweepStartInput.textChanged.emit(
self.sweepStartInput.text())
else:
logger.warning("No frequencies read")
return
logger.debug("Starting initial sweep")
self.sweep()
frequencies = self.vna.readFrequencies()
if frequencies:
logger.info("Read starting frequency %s and end frequency %s",
frequencies[0], frequencies[100])
if int(frequencies[0]) == int(frequencies[100]) and (
self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100]) + 100000))
elif (self.sweepStartInput.text() == "" or
self.sweepEndInput.text() == ""):
self.sweepStartInput.setText(
format_frequency_sweep(int(frequencies[0])))
self.sweepEndInput.setText(
format_frequency_sweep(int(frequencies[100])))
self.sweepStartInput.textEdited.emit(
self.sweepStartInput.text())
self.sweepStartInput.textChanged.emit(
self.sweepStartInput.text())
else:
logger.warning("No frequencies read")
return
logger.debug("Starting initial sweep")
self.sweep()
return
def stopSerial(self):
if self.serialLock.acquire():
with self.serialLock:
logger.info("Closing connection to NanoVNA")
self.serial.close()
self.serialLock.release()
self.btnSerialToggle.setText("Connect to NanoVNA")
def toggleSweepSettings(self, disabled):
@ -698,13 +699,13 @@ class NanoVNASaver(QtWidgets.QWidget):
def stopSweep(self):
self.worker.stopped = True
def saveData(self, data, data12, source=None):
def saveData(self, data, data21, source=None):
if self.dataLock.acquire(blocking=True):
self.data = data
if self.s21att > 0:
self.data21 = corr_att_data(data12, self.s21att)
self.data21 = corr_att_data(data21, self.s21att)
else:
self.data21 = data12
self.data21 = data21
else:
logger.error("Failed acquiring data lock while saving.")
self.dataLock.release()

Wyświetl plik

@ -23,6 +23,7 @@ from PyQt5 import QtWidgets, QtCore
from NanoVNASaver.Analysis import Analysis, LowPassAnalysis, HighPassAnalysis, \
BandPassAnalysis, BandStopAnalysis, VSWRAnalysis, \
SimplePeakSearchAnalysis
from NanoVNASaver.Analysis.AntennaAnalysis import MagLoopAnalysis
logger = logging.getLogger(__name__)
@ -53,6 +54,7 @@ class AnalysisWindow(QtWidgets.QWidget):
# self.analysis_list.addItem("Peak search", PeakSearchAnalysis(self.app))
self.analysis_list.addItem("Peak search", SimplePeakSearchAnalysis(self.app))
self.analysis_list.addItem("VSWR analysis", VSWRAnalysis(self.app))
self.analysis_list.addItem("MagLoop analysis", MagLoopAnalysis(self.app))
select_analysis_layout.addRow("Analysis type", self.analysis_list)
self.analysis_list.currentIndexChanged.connect(self.updateSelection)

Wyświetl plik

@ -7,22 +7,41 @@
NanoVNASaver
============
A multiplatform tool to save Touchstone files from
the NanoVNA, sweep frequency spans in segments to gain more than 101 data
A multiplatform tool to save Touchstone files from the NanoVNA,
sweep frequency spans in segments to gain more than 101 data
points, and generally display and analyze the resulting data.
Copyright 2019, 2020 Rune B. Broberg
Copyright 2020 NanoVNA-Saver Authors
- Copyright 2019, 2020 Rune B. Broberg
- Copyright 2020 NanoVNA-Saver Authors
# Latest Changes
## Changes in v0.3.4
- Refactored Analysis
- Add Antenna Analysis
- Fixed bug in Through Calibration
- Fixed bug in s2p saving
- Fixed crash when clicking connect with no device connected
- Fixed module error with source installation if
pkg\_resources missing
## Changes in v0.3.3
- Fixed data acquisition with S-A-A-2 / NanoVNA V2
- Refactored calibration code
- Calibration data between known datapoints is now
interpolated using spline interpolation
- Fixed Through Calibration (CH0 -> CH1)
## Changes in v0.3.2
- This adds support for the saa2, a vna loosely
based on the original nanovna with frequency range up to 3Ghz.
- Added ability to add attenutor values in s11 sweep settings
for amplifier measuremnts
- This adds support for the SAA2, a VNA loosely based on the
original NanoVNA with frequency range up to 3GHz.
- Added ability to add use an attenuator and add the Antenuation
in s11 sweep settings for amplifier measurements.
## Introduction This software connects to a NanoVNA and extracts the data for
display on a computer, and for saving to Touchstone files.
# Introduction
This software connects to a NanoVNA and extracts the data for
display on a computer and allows saving the sweep data to Touchstone files.
Current features:
- Reading data from a NanoVNA -- Compatible devices: NanoVNA, NanoVNA-H,
@ -52,8 +71,6 @@ Current features:
## Running the application
### Windows
The software was written in Python on Windows, using Pycharm, and the modules
PyQT5, numpy, scipy and pyserial.
@ -61,16 +78,18 @@ PyQT5, numpy, scipy and pyserial.
You can find 64bit binary releases for Windows, Linux and MacOS under
https://github.com/NanoVNA-Saver/nanovna-saver/releases/
The downloadable executable runs directly, and requires no installation. For
Windows 7, it does require Service Pack 1 and [Microsoft VC++
Redistributable](https://support.microsoft.com/en-us/help/2977003/the-latest-supported-visual-c-downloads).
For most users, this is already installed.
Windows 10
### Windows
Versions older than Windows 7 are not known to work.
It may be possible to run on those directly from the python code:
#### Installation and Use with pip
#### Windows 7
It requires Service Pack 1 and [Microsoft VC++ Redistributable]
(https://support.microsoft.com/en-us/help/2977003/the-latest-supported-visual-c-downloads).
For most users, this would already be installed.
#### Windows 10
The downloadable executable runs directly, and requires no installation.
##### Installation and Use with pip
1. Clone repo and cd into the directory
@ -127,7 +146,7 @@ Via a MacPorts distribution maintained by @ra1nb0w.
#### Homebrew
1. Install Homebrew
From : https://brew.sh/
From : https://brew.sh/
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
@ -176,11 +195,11 @@ frequency span, saved to save slot 0, is sufficient. If the NanoVNA is
completely uncalibrated, its readings may be outside the range accepted by the
application.
In-application calibration is available, either assuming ideal standards, or
In-application calibration is available, either assuming ideal standards or
with relevant standard correction. To manually calibrate, sweep each standard
in turn, and press the relevant button in the calibration window. For assisted
calibration, press the "Calibration assistant" button. If desired, enter a
note in the provided field describing the conditions under which the
in turn and press the relevant button in the calibration window.
For assisted calibration, press the "Calibration Assistant" button. If desired,
enter a note in the provided field describing the conditions under which the
calibration was performed.
Calibration results may be saved and loaded using the provided buttons at the
@ -199,7 +218,7 @@ _Currently, load capacitance is unsupported_
### TDR
To get accurate TDR measurements, calibrate the device, and attach the cable to
be measured at the calibration plane - ie. at the same position where the
be measured at the calibration plane - i.e. at the same position where the
calibration load would be attached. Open the "Time Domain Reflectometry"
window, and select the correct cable type, or manually enter a propagation
factor.
@ -224,7 +243,7 @@ You can use it, commercially as well. You may make changes to the code, but I
* HexAndFlex wrote a 3-part (thus far) series on Getting Started with the NanoVNA:
[https://hexandflex.com/2019/08/31/getting-started-with-the-nanovna-part-1/] - Part 3 is dedicated to NanoVNASaver:
[https://hexandflex.com/2019/09/15/getting-started-with-the-nanovna-part-3-pc-software/]
* Gunthard Kraus also does a documentation in english and german:
* Gunthard Kraus did documentation in English and German:
[http://www.gunthard-kraus.de/fertig_NanoVNA/English/]
[http://www.gunthard-kraus.de/fertig_NanoVNA/Deutsch/]

Wyświetl plik

@ -14,7 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import pkg_resources.py2_warn
try:
import pkg_resources.py2_warn
except ImportError:
pass
from NanoVNASaver.__main__ import main
if __name__ == '__main__':