kopia lustrzana https://github.com/erdewit/HiFiScan
Porównaj commity
2 Commity
0161ef225f
...
dfd571243e
Autor | SHA1 | Data |
---|---|---|
Ewald de Wit | dfd571243e | |
Ewald de Wit | 5860ab2f33 |
15
README.rst
15
README.rst
|
@ -141,12 +141,17 @@ it has this frequency response:
|
|||
|
||||
.. image:: images/mic_response.png
|
||||
|
||||
With EasyEffects we make the following correction.
|
||||
The correction can be applied either to the input or the
|
||||
output. Here it's applied to the output, as long as it is
|
||||
turned off after the calibration that's OK.
|
||||
We create a text file that describes the microphone's frequency response::
|
||||
|
||||
.. image:: images/mic_correction.png
|
||||
20 -1.5
|
||||
150 0
|
||||
4500 0
|
||||
10000 4
|
||||
17000 0
|
||||
20000 -2
|
||||
|
||||
The file is imported with "Corrections... -> Mic Calibration -> Load".
|
||||
Manifucturer-supplied calibration files can be imported here as well.
|
||||
|
||||
Measuring the spectrum bears out the concerning lack
|
||||
of treble:
|
||||
|
|
61
chirp.ipynb
61
chirp.ipynb
File diff suppressed because one or more lines are too long
|
@ -3,4 +3,4 @@
|
|||
from hifiscan.analyzer import (
|
||||
Analyzer, XY, geom_chirp, linear_chirp, resample, smooth, taper,
|
||||
tone, window)
|
||||
from hifiscan.audio import Audio, write_wav
|
||||
from hifiscan.audio import Audio, read_correction, write_wav
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import array
|
||||
from functools import lru_cache
|
||||
from typing import NamedTuple, Tuple
|
||||
from typing import List, NamedTuple, Optional, Tuple
|
||||
|
||||
from numba import njit
|
||||
|
||||
|
@ -14,6 +14,9 @@ class XY(NamedTuple):
|
|||
y: np.ndarray
|
||||
|
||||
|
||||
Correction = List[Tuple[float, float]]
|
||||
|
||||
|
||||
class Analyzer:
|
||||
"""
|
||||
Analyze the system response to a chirp stimulus.
|
||||
|
@ -47,7 +50,9 @@ class Analyzer:
|
|||
time: float
|
||||
|
||||
def __init__(
|
||||
self, f0: int, f1: int, secs: float, rate: int, ampl: float):
|
||||
self, f0: int, f1: int, secs: float, rate: int, ampl: float,
|
||||
calibration: Optional[Correction] = None,
|
||||
target: Optional[Correction] = None):
|
||||
self.chirp = ampl * geom_chirp(f0, f1, secs, rate)
|
||||
self.x = np.concatenate([
|
||||
self.chirp,
|
||||
|
@ -58,9 +63,12 @@ class Analyzer:
|
|||
self.fmin = min(f0, f1)
|
||||
self.fmax = max(f0, f1)
|
||||
self.time = 0
|
||||
self._calibration = calibration
|
||||
self._target = target
|
||||
|
||||
# Cache the methods in a way that allows garbage collection of self.
|
||||
for meth in ['X', 'Y', 'H', 'H2', 'h', 'h_inv', 'spectrum']:
|
||||
for meth in ['X', 'Y', 'H', 'H2', 'h', 'h_inv', 'spectrum',
|
||||
'frequency', 'calibration', 'target']:
|
||||
setattr(self, meth, lru_cache(getattr(self, meth)))
|
||||
|
||||
def findMatch(self, recording: array.array) -> bool:
|
||||
|
@ -84,16 +92,42 @@ class Analyzer:
|
|||
"""See if time to find a match has exceeded the timeout limit."""
|
||||
return self.time > self.secs + self.TIMEOUT_SECS
|
||||
|
||||
def freqRange(self, size: int) -> slice:
|
||||
def frequency(self) -> np.ndarray:
|
||||
return np.linspace(0, self.rate // 2, self.X().size)
|
||||
|
||||
def freqRange(self, size: int = 0) -> slice:
|
||||
"""
|
||||
Return range slice of the valid frequency range for an array
|
||||
of given size.
|
||||
"""
|
||||
size = size or self.X().size
|
||||
nyq = self.rate / 2
|
||||
i0 = int(0.5 + size * self.fmin / nyq)
|
||||
i1 = int(0.5 + size * self.fmax / nyq)
|
||||
return slice(i0, i1 + 1)
|
||||
|
||||
def calibration(self) -> Optional[np.ndarray]:
|
||||
return self.interpolateCorrection(self._calibration)
|
||||
|
||||
def target(self) -> Optional[np.ndarray]:
|
||||
return self.interpolateCorrection(self._target)
|
||||
|
||||
def interpolateCorrection(self, corr: Correction) -> Optional[np.ndarray]:
|
||||
"""
|
||||
Logarithmically interpolate the correction to a full-sized array.
|
||||
"""
|
||||
if not corr:
|
||||
return None
|
||||
corr = sorted(c for c in corr if c[0] > 0)
|
||||
a = np.array(corr, 'd').T
|
||||
logF = np.log(a[0])
|
||||
db = a[1]
|
||||
freq = self.frequency()
|
||||
interp = np.empty_like(freq)
|
||||
interp[1:] = np.interp(np.log(freq[1:]), logF, db)
|
||||
interp[0] = 0
|
||||
return interp
|
||||
|
||||
def X(self) -> np.ndarray:
|
||||
return np.fft.rfft(self.x)
|
||||
|
||||
|
@ -109,14 +143,16 @@ class Analyzer:
|
|||
Y = self.Y()
|
||||
# H = Y / X
|
||||
H = Y * np.conj(X) / (np.abs(X) ** 2 + 1e-3)
|
||||
freq = np.linspace(0, self.rate // 2, H.size)
|
||||
if self._calibration:
|
||||
H *= 10 ** (-self.calibration() / 20)
|
||||
freq = self.frequency()
|
||||
return XY(freq, H)
|
||||
|
||||
def H2(self, smoothing: float):
|
||||
"""Calculate smoothed squared transfer function |H|^2."""
|
||||
freq, H = self.H()
|
||||
H = np.abs(H)
|
||||
r = self.freqRange(H.size)
|
||||
r = self.freqRange()
|
||||
H2 = np.empty_like(H)
|
||||
# Perform smoothing on the squared amplitude.
|
||||
H2[r] = smooth(freq[r], H[r] ** 2, smoothing)
|
||||
|
@ -143,7 +179,7 @@ class Analyzer:
|
|||
If 0 then no smoothing is done.
|
||||
"""
|
||||
freq, H2 = self.H2(smoothing)
|
||||
r = self.freqRange(H2.size)
|
||||
r = self.freqRange()
|
||||
return XY(freq[r], 10 * np.log10(H2[r]))
|
||||
|
||||
def h_inv(
|
||||
|
@ -162,6 +198,9 @@ class Analyzer:
|
|||
smoothing: Strength of frequency-dependent smoothing.
|
||||
"""
|
||||
freq, H2 = self.H2(smoothing)
|
||||
# Apply target curve.
|
||||
if self._target:
|
||||
H2 = H2 * 10 ** (-self.target() / 10)
|
||||
# Re-sample to halve the number of samples needed.
|
||||
n = int(secs * self.rate / 2)
|
||||
H = resample(H2, n) ** 0.5
|
||||
|
@ -205,7 +244,7 @@ class Analyzer:
|
|||
"""
|
||||
freq, H2 = self.H2(0)
|
||||
H = H2 ** 0.5
|
||||
r = self.freqRange(H.size)
|
||||
r = self.freqRange()
|
||||
|
||||
tf = resample(corrFactor.y, H.size)
|
||||
resp = 20 * np.log10(tf[r] * H[r])
|
||||
|
@ -218,6 +257,17 @@ class Analyzer:
|
|||
|
||||
return spectrum, spectrum_resamp
|
||||
|
||||
def targetSpectrum(self, spectrum: XY) -> Optional[XY]:
|
||||
if self._target:
|
||||
freq, resp = spectrum
|
||||
r = self.freqRange()
|
||||
target = self.target()[r]
|
||||
target += np.average(resp - target, weights=1 / freq)
|
||||
targetSpectrum = XY(freq, target)
|
||||
else:
|
||||
targetSpectrum = None
|
||||
return targetSpectrum
|
||||
|
||||
|
||||
@lru_cache
|
||||
def tone(f: float, secs: float, rate: int):
|
||||
|
|
|
@ -34,6 +34,8 @@ class App(qt.QMainWindow):
|
|||
self.paused = False
|
||||
self.analyzer = None
|
||||
self.refAnalyzer = None
|
||||
self.calibration = None
|
||||
self.target = None
|
||||
self.saveDir = Path.home()
|
||||
self.loop = asyncio.get_event_loop_policy().get_event_loop()
|
||||
self.task = self.loop.create_task(wrap_coro(self.analyze()))
|
||||
|
@ -52,7 +54,8 @@ class App(qt.QMainWindow):
|
|||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
analyzer = hifi.Analyzer(lo, hi, secs, audio.rate, ampl)
|
||||
analyzer = hifi.Analyzer(lo, hi, secs, audio.rate, ampl,
|
||||
self.calibration, self.target)
|
||||
audio.play(analyzer.chirp)
|
||||
async for recording in audio.record():
|
||||
if self.paused:
|
||||
|
@ -79,6 +82,11 @@ class App(qt.QMainWindow):
|
|||
if self.analyzer:
|
||||
spectrum = self.analyzer.spectrum(smoothing)
|
||||
self.spectrumPlot.setData(*spectrum)
|
||||
target = self.analyzer.targetSpectrum(spectrum)
|
||||
if target:
|
||||
self.targetPlot.setData(*target)
|
||||
else:
|
||||
self.targetPlot.clear()
|
||||
if self.refAnalyzer:
|
||||
spectrum = self.refAnalyzer.spectrum(smoothing)
|
||||
self.refSpectrumPlot.setData(*spectrum)
|
||||
|
@ -107,6 +115,11 @@ class App(qt.QMainWindow):
|
|||
spectrum, spectrum_resamp = analyzer.correctedSpectrum(corrFactor)
|
||||
self.simPlot.setData(*spectrum)
|
||||
self.avSimPlot.setData(*spectrum_resamp)
|
||||
target = analyzer.targetSpectrum(spectrum)
|
||||
if target:
|
||||
self.targetSimPlot.setData(*target)
|
||||
else:
|
||||
self.targetSimPlot.clear()
|
||||
|
||||
def screenshot(self):
|
||||
timestamp = dt.datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
|
@ -174,11 +187,12 @@ class App(qt.QMainWindow):
|
|||
axes = {ori: Axis(ori) for ori in
|
||||
['bottom', 'left', 'top', 'right']}
|
||||
for ax in axes.values():
|
||||
ax.setGrid(255)
|
||||
ax.setGrid(200)
|
||||
self.spectrumPlotWidget = pw = pg.PlotWidget(axisItems=axes)
|
||||
pw.setLabel('left', 'Relative Power [dB]')
|
||||
pw.setLabel('bottom', 'Frequency [Hz]')
|
||||
pw.setLogMode(x=True)
|
||||
self.targetPlot = pw.plot(pen=(255, 0, 0), stepMode='right')
|
||||
self.refSpectrumPlot = pw.plot(pen=(255, 100, 0), stepMode='right')
|
||||
self.spectrumPlot = pw.plot(pen=(0, 255, 255), stepMode='right')
|
||||
self.spectrumPlot.curve.setCompositionMode(
|
||||
|
@ -250,13 +264,14 @@ class App(qt.QMainWindow):
|
|||
|
||||
axes = {ori: Axis(ori) for ori in ['bottom', 'left']}
|
||||
for ax in axes.values():
|
||||
ax.setGrid(255)
|
||||
ax.setGrid(200)
|
||||
self.simPlotWidget = pw = pg.PlotWidget(axisItems=axes)
|
||||
pw.showGrid(True, True, 0.8)
|
||||
pw.setLabel('left', 'Corrected Spectrum')
|
||||
self.simPlot = pg.PlotDataItem(pen=(150, 100, 60), stepMode='right')
|
||||
pw.addItem(self.simPlot, ignoreBounds=True)
|
||||
self.avSimPlot = pw.plot(pen=(255, 255, 200), stepMode='right')
|
||||
self.targetSimPlot = pw.plot(pen=(255, 0, 0), stepMode='right')
|
||||
pw.setLogMode(x=True)
|
||||
splitter.addWidget(pw)
|
||||
|
||||
|
@ -315,6 +330,53 @@ class App(qt.QMainWindow):
|
|||
spectrumButton.setChecked(True)
|
||||
buttons.idClicked.connect(self.stack.setCurrentIndex)
|
||||
|
||||
def loadCalibration():
|
||||
path, _ = qt.QFileDialog.getOpenFileName(
|
||||
self, 'Load mic calibration', str(self.saveDir))
|
||||
if path:
|
||||
cal = hifi.read_correction(path)
|
||||
if cal:
|
||||
self.calibration = cal
|
||||
calAction.setText(path)
|
||||
self.saveDir = Path(path).parent
|
||||
else:
|
||||
clearCalibration()
|
||||
|
||||
def clearCalibration():
|
||||
self.calibration = None
|
||||
calAction.setText('None')
|
||||
|
||||
def loadTarget():
|
||||
path, _ = qt.QFileDialog.getOpenFileName(
|
||||
self, 'Load target curve', str(self.saveDir))
|
||||
if path:
|
||||
target = hifi.read_correction(path)
|
||||
if target:
|
||||
self.target = target
|
||||
targetAction.setText(path)
|
||||
self.saveDir = Path(path).parent
|
||||
else:
|
||||
clearTarget()
|
||||
|
||||
def clearTarget():
|
||||
self.target = None
|
||||
targetAction.setText('None')
|
||||
|
||||
menuBar = qt.QMenuBar()
|
||||
corr = qt.QMenu('Corrections...')
|
||||
corr.addSection('Mic Calibration')
|
||||
corr.addAction('Load', loadCalibration)
|
||||
corr.addAction('Clear', clearCalibration)
|
||||
corr.addSection('Current:')
|
||||
calAction = corr.addAction('None')
|
||||
corr.addSeparator()
|
||||
corr.addSection('Target Curve')
|
||||
corr.addAction('Load', loadTarget)
|
||||
corr.addAction('Clear', clearTarget)
|
||||
corr.addSection('Current:')
|
||||
targetAction = corr.addAction('None')
|
||||
menuBar.addMenu(corr)
|
||||
|
||||
screenshotButton = qt.QPushButton('Screenshot')
|
||||
screenshotButton.setShortcut('S')
|
||||
screenshotButton.setToolTip('<Key S>')
|
||||
|
@ -333,8 +395,10 @@ class App(qt.QMainWindow):
|
|||
|
||||
hbox = qt.QHBoxLayout()
|
||||
hbox.addWidget(spectrumButton)
|
||||
hbox.addSpacing(32)
|
||||
hbox.addSpacing(16)
|
||||
hbox.addWidget(irButton)
|
||||
hbox.addSpacing(64)
|
||||
hbox.addWidget(menuBar)
|
||||
hbox.addStretch(1)
|
||||
hbox.addWidget(screenshotButton)
|
||||
hbox.addSpacing(32)
|
||||
|
|
|
@ -9,6 +9,8 @@ import eventkit as ev
|
|||
import numpy as np
|
||||
import sounddevice as sd
|
||||
|
||||
from hifiscan.analyzer import Correction
|
||||
|
||||
|
||||
class Audio:
|
||||
"""
|
||||
|
@ -104,3 +106,15 @@ def write_wav(path: str, rate: int, sound: np.ndarray):
|
|||
wav.setsampwidth(4)
|
||||
wav.setframerate(rate)
|
||||
wav.writeframes(stereo.tobytes())
|
||||
|
||||
|
||||
def read_correction(path: str) -> Correction:
|
||||
corr = []
|
||||
with open(path, 'r') as f:
|
||||
for line in f.readlines():
|
||||
try:
|
||||
freq, db = line.split()
|
||||
corr.append((float(freq), float(db)))
|
||||
except ValueError:
|
||||
pass
|
||||
return corr
|
||||
|
|
Ładowanie…
Reference in New Issue