diff --git a/NanoVNASaver/Calibration.py b/NanoVNASaver/Calibration.py index 854e4e1..ec93d32 100644 --- a/NanoVNASaver/Calibration.py +++ b/NanoVNASaver/Calibration.py @@ -35,39 +35,6 @@ IDEAL_OPEN = complex(1, 0) IDEAL_LOAD = complex(0, 0) IDEAL_THROUGH = complex(1, 0) -RXP_CAL_LINE = { - "sol": re.compile(r""" - ^ \s* - (?P\d+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) - \s* $ - """, re.VERBOSE), - "short": re.compile(r""" - ^ \s* - (?P\d+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) - ( \s+ # optional for backword compatibility - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) - )? \s* $ - """, re.VERBOSE), - "long": re.compile(r""" - ^ \s* - (?P\d+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) - \s* $ - """, re.VERBOSE), -} - RXP_CAL_HEADER = re.compile(r""" ^ \# \s+ Hz \s+ ShortR \s+ ShortI \s+ OpenR \s+ OpenI \s+ @@ -76,6 +43,18 @@ RXP_CAL_HEADER = re.compile(r""" (?P \s+ ThrureflR \s+ ThrureflI)? (?P \s+ IsolationR \s+ IsolationI)? \s* $ +""", re.VERBOSE | re.IGNORECASE) + +RXP_CAL_LINE = re.compile(r""" + ^ \s* + (?P\d+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) + ( \s+ (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+))? + ( \s+ (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+))? + ( \s+ (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+))? + \s* $ """, re.VERBOSE) logger = logging.getLogger(__name__) @@ -170,6 +149,58 @@ class CalDataSet(UserDict): if self.complete1port() else "" ) + def _append_match(self, m: re.Match, header: str, + line_nr: int, line: str) -> None: + cal = m.groupdict() + columns = { + col[:-1] for col in cal.keys() if cal[col] and col != "freq" + } + if "through" in columns and header == "sol": + logger.warning("Through data with sol header. %i: %s", + line_nr, line) + # fix short data (without thrurefl) + if "thrurefl" in columns and "isolation" not in columns: + cal["isolationr"] = cal["thrureflr"] + cal["isolationi"] = cal["thrurefli"] + cal["thrureflr"], cal["thrurefli"] = None, None + for name in columns: + self.insert( + name, + Datapoint(int(cal["freq"]), + float(cal[f"{name}r"]), + float(cal[f"{name}i"]))) + + def from_str(self, text: str) -> 'CalDataSet': + # reset data + self.notes = "" + self.data = defaultdict(CalData) + header = "" + # parse text + for i, line in enumerate(text.splitlines(), 1): + line = line.strip() + + if line.startswith("!"): + self.notes += f"{line[2:]}\n" + continue + if m := RXP_CAL_HEADER.search(line): + if header: + logger.warning( + "Duplicate header in cal data. %i: %s", i, line) + header = "through" if m.group("through") else "sol" + continue + if not line or line.startswith("#"): + continue + + m = RXP_CAL_LINE.search(line) + if not m: + logger.warning("Illegal caldata. Line %i: %s", i, line) + continue + if not header: + logger.warning( + "Caldata without having read header: %i: %s", i, line) + self._append_match(m, header, line, i) + return self + def insert(self, name: str, dp: Datapoint): if name not in {'short', 'open', 'load', 'through', 'thrurefl', 'isolation'}: @@ -392,7 +423,6 @@ class Calibration: * dp11.z - i["delta_e"](dp.freq))) return Datapoint(dp.freq, s21.real, s21.imag) - # TODO: implement tests def save(self, filename: str): self.dataset.notes = "\n".join(self.notes) if not self.isValid1Port(): @@ -400,58 +430,8 @@ class Calibration: with open(filename, mode="w", encoding='utf-8') as calfile: calfile.write(str(self.dataset)) - # TODO: implement tests def load(self, filename): self.source = os.path.basename(filename) - self.dataset = CalDataSet() - self.notes = [] - - header = "" - cols = { - "": (), - "sol": ("short", "open", "load"), - "short": ("short", "open", "load", - "through", "isolation"), - "long": ("short", "open", "load", - "through", "thrurefl", "isolation"), - - } with open(filename, encoding='utf-8') as calfile: - for i, line in enumerate(calfile): - line = line.strip() - if line.startswith("!"): - note = line[2:] - self.notes.append(note) - continue - if m := RXP_CAL_HEADER.search(line): - header = "sol" - if m.group("through"): - header = ( - "long" if m.group("thrurefl") else "short") - columns = cols[header] - logger.debug("found %s header type", header) - continue - if line.startswith("#"): - continue - if not header: - logger.warning( - "Warning: Read line without having read header: %s", - line) - continue - m = RXP_CAL_LINE[header].search(line) - if not m: - logger.warning("Illegal data in cal file. Line %i" - " (Fix line or header)", i + 1) - continue - if (header == "short" and not m.group(8) and - columns != cols["sol"]): - logger.debug("only SOL cal data") - columns = cols["sol"] - cal = m.groupdict() - - for name in columns: - self.dataset.insert( - name, - Datapoint(int(cal["freq"]), - float(cal[f"{name}r"]), - float(cal[f"{name}i"]))) + self.dataset = CalDataSet().from_str(calfile.read()) + self.notes = self.dataset.notes.splitlines()