Move cal data parsing to CalDataSet class

pull/607/head
Holger Müller 2023-02-28 18:50:18 +01:00
rodzic 92a8a0e39d
commit 6f6f6c65e1
1 zmienionych plików z 66 dodań i 86 usunięć

Wyświetl plik

@ -35,39 +35,6 @@ IDEAL_OPEN = complex(1, 0)
IDEAL_LOAD = complex(0, 0)
IDEAL_THROUGH = complex(1, 0)
RXP_CAL_LINE = {
"sol": re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)
\s* $
""", re.VERBOSE),
"short": re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)
( \s+ # optional for backword compatibility
(?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+) \s+
(?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+)
)? \s* $
""", re.VERBOSE),
"long": re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+) \s+
(?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+) \s+
(?P<thrureflr>[-0-9Ee.]+) \s+ (?P<thrurefli>[-0-9Ee.]+) \s+
(?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+)
\s* $
""", re.VERBOSE),
}
RXP_CAL_HEADER = re.compile(r"""
^ \# \s+ Hz \s+
ShortR \s+ ShortI \s+ OpenR \s+ OpenI \s+
@ -76,6 +43,18 @@ RXP_CAL_HEADER = re.compile(r"""
(?P<thrurefl> \s+ ThrureflR \s+ ThrureflI)?
(?P<isolation> \s+ IsolationR \s+ IsolationI)?
\s* $
""", re.VERBOSE | re.IGNORECASE)
RXP_CAL_LINE = re.compile(r"""
^ \s*
(?P<freq>\d+) \s+
(?P<shortr>[-0-9Ee.]+) \s+ (?P<shorti>[-0-9Ee.]+) \s+
(?P<openr>[-0-9Ee.]+) \s+ (?P<openi>[-0-9Ee.]+) \s+
(?P<loadr>[-0-9Ee.]+) \s+ (?P<loadi>[-0-9Ee.]+)
( \s+ (?P<throughr>[-0-9Ee.]+) \s+ (?P<throughi>[-0-9Ee.]+))?
( \s+ (?P<thrureflr>[-0-9Ee.]+) \s+ (?P<thrurefli>[-0-9Ee.]+))?
( \s+ (?P<isolationr>[-0-9Ee.]+) \s+ (?P<isolationi>[-0-9Ee.]+))?
\s* $
""", re.VERBOSE)
logger = logging.getLogger(__name__)
@ -170,6 +149,58 @@ class CalDataSet(UserDict):
if self.complete1port() else ""
)
def _append_match(self, m: re.Match, header: str,
line_nr: int, line: str) -> None:
cal = m.groupdict()
columns = {
col[:-1] for col in cal.keys() if cal[col] and col != "freq"
}
if "through" in columns and header == "sol":
logger.warning("Through data with sol header. %i: %s",
line_nr, line)
# fix short data (without thrurefl)
if "thrurefl" in columns and "isolation" not in columns:
cal["isolationr"] = cal["thrureflr"]
cal["isolationi"] = cal["thrurefli"]
cal["thrureflr"], cal["thrurefli"] = None, None
for name in columns:
self.insert(
name,
Datapoint(int(cal["freq"]),
float(cal[f"{name}r"]),
float(cal[f"{name}i"])))
def from_str(self, text: str) -> 'CalDataSet':
# reset data
self.notes = ""
self.data = defaultdict(CalData)
header = ""
# parse text
for i, line in enumerate(text.splitlines(), 1):
line = line.strip()
if line.startswith("!"):
self.notes += f"{line[2:]}\n"
continue
if m := RXP_CAL_HEADER.search(line):
if header:
logger.warning(
"Duplicate header in cal data. %i: %s", i, line)
header = "through" if m.group("through") else "sol"
continue
if not line or line.startswith("#"):
continue
m = RXP_CAL_LINE.search(line)
if not m:
logger.warning("Illegal caldata. Line %i: %s", i, line)
continue
if not header:
logger.warning(
"Caldata without having read header: %i: %s", i, line)
self._append_match(m, header, line, i)
return self
def insert(self, name: str, dp: Datapoint):
if name not in {'short', 'open', 'load',
'through', 'thrurefl', 'isolation'}:
@ -392,7 +423,6 @@ class Calibration:
* dp11.z - i["delta_e"](dp.freq)))
return Datapoint(dp.freq, s21.real, s21.imag)
# TODO: implement tests
def save(self, filename: str):
self.dataset.notes = "\n".join(self.notes)
if not self.isValid1Port():
@ -400,58 +430,8 @@ class Calibration:
with open(filename, mode="w", encoding='utf-8') as calfile:
calfile.write(str(self.dataset))
# TODO: implement tests
def load(self, filename):
self.source = os.path.basename(filename)
self.dataset = CalDataSet()
self.notes = []
header = ""
cols = {
"": (),
"sol": ("short", "open", "load"),
"short": ("short", "open", "load",
"through", "isolation"),
"long": ("short", "open", "load",
"through", "thrurefl", "isolation"),
}
with open(filename, encoding='utf-8') as calfile:
for i, line in enumerate(calfile):
line = line.strip()
if line.startswith("!"):
note = line[2:]
self.notes.append(note)
continue
if m := RXP_CAL_HEADER.search(line):
header = "sol"
if m.group("through"):
header = (
"long" if m.group("thrurefl") else "short")
columns = cols[header]
logger.debug("found %s header type", header)
continue
if line.startswith("#"):
continue
if not header:
logger.warning(
"Warning: Read line without having read header: %s",
line)
continue
m = RXP_CAL_LINE[header].search(line)
if not m:
logger.warning("Illegal data in cal file. Line %i"
" (Fix line or header)", i + 1)
continue
if (header == "short" and not m.group(8) and
columns != cols["sol"]):
logger.debug("only SOL cal data")
columns = cols["sol"]
cal = m.groupdict()
for name in columns:
self.dataset.insert(
name,
Datapoint(int(cal["freq"]),
float(cal[f"{name}r"]),
float(cal[f"{name}i"])))
self.dataset = CalDataSet().from_str(calfile.read())
self.notes = self.dataset.notes.splitlines()