Linting SweepWorker

pull/197/head
Holger Müller 2020-06-28 12:55:07 +02:00
rodzic c139a531e7
commit 790c8aac2b
1 zmienionych plików z 58 dodań i 33 usunięć

Wyświetl plik

@ -18,7 +18,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from time import sleep
from typing import List
from typing import List, Tuple
import numpy as np
from PyQt5 import QtCore, QtWidgets
@ -31,7 +31,7 @@ from NanoVNASaver.RFTools import Datapoint
logger = logging.getLogger(__name__)
def truncate(values, count):
def truncate(values: List[List[Tuple]], count: int) -> List[List[Tuple]]:
keep = len(values) - count
logger.debug("Truncating from %d values to %d", len(values), keep)
if count < 1 or keep < 1:
@ -42,7 +42,8 @@ def truncate(values, count):
avg = complex(*np.average(valueset, 0))
truncated.append(
sorted(valueset,
key=lambda v: abs(avg - complex(*v)))[:keep])
key=lambda v, a=avg:
abs(a - complex(*v)))[:keep])
return np.swapaxes(truncated, 0, 1).tolist()
@ -82,7 +83,8 @@ class SweepWorker(QtCore.QRunnable):
self.running = True
self.percentage = 0
if not self.app.serial.is_open:
logger.debug("Attempted to run without being connected to the NanoVNA")
logger.debug(
"Attempted to run without being connected to the NanoVNA")
self.running = False
return
@ -93,7 +95,8 @@ class SweepWorker(QtCore.QRunnable):
if self.averaging:
logger.info("%d averages", self.averages)
if self.app.sweepStartInput.text() == "" or self.app.sweepEndInput.text() == "":
if (self.app.sweepStartInput.text() == "" or
self.app.sweepEndInput.text() == ""):
logger.debug("First sweep - standard range")
# We should handle the first startup by reading frequencies?
sweep_from = 1000000
@ -101,13 +104,15 @@ class SweepWorker(QtCore.QRunnable):
else:
sweep_from = parse_frequency(self.app.sweepStartInput.text())
sweep_to = parse_frequency(self.app.sweepEndInput.text())
logger.debug("Parsed sweep range as %d to %d", sweep_from, sweep_to)
logger.debug("Parsed sweep range as %d to %d",
sweep_from, sweep_to)
if sweep_from < 0 or sweep_to < 0 or sweep_from == sweep_to:
logger.warning("Can't sweep from %s to %s",
self.app.sweepStartInput.text(),
self.app.sweepEndInput.text())
self.error_message = \
"Unable to parse frequency inputs - check start and stop fields."
self.error_message = (
"Unable to parse frequency inputs"
" - check start and stop fields.")
self.stopped = True
self.running = False
self.signals.sweepError.emit()
@ -124,19 +129,23 @@ class SweepWorker(QtCore.QRunnable):
if self.averaging:
for i in range(self.noSweeps):
logger.debug("Sweep segment no %d averaged over %d readings", i, self.averages)
logger.debug("Sweep segment no %d averaged over %d readings",
i, self.averages)
if self.stopped:
logger.debug("Stopping sweeping as signalled")
break
start = sweep_from + i * self.vna.datapoints * stepsize
freq, val11, val21 = self.readAveragedSegment(
start, start + (self.vna.datapoints - 1) * stepsize, self.averages)
start,
start + (self.vna.datapoints - 1) * stepsize,
self.averages)
frequencies += freq
values += val11
values21 += val21
self.percentage = (i + 1) * (self.vna.datapoints - 1) / self.noSweeps
self.percentage = (i + 1) * (self.vna.datapoints - 1) / \
self.noSweeps
logger.debug("Saving acquired data")
self.saveData(frequencies, values, values21)
@ -199,8 +208,9 @@ class SweepWorker(QtCore.QRunnable):
parse_frequency(
self.app.sweepStartInput.text()),
parse_frequency(self.app.sweepEndInput.text()))
self.vna.resetSweep(parse_frequency(self.app.sweepStartInput.text()),
parse_frequency(self.app.sweepEndInput.text()))
self.vna.resetSweep(
parse_frequency(self.app.sweepStartInput.text()),
parse_frequency(self.app.sweepEndInput.text()))
self.percentage = 100
logger.debug("Sending \"finished\" signal")
@ -210,7 +220,9 @@ class SweepWorker(QtCore.QRunnable):
def updateData(self, values11, values21, offset, segment_size=101):
# Update the data from (i*101) to (i+1)*101
logger.debug("Calculating data and inserting in existing data at offset %d", offset)
logger.debug(
"Calculating data and inserting in existing data at offset %d",
offset)
for i, val11 in enumerate(values11):
re, im = val11
re21, im21 = values21[i]
@ -234,14 +246,12 @@ class SweepWorker(QtCore.QRunnable):
raw_data21 = []
logger.debug("Calculating data including corrections")
for i, freq in enumerate(frequencies):
logger.debug("Freqnr %i, len(%i)", i, len(frequencies))
logger.debug("Val11 %s", values11[i])
logger.debug("Val21 %s", values21[i])
re, im = values11[i]
re21, im21 = values21[i]
raw_data11 += [Datapoint(freq, re, im)]
raw_data21 += [Datapoint(freq, re21, im21)]
self.data11, self.data21 = self.applyCalibration(raw_data11, raw_data21)
self.data11, self.data21 = self.applyCalibration(
raw_data11, raw_data21)
self.rawData11 = raw_data11
self.rawData21 = raw_data21
logger.debug("Saving data to application (%d and %d points)",
@ -250,8 +260,10 @@ class SweepWorker(QtCore.QRunnable):
logger.debug("Sending \"updated\" signal")
self.signals.updated.emit()
def applyCalibration(self, raw_data11: List[Datapoint], raw_data21: List[Datapoint]) ->\
(List[Datapoint], List[Datapoint]):
def applyCalibration(self,
raw_data11: List[Datapoint],
raw_data21: List[Datapoint]
) -> (List[Datapoint], List[Datapoint]):
if self.offsetDelay != 0:
tmp = []
for d in raw_data11:
@ -336,31 +348,41 @@ class SweepWorker(QtCore.QRunnable):
for d in tmpdata:
a, b = d.split(" ")
try:
if self.vna.validateInput and (float(a) < -9.5 or float(a) > 9.5):
logger.warning("Got a non-float data value: %s (%s)", d, a)
if self.vna.validateInput and (
float(a) < -9.5 or float(a) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, a)
logger.debug("Re-reading %s", data)
done = False
elif self.vna.validateInput and (float(b) < -9.5 or float(b) > 9.5):
logger.warning("Got a non-float data value: %s (%s)", d, b)
elif self.vna.validateInput and (
float(b) < -9.5 or float(b) > 9.5):
logger.warning(
"Got a non-float data value: %s (%s)", d, b)
logger.debug("Re-reading %s", data)
done = False
else:
returndata.append((float(a), float(b)))
except Exception as e:
logger.exception("An exception occurred reading %s: %s", data, e)
logger.exception("An exception occurred reading %s: %s",
data, e)
logger.debug("Re-reading %s", data)
done = False
if not done:
sleep(0.2)
count += 1
if count == 10:
logger.error("Tried and failed to read %s %d times.", data, count)
logger.error("Tried and failed to read %s %d times.",
data, count)
if count >= 20:
logger.critical("Tried and failed to read %s %d times. Giving up.", data, count)
logger.critical(
"Tried and failed to read %s %d times. Giving up.",
data, count)
raise NanoVNAValueException(
f"Failed reading {data} {count} times.\n"
f"Data outside expected valid ranges, or in an unexpected format.\n\n"
f"You can disable data validation on the device settings screen.")
f"Data outside expected valid ranges,"
f" or in an unexpected format.\n\n"
f"You can disable data validation on the"
f"device settings screen.")
return returndata
def readFreq(self):
@ -375,7 +397,8 @@ class SweepWorker(QtCore.QRunnable):
tmpfreq = self.vna.readFrequencies()
if not tmpfreq:
logger.warning("Read no frequencies")
raise NanoVNASerialException("Failed reading frequencies: Returned no values.")
raise NanoVNASerialException(
"Failed reading frequencies: Returned no values.")
for f in tmpfreq:
if not f.isdigit():
logger.warning("Got a non-digit frequency: %s", f)
@ -383,11 +406,13 @@ class SweepWorker(QtCore.QRunnable):
done = False
count += 1
if count == 10:
logger.error("Tried and failed %d times to read frequencies.", count)
logger.error(
"Tried and failed %d times to read frequencies.",
count)
if count >= 20:
logger.critical(
"Tried and failed to read frequencies from the NanoVNA %d times.",
count)
"Tried and failed to read frequencies from the"
" NanoVNA %d times.", count)
raise NanoVNAValueException(
f"Failed reading frequencies {count} times.")
else: