diff --git a/Code_Python/filters.py b/Code_Python/filters.py index 1c0e2e4..d429f8d 100644 --- a/Code_Python/filters.py +++ b/Code_Python/filters.py @@ -22,51 +22,39 @@ na Number of coefficients (denominator) Notes: - the filter is applied starting from index. -- non filtered data are set equal to the original input, i.e. +- non filtered data are set equal to the original input, i.e. Y[0:idx-1,:] = X[0:idx-1,:] -Filters (ref. [1]) ------------------- +Filters: -Generic b, a Generic case -SMA N Simple Moving Average -EMA N Exponential Moving Average -WMA N Weighted moving average -MSMA N Modified Simple Moving Average -MLSQ N Modified Least-Squares Quadratic (N = 5, 7, 9, 11) -ButterOrig P, N Butterworth original (N = 2, 3) -ButterMod P, N Butterworth modified (N = 2, 3) -SuperSmoother P, N Super smoother (N = 2, 3) -GaussLow P, N Gauss, low pass (P > 1) -GaussHigh P, N Gauss, high pass (P > 4) -Decycler P Decycler -DecyclerOsc P1, P2 Decycle oscillator - - - -ZEMA1 N, K, Vn Zero-lag EMA (type 1) -ZEMA2 N, K Zero-lag EMA (type 2) -MEMA N, Ns Modified EMA (with cubic velocity extimation) -PassBand P, delta Pass band filter -StopBand P, delta Stop band filter -InstTrendline alpha Instantaneous trendline -SincFunction N Sinc function (N > 1, cut off at 0.5/N) -Roofing P1, P2 Gauss,HP,2nd,P1 + Supersmoother,P2 +Generic b,a Generic case +SMA N Simple Moving Average +EMA N/alpha Exponential Moving Average +WMA N Weighted moving average +MSMA N Modified Simple Moving Average +MLSQ N Modified Least-Squares Quadratic (N=5,7,9,11) +ButterOrig P,N Butterworth original (N=2,3) +ButterMod P,N Butterworth modified (N=2,3) +SuperSmooth P,N Super smoother (N=2,3) +GaussLow P,N Gauss low pass (P>=2) +GaussHigh P,N Gauss high pass (P>=5) +BandPass P,delta Band-pass filter +BandStop P,delta Band-stop filter +ZEMA1 N/alpha,K,Vn Zero-lag EMA (type 1) +ZEMA2 N/alpha,K Zero-lag EMA (type 2) +InstTrend N/alpha Instantaneous trendline +SincFunction N Sinc function +Decycler P Decycler, 1-GaussHigh (P>=5) +DecyclerOsc P1,P2 Decycle oscillator, GH(P1) - GH(P2), (P1>=5) N Order/smoothing factor/number of previous samples alpha Damping term P, P1, P2 Cut-off/critical period (50% power loss, -3 dB) - - -K Coefficient/gain -Vn Look back bar for the momentum -delta Band centered in P and in percent - (0.3 => 30% of P, = 0.3*P, if P = 10 => 0.3*10 = 3) -nt Times the filter is called (order) -Ns Look back bar/skip factor - - +delta Band centered in P and in fraction + (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) +K Coefficient/gain +Vn Look back sample (for the momentum) """ import sys @@ -88,12 +76,12 @@ def filter_data(X, b, a): tmp = np.zeros(n_series) for j in range(nb): - tmp = tmp + b[j] * X[i-j,:] # Numerator term + tmp = tmp + b[j] * X[i-j, :] # Numerator term for j in range(1, na): tmp = tmp - a[j] * Y[i-j, :] # Denominator term - Y[i,:] = tmp / a[0] + Y[i, :] = tmp / a[0] return Y, idx @@ -129,7 +117,7 @@ class Filter: def EMA(self, N=10, alpha=None): """ Exponential moving average (?? order, IIR, pass ??). - + If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): @@ -142,10 +130,10 @@ class Filter: def WMA(self, N=10): """ Weighted moving average (?? order, FIR, pass ??). - + Example: N = 5 --> [5.0, 4.0, 3.0, 2.0, 1.0] / 15.0 """ - w = np.arange(N,0,-1) + w = np.arange(N, 0, -1) b = w / np.sum(w) a = np.array([1.0]) Y, self.idx = filter_data(self.X, b, a) @@ -154,7 +142,7 @@ class Filter: def MSMA(self, N=10): """ Modified simple moving average (?? order, FIR, pass ??). - + Example: N = 4 --> [0.5, 1.0, 1.0, 1.0, 0.5] / 4.0 """ w = np.ones(N+1) @@ -168,8 +156,8 @@ class Filter: def MLSQ(self, N=5): """ Modified simple moving average (?? order, FIR, pass ??). - - Only N = 5, 7, 9, and 11 are implemented. If not return the unfiltered + + Only N = 5, 7, 9, and 11 are implemented. If not returns the unfiltered dataset. """ if (N == 5): @@ -177,13 +165,14 @@ class Filter: elif (N == 7): b = np.array([1.0, 6.0, 12.0, 14.0, 12.0, 6.0, 1.0]) / 52.0 elif (N == 9): - b = np.array([-1.0, 28.0, 78.0, 108.0, 118.0, 108.0, 78.0, 28.0, \ + b = np.array([-1.0, 28.0, 78.0, 108.0, 118.0, 108.0, 78.0, 28.0, -1.0]) / 544.0 - elif (N == 11): - b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, \ + elif (N == 11): + b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, 138.0, 88.0, 18.0, -11.0]) / 980.0 else: Y = self.X.copy() + self.idx = 0 return Y a = np.array([1.0]) Y, self.idx = filter_data(self.X, b, a) @@ -193,7 +182,7 @@ class Filter: """ Butterworth original version (?? order, IIR, pass ??). - Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) @@ -205,20 +194,20 @@ class Filter: alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) b = np.array([1.0, 3.0, 3.0, 1.0]) \ * (1.0 - alpha + beta ** 2.0) * (1.0 - beta ** 2.0) / 8.0 - a = np.array([1.0, - (alpha + beta ** 2.0), \ + a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: Y = self.X.copy() + self.idx = 0 return Y Y, self.idx = filter_data(self.X, b, a) return Y - def ButterMod(self, N=2, P=10): """ Butterworth modified version (?? order, IIR, pass ??). - Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) @@ -229,20 +218,20 @@ class Filter: beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) b = np.array([1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0]) - a = np.array([1.0, - (alpha + beta ** 2.0), \ + a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: Y = self.X.copy() + self.idx = 0 return Y Y, self.idx = filter_data(self.X, b, a) return Y - - def SuperSmoother(self, N=2, P=10): + def SuperSmooth(self, N=2, P=10): """ - SuperSmoother (?? order, IIR, pass ??). - - Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + SuperSmooth (?? order, IIR, pass ??). + + Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) @@ -255,42 +244,45 @@ class Filter: alpha = 2.0 * beta * np.cos(1.738 * np.pi / P) w = 1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0 b = np.array([w, w]) / 2.0 - a = np.array([1.0, - (alpha + beta ** 2.0), \ + a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: Y = self.X.copy() + self.idx = 0 return Y Y, self.idx = filter_data(self.X, b, a) return Y - def GaussLow(self, P=2, N=1): + def GaussLow(self, N=1, P=2): """ Gauss low pass (IIR, N-th order, low pass). - Must be P > 1. If not return the unfiltered dataset. + Must be P > 1. If not returns the unfiltered dataset. """ if (P < 2): Y = self.X.copy() + self.idx = 0 return Y A = 2.0 ** (1.0 / N) - 1.0 B = 4.0 * np.sin(np.pi / P) ** 2.0 - C = 2.0 * (np.cos(2.0 * np.pi/ P) - 1.0) + C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0) alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) b = np.array([alpha]) a = np.array([1.0, - (1.0 - alpha)]) Y = self.X.copy() for i in range(N): Y, self.idx = filter_data(Y, b, a) - return Y, b, a + return Y - def GaussHigh(self, P=5, N=1): + def GaussHigh(self, N=1, P=5): """ Gauss high pass (IIR, Nth order, high pass). - - Must be P > 4. If not return the unfiltered dataset. + + Must be P > 4. If not returns the unfiltered dataset. """ if (P < 5): Y = self.X.copy() + self.idx = 0 return Y A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) @@ -301,76 +293,38 @@ class Filter: Y = self.X - self.X[0, :] for i in range(N): Y, self.idx = filter_data(Y, b, a) - return Y, b, a - - - - def Decycler(self, P=10): - """ - Decycler (?? order, IIR, pass ??). Gauss,HP,1st,P - """ - pass - - # % Built subtracting high pass Gauss filter from 1 (only order 1) - # case 'Decycler' - # P = param(1); - # [TMP,nLast] = FiltersBasic(IN,'GaussHigh',[P 1]); - # OUT = IN - TMP; - - def DecyclerOsc(self, Pmin=1, Pmax=5): - """ - Decycler (?? order, IIR, pass ??). Gauss,HP,2nd,Pmax - Gauss,HP,2nd,Pmin - """ - pass - - # % (Gauss, HP, 2nd, Pmax - Gauss, HP, 2nd Pmin) - # % Pmax = larger cut off period - # % Pmin = smaller cut off period - # case 'DecyclerOsc' - # P1 = min(param); - # P2 = max(param); - # [TMP1,nLast] = FiltersBasic(IN,'GaussHigh',[P1 2]); - # [TMP2,nLast] = FiltersBasic(IN,'GaussHigh',[P2 2]); - # OUT = TMP2 - TMP1; - - - - - def InstTrend(self, alpha=0.5): - """ - Instantaneous Trendline (2nd order, IIR, low pass, Ehlers.). - """ - b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, - - alpha + 3.0 * alpha ** 2.0 / 4.0]) - a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) - Y, self.idx = filter_data(self.X, b, a) return Y - def PassBand(self, P=5, delta=0.3): + def BandPass(self, P=5, delta=0.3): """ - Pass Band (type ???). + Band-pass (type, order, IIR). + + Example: delta = 0.3, P = 12 + (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) """ beta = np.cos(2.0 * np.pi / P) - gamma = np.cos(4.0 * np.pi * delta) / P + gamma = np.cos(4.0 * np.pi * delta / P) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) a = np.array([1.0, - beta * (1.0 + alpha), alpha]) Y, self.idx = filter_data(self.X, b, a) return Y - def StopBand(self, P=5, delta=0.3): + def BandStop(self, P=5, delta=0.3): """ - Stop Band - """ - beta = cos(2.0*pi/float(P)) - gamma = cos(2.0*pi*(2.0*delta)/float(P)) - alpha = 1.0/gamma - sqrt(1.0/gamma**2 - 1.0) - b = np.array([(1.0+alpha)/2.0, -2.0*beta*(1.0+alpha)/2.0, - (1.0+alpha)/2.0]) - a = np.array([1.0, -beta*(1.0+alpha), alpha]) - Y, self.idx = Generalized(self.X, b, a) - return Y + band-stop (type, order, IIR) + Example: delta = 0.3, P = 12 + (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) + """ + beta = np.cos(2.0 * np.pi / P) + gamma = np.cos(4.0 * np.pi * delta / P) + alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) + b = np.array([(1.0 + alpha) / 2.0, - beta * (1.0 + alpha), + (1.0 + alpha) / 2.0]) + a = np.array([1.0, -beta * (1.0 + alpha), alpha]) + Y, self.idx = filter_data(self.X, b, a) + return Y def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): """ @@ -380,55 +334,81 @@ class Filter: """ if (alpha is None): alpha = 2.0 / (N + 1.0) - alpha = 2.0 / (float(N) + 1.0) b = np.zeros(Vn+1) b[0] = alpha * (1.0 + K) - b[-1] = - alpha * K - a = np.array([1.0, -(1.0-alpha)]) - Y, self.idx = Generalized(self.X, b, a) + b[Vn] = - alpha * K + a = np.array([1.0, - (1.0 - alpha)]) + Y, self.idx = filter_data(self.X, b, a) return Y + def ZEMA2(self, N=10, alpha=None, K=1.0): + """ + Zero lag Exponential Moving Average (type 2). + If not given, is determined as equivalent to a N-SMA. + """ + if (alpha is None): + alpha = 2.0 / (N + 1.0) + b = np.array([alpha * (1.0 + K)]) + a = np.array([1.0, alpha * K - (1.0 - alpha)]) + Y, self.idx = filter_data(self.X, b, a) + return Y -# % Zero lag Exponential Moving Average (type 2) -# % alpha is determined as equivalent to a N-SMA -# case 'ZEMA2' -# N = param(1); -# alpha = 2/(N+1); -# K = param(2); -# b = alpha*(1+K); -# a = [1 alpha*K-(1-alpha)]; -# [OUT,nLast] = GeneralizedFilter(IN,b,a,3); - + def InstTrend(self, N=10, alpha=None): + """ + Instantaneous Trendline (2nd order, IIR, low pass). - - - -# % Sinc function -# case 'SincFunction' -# N = param(1); -# nel = 50; -# k=1:nel-1; -# b = [ 1/N sin(pi*k/N)./(pi*k) ]; -# a = 1; -# [OUT,nLast] = GeneralizedFilter(IN,b,a,1); - - # % Roofing filter: Gauss high pass 2nd order filter + SuperSmoother - # % P1 = cut off period for GaussHigh - # % P2 = cut off period for SuperSmoother - # case 'Roofing' - # P1 = param(1); - # P2 = param(2); - # [TMP,nLast] = FiltersBasic(IN,'GaussHigh',[P1 2]); - # [OUT,nLast] = FiltersBasic(TMP,'SuperSmoother',[P2 1]); + If not given, is determined as equivalent to a N-SMA. + """ + if (alpha is None): + alpha = 2.0 / (N + 1.0) + b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, + - alpha + 3.0 * alpha ** 2.0 / 4.0]) + a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) + Y, self.idx = filter_data(self.X, b, a) + return Y + def SincFunction(self, N=10, nel=10): + """ + Sinc function (order, FIR, pass). - # % MEMA - Uses cubic velocity extimation - # case 'MEMA' - # N = param(1); - # Ns = param(2); - # alpha = 2/(N+1); - # [Vel,Acc,nLast] = FiltersMak(IN,'Cubic',Ns); - # for i = nLast:-1:1 - # OUT(i) = alpha*(IN(i)+Vel(i)/Ns) + (1-alpha)*OUT(i+1); - # end + (N > 1, cut off at 0.5/N) + """ + b = np.zeros(nel) + b[0] = 1.0 / N + k = np.arange(1, nel) + b[1:] = np.sin(np.pi * k / N) / (np.pi * k) + a = np.array([1.0]) + Y, self.idx = filter_data(self.X, b, a) + return Y, b, a + + def Decycler(self, P=10): + """ + Decycler (?? order, IIR, pass ??). Gauss,HP,1st,P + + Built subtracting high pass Gauss filter from 1 (order 1) + Must be P > 4. If not returns the unfiltered dataset. + """ + if (P < 5): + Y = self.X.copy() + self.idx = 0 + return Y + Y = self.X - self.GaussHigh(N=1, P=P) + return Y + + def DecyclerOsc(self, P1=5, P2=10): + """ + DecyclerOsc (?? order 2, IIR, pass ??). + + (Gauss, HP, 2nd order, Pmax - Gauss, HP, 2nd order, Pmin) + P1 = 1st cut off period, P2 = 2nd cut off period. Automatically fixed. + Must be P1, P2 > 4. If not returns the unfiltered dataset. + """ + P_low = np.amin([P1, P2]) + P_high = np.amax([P1, P2]) + if (P1 < 5): + Y = self.X.copy() + self.idx = 0 + return Y + Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high) + return Y diff --git a/Code_Python/test.py b/Code_Python/test.py index 2223f87..a0ae1e5 100644 --- a/Code_Python/test.py +++ b/Code_Python/test.py @@ -7,11 +7,13 @@ ToDo: - use NaN/input values for points not filtered? - return idx? - util to test filter (impulse, utils) -- warning in filter when wrong order? +- warning in filter when wrong order? or save flag with true/false if computed - use self.a and self.b - remove a and b from plots - in comments write what filters do - is necessary to copy X for Y untouched? +- decide default values in functions +- check conditions on P and N """ import sys @@ -40,9 +42,15 @@ spx = flt.Filter(data) # aa = np.array([1.0, alpha - 1.0]) -res, bb, aa = spx.GaussHigh(N=3, P=10) +res, bb, aa = spx.SincFunction(2, 50) print(bb) print(aa) - utl.plot_frequency_response(bb, aa) utl.plot_lag_response(bb, aa) + +# res = spx.DecyclerOsc(30, 60) +# print(res[0:10, :]) +signals = (spx.X, res) +print(spx.idx) +utl.plot_signals(signals) +# print(spx.X[0:20]) \ No newline at end of file diff --git a/Code_Python/utils.py b/Code_Python/utils.py index 9e2235c..8a47d0d 100644 --- a/Code_Python/utils.py +++ b/Code_Python/utils.py @@ -117,6 +117,7 @@ def plot_signals(signals): plt.plot(signal) plt.grid(b=True) + plt.xlim(0, 100) plt.show() @@ -134,6 +135,7 @@ def plot_frequency_response(b, a=1.0): plt.axhline(-3.0, lw=1.5, ls='--', C='r') plt.grid(b=True) plt.xlim(np.amin(wf), np.amax(wf)) + # plt.ylim(-40.0, 0.0) plt.xlabel('$\omega$ [rad/sample]') plt.ylabel('$h$ [db]') plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \