
"""
Some general signal processing tools (FFT, correlation). Mostly textbook and
reference implementations plus some utilities.
"""

from itertools import product
import numpy as np
from scipy.fftpack import fft, ifft
from scipy.signal import fftconvolve, gaussian, kaiserord, firwin, lfilter, freqz
from scipy.integrate import trapz
from pwtools import _flib, num


def fftsample(a, b, mode='f', mirr=False):
    """Convert size and resolution between frequency and time domain.

    Convert between maximal frequency to sample (fmax) + desired frequency
    resolution (df) and the needed number of sample points (N) + time step
    (dt). The maximal frequency is also called the Nyquist frequency and is
    1/2*samplerate.

    Parameters
    ----------
    a, b : float
        | mode='f': a=fmax, b=df
        | mode='t': a=dt, b=N
    mode : string, {'f', 't'}
        | f : frequency mode
        | t : time mode
    mirr : bool
        consider mirroring of the signal at t=0 before Fourier transform

    Returns
    -------
    mode='f': array([dt, N])
    mode='t': array([fmax, df])

    Examples
    --------
    >>> # fmax = 100 Hz, df = 1 Hz -> you need 200 steps with dt=0.005 sec
    >>> fftsample(100, 1, mode='f')
    array([  5.00000000e-03,   2.00000000e+02])
    >>> fftsample(5e-3, 2e2, mode='t')
    array([ 100.,    1.])
    >>> # if you mirror, you only need 100 steps
    >>> fftsample(100, 1, mode='f', mirr=True)
    array([  5.00000000e-03,   1.00000000e+02])

    Notes
    -----
    These relations hold:

        =========== ===========
        size        resolution
        =========== ===========
        N [t] up    df [f] down
        fmax [f] up dt [t] down
        =========== ===========

    If you know that the signal in the time domain will be mirrored before FFT
    (N -> 2*N), you will get 1/2*df (twice as fine a resolution), so 1/2*N is
    sufficient to get the desired df.

    Units: in general frequency_unit = 1/time_unit, need not be Hz and s.
    """
    if mode == 'f':
        fmax, df = a, b
        if mirr:
            df *= 2
        dt = 0.5/fmax
        N = 1.0/(df*dt)
        return np.array([dt, N])
    elif mode == 't':
        dt, N = a, b
        if mirr:
            N *= 2
        fmax = 0.5/dt
        df = 1.0/(N*dt)
        return np.array([fmax, df])
    else:
        raise ValueError("illegal mode, allowed: t, f")
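
# Added usage sketch (`_demo_fftsample` is an illustrative helper, not part of
# the original API): the two modes of fftsample() are inverses of each other,
# and mirroring halves the needed number of steps.
def _demo_fftsample():
    dt, N = fftsample(100.0, 1.0, mode='f')
    assert np.allclose([dt, N], [5e-3, 200.0])
    # the inverse conversion recovers fmax and df
    fmax, df = fftsample(dt, N, mode='t')
    assert np.allclose([fmax, df], [100.0, 1.0])
    # mirroring the signal before FFT halves the needed N
    assert fftsample(100.0, 1.0, mode='f', mirr=True)[1] == 100.0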
def dft(a, method='loop'):
    """Simple straightforward complex DFT algo.

    Parameters
    ----------
    a : numpy 1d array
    method : string, {'matmul', 'loop'}

    Returns
    -------
    (len(a),) array

    Examples
    --------
    >>> from scipy.fftpack import fft
    >>> a=np.random.rand(100)
    >>> sfft=fft(a)
    >>> dfft1=dft(a, method='loop')
    >>> dfft2=dft(a, method='matmul')
    >>> np.testing.assert_array_almost_equal(sfft, dfft1)
    >>> np.testing.assert_array_almost_equal(sfft, dfft2)

    Notes
    -----
    This is only a reference implementation and has its limitations.
        | 'loop': runs looong
        | 'matmul': memory limit
        | => use only with medium size arrays

    N = len(a)
    sqrt(complex(-1)) = np.sqrt(-1 + 0*j) = 1j

    Forward DFT, see [2]_ and [3]_, scipy.fftpack.fft()::

        y[k] = sum(n=0...N-1) a[n] * exp(-2*pi*n*k*j/N)
        k = 0 ... N-1

    Backward DFT, see [1]_ eq. 12.1.6, 12.2.2::

        y[k] = sum(n=0...N-1) a[n] * exp(2*pi*n*k*j/N)
        k = 0 ... N-1

    The algo for method=='matmul' is the matrix mult from [1]_, but as Forward
    DFT for comparison with scipy. The difference between FW and BW DFT is
    that the imaginary parts are mirrored at y=0.

    References
    ----------
    .. [1] Numerical Recipes in Fortran, Second Edition, 1992
    .. [2] http://www.fftw.org/doc/The-1d-Real_002ddata-DFT.html
    .. [3] http://mathworld.wolfram.com/FourierTransform.html
    """
    pi = np.pi
    N = a.shape[0]
    # n and k run from 0 ... N-1
    nk = np.linspace(0.0, float(N), endpoint=False, num=N)
    if method == 'loop':
        fta = np.empty((N,), dtype=complex)
        for ik,k in enumerate(nk):
            fta[ik] = np.sum(a*np.exp(-2.0*pi*1.0j*k*nk/float(N)))
    elif method == 'matmul':
        # `mat` is the matrix with elements W**(n*k) in [1], eq. 12.2.2
        nkmat = nk*nk[:,np.newaxis]
        mat = np.exp(-2.0*pi*1.0j*nkmat/float(N))
        fta = np.dot(mat, a)
    else:
        raise ValueError("illegal method '%s'" %method)
    return fta
def ezfft(y, dt=1.0):
    """Simple FFT function for interactive use. Returns only the first half
    of the spectrum (positive frequencies).

    Parameters
    ----------
    y : 1d array to fft
    dt : float
        time step

    Returns
    -------
    faxis, fft(y)

    Examples
    --------
    >>> t = linspace(0,1,200)
    >>> x = sin(2*pi*10*t) + sin(2*pi*20*t)
    >>> f,d = signal.ezfft(x, dt=t[1]-t[0])
    >>> plot(f,abs(d))
    """
    assert y.ndim == 1
    faxis = np.fft.fftfreq(len(y), dt)
    split_idx = len(faxis)//2
    return faxis[:split_idx], fft(y)[:split_idx]
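
# Added sketch (`_demo_ezfft` is a hypothetical helper): the spectrum returned
# by ezfft() peaks at the frequencies present in the signal, here 10 and 20 Hz,
# without needing any plotting.
def _demo_ezfft():
    t = np.linspace(0, 1, 200)
    x = np.sin(2*np.pi*10*t) + np.sin(2*np.pi*20*t)
    f, d = ezfft(x, dt=t[1]-t[0])
    # the two strongest spectral bins lie near 10 and 20 Hz
    peaks = f[np.argsort(np.abs(d))[-2:]]
    assert set(np.round(peaks)) == {10.0, 20.0}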
def fft_1d_loop(arr, axis=-1):
    """Like scipy.fftpack.fft and numpy.fft.fft, perform FFT along an axis.
    Here we do this by looping over the remaining axes and performing 1D FFTs.
    This was implemented as a low-memory version like
    :func:`~pwtools.crys.smooth` to be used in :func:`~pwtools.pydos.pdos`,
    which fills up the memory for big MD data. But actually it has the same
    memory footprint as the plain scipy fft routine. Keep it here anyway as a
    nice reference for how to loop over remaining axes in the ndarray case.
    """
    if axis < 0:
        axis = arr.ndim - 1
    axes = [ax for ax in range(arr.ndim) if ax != axis]
    # tuple here is 3x faster than generator expression
    #   idxs = (range(arr.shape[ax]) for ax in axes)
    idxs = tuple(range(arr.shape[ax]) for ax in axes)
    out = np.empty(arr.shape, dtype=complex)
    for idx_tup in product(*idxs):
        sl = [slice(None)] * arr.ndim
        for idx,ax in zip(idx_tup, axes):
            sl[ax] = idx
        tsl = tuple(sl)
        out[tsl] = fft(arr[tsl])
    return out
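
# Added consistency check (`_demo_fft_1d_loop` is a hypothetical helper):
# looping over the remaining axes must reproduce numpy's FFT along any axis.
def _demo_fft_1d_loop():
    arr = np.random.rand(4, 5, 6)
    for ax in [0, 1, 2, -1]:
        np.testing.assert_array_almost_equal(fft_1d_loop(arr, axis=ax),
                                             np.fft.fft(arr, axis=ax))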
def pad_zeros(arr, axis=0, where='end', nadd=None, upto=None, tonext=None,
              tonext_min=None):
    """Pad an nd-array with zeros. Default is to append an array of zeros of
    the same shape as `arr` to arr's end along `axis`.

    Parameters
    ----------
    arr : nd array
    axis : the axis along which to pad
    where : string {'end', 'start'}, pad at the end ("append to array") or
        start ("prepend to array") of `axis`
    nadd : number of items to pad (i.e. nadd=3 means pad w/ 3 zeros in case
        of a 1d array)
    upto : pad until arr.shape[axis] == upto
    tonext : bool, pad up to the next power of two (pad so that the padded
        array has a length of power of two)
    tonext_min : int, when using `tonext`, pad the array to the next possible
        power of two for which the resulting array length along `axis` is at
        least `tonext_min`; the default is tonext_min = arr.shape[axis]

    Use only one of nadd, upto, tonext.

    Returns
    -------
    padded array

    Examples
    --------
    >>> # 1d
    >>> a = array([1,2,3])
    >>> pad_zeros(a)
    array([1, 2, 3, 0, 0, 0])
    >>> pad_zeros(a, nadd=3)
    array([1, 2, 3, 0, 0, 0])
    >>> pad_zeros(a, upto=6)
    array([1, 2, 3, 0, 0, 0])
    >>> pad_zeros(a, nadd=1)
    array([1, 2, 3, 0])
    >>> pad_zeros(a, nadd=1, where='start')
    array([0, 1, 2, 3])
    >>> # 2d
    >>> a=arange(9).reshape(3,3)
    >>> pad_zeros(a, nadd=1, axis=0)
    array([[0, 1, 2],
           [3, 4, 5],
           [6, 7, 8],
           [0, 0, 0]])
    >>> pad_zeros(a, nadd=1, axis=1)
    array([[0, 1, 2, 0],
           [3, 4, 5, 0],
           [6, 7, 8, 0]])
    >>> # up to next power of two
    >>> 2**arange(10)
    array([  1,   2,   4,   8,  16,  32,  64, 128, 256, 512])
    >>> pad_zeros(arange(9), tonext=True).shape
    (16,)
    """
    if tonext == False:
        tonext = None
    lst = [nadd, upto, tonext]
    assert lst.count(None) in [2,3], "`nadd`, `upto` and `tonext` must be " +\
           "all None or only one of them not None"
    if nadd is None:
        if upto is None:
            if (tonext is None) or (not tonext):
                # default
                nadd = arr.shape[axis]
            else:
                tonext_min = arr.shape[axis] if (tonext_min is None) \
                             else tonext_min
                # beware of int overflows starting w/ 2**arange(64), but we
                # will never have such long arrays anyway
                two_powers = 2**np.arange(30)
                assert tonext_min <= two_powers[-1], ("tonext_min exceeds "
                    "max power of 2")
                power = two_powers[np.searchsorted(two_powers, tonext_min)]
                nadd = power - arr.shape[axis]
        else:
            nadd = upto - arr.shape[axis]
    if nadd == 0:
        return arr
    add_shape = list(arr.shape)
    add_shape[axis] = nadd
    add_shape = tuple(add_shape)
    if where == 'end':
        return np.concatenate((arr, np.zeros(add_shape, dtype=arr.dtype)),
                              axis=axis)
    elif where == 'start':
        return np.concatenate((np.zeros(add_shape, dtype=arr.dtype), arr),
                              axis=axis)
    else:
        raise Exception("illegal `where` arg: %s" %where)
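
# Added illustration (`_demo_pad_zeros_tonext` is a hypothetical helper):
# `tonext` pads to the next power of two, `tonext_min` to the next power of
# two of at least that length -- useful before zero-padded FFTs.
def _demo_pad_zeros_tonext():
    a = np.arange(9)
    assert pad_zeros(a, tonext=True).shape == (16,)
    # request at least 20 points -> next power of two is 32
    assert pad_zeros(a, tonext=True, tonext_min=20).shape == (32,)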
def welch(M, sym=1):
    """Welch window. Function skeleton shamelessly stolen from
    scipy.signal.bartlett() and others."""
    if M < 1:
        return np.array([])
    if M == 1:
        return np.ones(1, dtype=float)
    odd = M % 2
    if not sym and not odd:
        M = M+1
    n = np.arange(0,M)
    w = 1.0-((n-0.5*(M-1))/(0.5*(M-1)))**2.0
    if not sym and not odd:
        w = w[:-1]
    return w
def lorentz(M, std=1.0, sym=True):
    r"""Lorentz window (same as Cauchy function). Function skeleton stolen
    from scipy.signal.gaussian().

    The Lorentz function is

    .. math::

        L(x) = \frac{\Gamma}{(x-x_0)^2 + \Gamma^2}

    Here :math:`x_0 = 0` and `std` = :math:`\Gamma`. Some definitions use
    :math:`1/2\,\Gamma` instead of :math:`\Gamma`, but without the factor 1/2
    the peak width is comparable to that of a Gaussian when using this window
    in convolutions, thus ``scipy.signal.gaussian(M, std=5)`` is similar to
    ``lorentz(M, std=5)``.

    Parameters
    ----------
    M : int
        number of points
    std : float
        spread parameter :math:`\Gamma`
    sym : bool

    Returns
    -------
    w : (M,)
    """
    if M < 1:
        return np.array([])
    if M == 1:
        return np.ones(1, dtype=float)
    odd = M % 2
    if not sym and not odd:
        M = M+1
    n = np.arange(0, M) - (M - 1.0) / 2.0
    w = std / (n**2.0 + std**2.0)
    w /= w.max()
    if not sym and not odd:
        w = w[:-1]
    return w
cauchy = lorentz
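
# Added check (`_demo_lorentz_hwhm` is a hypothetical helper): with the
# definition above (no factor 1/2), the normalized window equals 0.5 exactly
# at |x| = std, i.e. `std` is the half width at half maximum. This is what
# makes lorentz(M, std) comparable to a Gaussian window of similar std.
def _demo_lorentz_hwhm(M=101, std=5.0):
    w = lorentz(M, std=std)
    n = np.arange(M) - (M - 1) / 2.0
    # peak is normalized to 1 at n=0; half maximum at n = +/- std
    assert np.isclose(w[np.argmin(np.abs(n - std))], 0.5)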
def mirror(arr, axis=0):
    """Mirror array `arr` at index 0 along `axis`. The length of the returned
    array is 2*arr.shape[axis]-1 ."""
    # use slicetake such that mirroring really happens along `axis`, not
    # always along axis 0
    rev = num.slicetake(arr, sl=slice(None, None, -1), axis=axis)
    rest = num.slicetake(arr, sl=slice(1, None), axis=axis)
    return np.concatenate((rev, rest), axis=axis)
# XXX # Check f2py wrapper of _flib.acorr(): The signature is: # In [13]: num._flib.acorr? # Type: fortran # String form: <fortran object> # Docstring: # c = acorr(v,c,method,norm,[nstep]) # # Wrapper for ``acorr``. # # Parameters # ---------- # v : input rank-1 array('d') with bounds (nstep) # c : input rank-1 array('d') with bounds (nstep) # method : input int # norm : input int # # Other Parameters # ---------------- # nstep : input int, optional # Default: len(v) # # Returns # ------- # c : rank-1 array('d') with bounds (nstep) # # We need to pass in a result array 'c' which gets overwritten, but this also # gets returned. Check f2py docs for wrapping such that c generated on the # Fortran side. #
def acorr(v, method=7, norm=True):
    """(Normalized) autocorrelation function (ACF) for 1d arrays.

    Without normalization
        c(t) = <v(0) v(t)>
    and with
        c(t) = <v(0) v(t)> / <v(0)**2>

    The x-axis is the offset "t" (or "lag" in Digital Signal Processing lit.).
    Since the ACF is symmetric around t=0, we return only t=0...len(v)-1 .

    Several Python and Fortran implementations. The Python versions are mostly
    for reference and are slow, except for the FFT-based one, which is by far
    the fastest.

    Parameters
    ----------
    v : 1d array
    method : int
        | 1: Python loops
        | 2: Python loops, zero-padded
        | 3: method 1, numpy vectorized
        | 4: uses numpy.correlate()
        | 5: Fortran version of 1
        | 6: Fortran version of 3
        | 7: fft, Wiener-Khinchin Theorem
    norm : bool
        normalize or not

    Returns
    -------
    c : numpy 1d array
        | c[0]  <=> lag = 0
        | c[-1] <=> lag = len(v)-1

    Notes
    -----
    Generalization of this function to correlation corr(v,w) should be
    straightforward. Autocorrelation is then corr(v,v).

    speed:
        Methods 1 ... 7 are loosely ordered slow ... fast.
    methods:
        All methods, besides the FFT, are "exact"; they use variations of
        loops in the time domain, i.e. norm(acorr(v,1) - acorr(v,6)) = 0.0.
        The FFT method introduces small numerical noise,
        norm(acorr(v,1) - acorr(v,7)) = O(1e-16) or so.

    signature of the Fortran extension _flib.acorr::

        acorr - Function signature:
          c = acorr(v,c,method,norm,[nstep])
        Required arguments:
          v : input rank-1 array('d') with bounds (nstep)
          c : input rank-1 array('d') with bounds (nstep)
          method : input int
          norm : input int
        Optional arguments:
          nstep := len(v) input int
        Return objects:
          c : rank-1 array('d') with bounds (nstep)

    References
    ----------
    .. [1] Numerical Recipes in Fortran, 2nd ed., ch. 13.2
    .. [2] http://mathworld.wolfram.com/FourierTransform.html
    .. [3] http://mathworld.wolfram.com/Cross-CorrelationTheorem.html
    .. [4] http://mathworld.wolfram.com/Wiener-KhinchinTheorem.html
    .. [5] http://mathworld.wolfram.com/Autocorrelation.html
    """
    nstep = v.shape[0]
    c = np.zeros((nstep,), dtype=float)
    _norm = 1 if norm else 0
    if method == 1:
        for t in range(nstep):
            for j in range(nstep-t):
                c[t] += v[j]*v[j+t]
    elif method == 2:
        vv = np.concatenate((v, np.zeros((nstep,),dtype=float)))
        for t in range(nstep):
            for j in range(nstep):
                c[t] += v[j]*vv[j+t]
    elif method == 3:
        for t in range(nstep):
            c[t] = (v[:(nstep-t)] * v[t:]).sum()
    elif method == 4:
        c = np.correlate(v, v, mode='full')[nstep-1:]
    elif method == 5:
        return _flib.acorr(v, c, 1, _norm)
    elif method == 6:
        return _flib.acorr(v, c, 2, _norm)
    elif method == 7:
        # Correlation via fft. After ifft, the imaginary part is (in theory)
        # = 0, in practice < 1e-16, so we are safe to return the real part
        # only.
        vv = np.concatenate((v, np.zeros((nstep,),dtype=float)))
        c = ifft(np.abs(fft(vv))**2.0)[:nstep].real
    else:
        raise ValueError('unknown method: %s' %method)
    if norm:
        return c / c[0]
    else:
        return c
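
# Added consistency check (`_demo_acorr` is a hypothetical helper): the exact
# vectorized time-domain method (3) and the FFT-based Wiener-Khinchin method
# (7) agree up to FFT round-off noise. Uses only the pure-Python code paths,
# so it runs without the Fortran extension.
def _demo_acorr():
    v = np.random.rand(200)
    np.testing.assert_array_almost_equal(acorr(v, method=3),
                                         acorr(v, method=7))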
def gauss(x, std=1.0, norm=False):
    """Gaussian function.

    Parameters
    ----------
    x : 1d array
    std : float
        sigma
    norm : bool
        Norm such that integrate(gauss(x),x=-inf,inf) = 1, i.e. normalize and
        return a PDF.

    Returns
    -------
    array_like(x)
    """
    if norm:
        return 1.0 / std / np.sqrt(2*np.pi) * np.exp(-x**2.0 / 2.0 / std**2.0)
    else:
        return np.exp(-x**2.0 / 2.0 / std**2.0)
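
# Added check (`_demo_gauss_norm` is a hypothetical helper): with norm=True,
# gauss() is a PDF; the trapz imported above integrates it to ~1 over a
# sufficiently wide range.
def _demo_gauss_norm():
    x = np.linspace(-8, 8, 1001)
    assert np.isclose(trapz(gauss(x, std=1.0, norm=True), x), 1.0)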
def find_peaks(y, x=None, k=3, spread=2, ymin=None):
    """Simple peak finding algorithm. Find all peaks where ``y > ymin``. If
    `x` given, also extract peak maxima positions by fitting a spline of
    order `k` to each found peak. To find minima, just use ``-y``.

    Parameters
    ----------
    y : 1d array_like
        data with peaks
    x : 1d array_like, optional, len(y)
        x axis
    k : int
        order of spline
    spread : int
        Use ``2*spread+1`` points around each peak to fit a spline. Note that
        we need ``2*spread+1 > k``.
    ymin : float, optional
        Find all peaks above that value.

    Returns
    -------
    idx0, pos0
    idx0 : indices of peaks from finite diffs, each peak is at ``x[idx0[i]]``
    pos0 : refined `x`-positions of peaks if `x` given, else None

    Examples
    --------
    >>> from pwtools.signal import gauss, find_peaks
    >>> from pwtools import num
    >>> x=linspace(0,10,300); y=0.2*gauss(x-0.5,.1) + gauss(x-2,.1) + \
    ...     0.7*gauss(x-3,0.1) + gauss(x-6,1)
    >>> # ymin=0.4: ignore first peak at x=0.5
    >>> find_peaks(y,x, ymin=0.4)
    ([60, 90, 179], [2.000231296097065, 3.0007122565950572, 5.999998055132549])
    >>> idx0, pos0=find_peaks(y,x, ymin=0.4)
    >>> spl=num.Spline(x,y)
    >>> plot(x,y)
    >>> for x0 in pos0:
    ...     plot([x0], [spl(x0)], 'ro')
    """
    ymin = y.min() if ymin is None else ymin
    idx0 = []
    dfy = np.diff(y, n=1)
    for ii in range(len(dfy)-1):
        if dfy[ii] > 0 and dfy[ii+1] < 0 and y[ii] >= ymin:
            idx0.append(ii+1)
    pos0 = None
    if x is not None:
        pos0 = []
        for i0 in idx0:
            sl = slice(i0-spread, i0+1+spread, None)
            xx = x[sl]
            yy = y[sl]
            spl = num.Spline(xx, -yy, k=k, s=None)
            try:
                root = spl.get_min()
                pos0.append(root)
            except ValueError:
                raise ValueError("error calculating spline maximum at "
                                 "idx=%i, x=%f" %(i0, x[i0]))
    return idx0, pos0
def smooth(data, kern, axis=0, edge='m', norm=True):
    """Smooth N-dim `data` by convolution with a kernel `kern`.

    Uses scipy.signal.fftconvolve().

    Note that due to edge effect handling (padding) and kernel normalization,
    the convolution identity convolve(data,kern) == convolve(kern,data)
    doesn't apply here. We always return an array of ``data.shape``.

    Parameters
    ----------
    data : nd array
        The data to smooth. Example: 1d (N,) or (N,K,3) for trajectory
    kern : nd array
        Convolution kernel. Example: 1d (M,) or (M,1,1) for trajectory
        along axis=0 (data length N)
    axis : int
        Axis along which to do the smoothing. That is actually not needed for
        the convolution ``fftconvolve(data, kern)`` but is used for padding
        the data along `axis` to handle edge effects before convolution.
    edge : str
        Method for edge effect handling.
            | 'm' : pad with mirror signal
            | 'c' : pad with constant values (i.e. ``data[0]`` and
            |       ``data[-1]`` in the 1d case)
    norm : bool
        Normalize kernel. Default is True. This assures that the smoothed
        signal lies within the data. Note that this is not true for kernels
        with very big spread (i.e. ``hann(N*10)`` or ``gaussian(N/2,
        std=N*10)``). Then the kernel is effectively a constant.

    Returns
    -------
    ret : data.shape
        Convolved signal.

    Examples
    --------
    >>> from pwtools.signal import welch
    >>> from numpy.random import rand
    >>> x = linspace(0,2*pi,500); a=cos(x)+rand(500)
    >>> plot(a, color='0.7')
    >>> k=scipy.signal.hann(21)
    >>> plot(signal.smooth(a,k), 'r', label='hann')
    >>> k=scipy.signal.gaussian(21, 3)
    >>> plot(signal.smooth(a,k), 'g', label='gauss')
    >>> k=welch(21)
    >>> plot(signal.smooth(a,k), 'y', label='welch')
    >>> legend()
    >>> # odd kernel [0,1,0] reproduces data exactly, i.e. convolution with
    >>> # delta peak
    >>> figure(); title('smooth with delta [0,1,0]')
    >>> x=linspace(0,2*pi,15); k=scipy.signal.hann(3)
    >>> plot(cos(x))
    >>> plot(signal.smooth(cos(x),k), 'r')
    >>> legend()
    >>> # edge effects with normal convolution
    >>> figure(); title('edge effects')
    >>> x=rand(20)+10; k=scipy.signal.hann(11);
    >>> plot(x); plot(signal.smooth(x,k),label="smooth");
    >>> plot(scipy.signal.convolve(x,k/k.sum(),'same'), label='convolve')
    >>> legend()
    >>> # edge effect methods
    >>> figure(); title('edge effect methods')
    >>> x=rand(20)+10; k=scipy.signal.hann(20);
    >>> plot(x); plot(signal.smooth(x,k,edge='m'),label="edge='m'");
    >>> plot(signal.smooth(x,k,edge='c'),label="edge='c'");
    >>> legend()
    >>> # smooth a trajectory of atomic coordinates
    >>> figure(); title('trajectory')
    >>> x = linspace(0,2*pi,500)
    >>> a = rand(500,2,3) # (nstep, natoms, 3)
    >>> a[:,0,:] += cos(x)[:,None]
    >>> a[:,1,:] += sin(x)[:,None]
    >>> k=scipy.signal.hann(21)[:,None,None]
    >>> y = signal.smooth(a,k)
    >>> plot(a[:,0,0], color='0.7'); plot(y[:,0,0],'b', label='atom1 x')
    >>> plot(a[:,1,0], color='0.7'); plot(y[:,1,0],'r', label='atom2 x')
    >>> legend()

    References
    ----------
    [1] http://wiki.scipy.org/Cookbook/SignalSmooth

    See Also
    --------
    :func:`welch`
    :func:`lorentz`

    Notes
    -----
    Kernels:

    Even kernels result in shifted signals, odd kernels are better. However,
    for N >> M, it doesn't make a difference really.

    Usual kernels (window functions) are created by e.g.
    ``scipy.signal.hann(M)``. For ``kern=scipy.signal.gaussian(M, std)``, two
    values are needed, namely `M` and `std`, where `M` determines the number
    of points calculated for the convolution kernel, as in the other cases.
    But what is actually important is `std`, which determines the "used
    width" of the gaussian. Say we use N=100 and M=50. That would be a
    massively wide window and we would smooth away all details. OTOH, using
    ``gaussian(50,3)`` would generate a kernel with the same number `M` of
    data points, but the gauss peak which is effectively used for convolution
    is much smaller. For ``gaussian()``, `M` should be bigger than `std`. The
    convolved signal will converge with increasing `M`. Good values are
    `M=6*std` and bigger. For :func:`lorentz`, much wider kernels are needed
    such as `M=100*std` b/c of the long tails of the Lorentz function.
    Testing is mandatory!

    Edge effects:

    We use padding of the signal with ``M=len(kern)`` values at both ends
    such that the convolution with `kern` doesn't zero the `data` at the
    signal edges. We have two methods. `edge='m'`: pad with the signal
    mirrored at 0 and -1 or `edge='c'`: use the constant values ``data[0]``
    and ``data[-1]``. Many more of these variants may be thought of. The
    choice of how to extend the data essentially involves an assumption about
    how the signal *would* continue, which is signal-dependent. In practice,
    we usually have ``M << N`` (e.g. ``scipy.signal.hann(M)``) or ``std << N``
    (``scipy.signal.gaussian(M, std)``). Then, both methods are identical in
    the middle and show only very small differences at the edges.
    Essentially, edge effect handling shall only ensure that the smoothed
    signal doesn't go to zero and that must be independent of the method,
    which is the case.

    Memory:

    For big data, fftconvolve() can easily eat up all your memory, for
    example::

    >>> # assume axis=0 is the axis along which to convolve
    >>> arr = ones((1e5,200,3))
    >>> kern = scipy.signal.hann(101)
    >>> ret = scipy.signal.fftconvolve(arr, kern[:,None,None])

    Then it is better to loop over some or all of the remaining dimensions::

    >>> ret = np.empty_like(arr)
    >>> for jj in range(arr.shape[1]):
    >>>     ret[:,jj,:] = smooth(arr[:,jj,:], kern[:,None])

    or::

    >>> for jj in range(arr.shape[1]):
    >>>     for kk in range(arr.shape[2]):
    >>>         ret[:,jj,kk] = smooth(arr[:,jj,kk], kern)

    The size of the chunk over which you explicitly loop depends on the data
    of course. We do exactly this in :func:`pwtools.crys.smooth`.
    """
    # edge = 'm'
    # ----------
    # Add mirror of the signal left and right to handle edge effects, up to
    # signal length N on both ends. If M > N then fill padding regions up
    # with zeros until we have sig = [(M,), (N,), (M,)].
    # fftconvolve(..., 'valid') always returns only the signal length where
    # sig and kern overlap completely. Therefore, data at the far end doesn't
    # influence the edge and we can safely put zeros (or anything else)
    # there. The returned length is always N+M+1.
    #
    # example (M < N), add M=3 data parts left and right
    #   npad   = 3
    #   data   = [1,2,3,4,5,6]
    #   dleft  = [4,3,2]
    #   dright = [5,4,3]
    #   sig    = [4,3,2,1,2,3,4,5,6,5,4,3]
    # If M = 8 > N, then:
    #   dleft  = [6,5,4,3,2]
    #   dright = [5,4,3,2,1]
    # and after zero-padding:
    #   sig    = [0,0,0,6,5,4,3,2,1,2,3,4,5,6,5,4,3,2,1,0,0,0]
    #
    # edge = 'c'
    # ----------
    # The same, but all padded values are the first (left) and last (right)
    # data value.
    N = data.shape[axis]
    M = kern.shape[axis]
    if edge == 'm':
        npad = min(M,N)
        sleft = slice(npad,0,-1)
        sright = slice(-2,-(npad+2),-1)
        dleft = num.slicetake(data, sl=sleft, axis=axis)
        dright = num.slicetake(data, sl=sright, axis=axis)
        assert dleft.shape == dright.shape
        K = dleft.shape[axis]
        if K < M:
            dleft = pad_zeros(dleft, axis=axis, where='start', nadd=M-K)
            dright = pad_zeros(dright, axis=axis, where='end', nadd=M-K)
    elif edge == 'c':
        sl = [slice(None)]*data.ndim
        sl[axis] = None
        tsl = tuple(sl)
        dleft = np.repeat(num.slicetake(data, sl=0, axis=axis)[tsl], M,
                          axis=axis)
        dright = np.repeat(num.slicetake(data, sl=-1, axis=axis)[tsl], M,
                           axis=axis)
        assert dleft.shape == dright.shape
        # 1d special case: (M,1) -> (M,)
        if data.ndim == 1 and dleft.ndim == 2 and dleft.shape[1] == 1:
            dleft = dleft[:,0]
            dright = dright[:,0]
    else:
        raise Exception("unknown value for edge")
    sig = np.concatenate((dleft, data, dright), axis=axis)
    kk = kern/float(kern.sum()) if norm else kern
    ret = fftconvolve(sig, kk, 'valid')
    assert ret.shape[axis] == N+M+1, "unexpected convolve result shape"
    del sig
    if M % 2 == 0:
        ##sl = slice(M//2+1,-(M//2))  # even kernel, shift result to left
        sl = slice(M//2,-(M//2)-1)    # even kernel, shift result to right
    else:
        sl = slice(M//2+1,-(M//2)-1)
    ret = num.slicetake(ret, sl=sl, axis=axis)
    assert ret.shape == data.shape, ("oops, ret.shape (%s) != data.shape "
                                     "(%s)" %(ret.shape, data.shape))
    return ret
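
# Added sanity sketch (`_demo_smooth_delta` is a hypothetical helper): as
# stated in the docstring, convolution with the odd delta-like kernel [0,1,0]
# must reproduce the data exactly, for both edge handling methods.
def _demo_smooth_delta():
    x = np.random.rand(50)
    kern = np.array([0.0, 1.0, 0.0])
    for edge in ['m', 'c']:
        np.testing.assert_array_almost_equal(smooth(x, kern, edge=edge), x)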
def odd(n, add=1):
    """Return next odd integer to `n`. Can be used to construct odd smoothing
    kernels in :func:`smooth`.

    Parameters
    ----------
    n : int
    add : int
        number to add if `n` is even, +1 or -1
    """
    assert add in [-1, 1], "add must be -1 or 1"
    return n if n % 2 == 1 else n + add
def scale(x, copy=True):
    """Scale `x` to unity. Subtract min and divide by (max-min).

    Parameters
    ----------
    x : array_like
    copy : bool
        copy `x` before scaling

    Returns
    -------
    x_scaled
    """
    xx = x.copy() if copy else x
    # in-place ops, such that copy=False really avoids a copy
    xx -= xx.min()
    xx /= xx.max()
    return xx
class FIRFilter:
    """Build and apply a digital FIR filter (low-, high-, band-pass,
    band-stop). Uses firwin() and in some cases kaiserord().

    Doc strings stolen from scipy.signal.

    Notes
    -----
    To plot the frequency response (the frequency bands), use::

    >>> f = FIRFilter(...)
    >>> plot(f.w, abs(f.h))

    Examples
    --------
    .. literalinclude:: ../../../../examples/filter_example.py

    References
    ----------
    .. [1] http://www.scipy.org/Cookbook/FIRFilter
    """
    def __init__(self, cutoff, nyq, ntaps=None, ripple=None, width=None,
                 window='hamming', mode='lowpass'):
        """
        Parameters
        ----------
        cutoff : float or 1D array_like
            Cutoff frequency of filter (expressed in the same units as `nyq`)
            OR an array of cutoff frequencies (that is, band edges). In the
            latter case, the frequencies in `cutoff` should be positive and
            monotonically increasing between 0 and `nyq`. The values 0 and
            `nyq` must not be included in `cutoff`.
        nyq : float
            Nyquist frequency [Hz]. Each frequency in `cutoff` must be
            between 0 and `nyq`.
        ntaps : int
            Length of the filter (number of coefficients, i.e. the filter
            order + 1). `ntaps` must be even if a passband includes the
            Nyquist frequency. Use either `ntaps` or `ripple` + `width` for a
            Kaiser window.
        ripple : float
            Positive number specifying maximum ripple in passband (dB) and
            minimum ripple in stopband. Large values (like 1000) remove the
            "rippling" in the pass band almost completely and make the
            frequency response almost "square" (if `width` is small) but also
            lead to a large number of filter coeffs (ntaps).
        width : float
            Width of transition region (same unit as `nyq`, e.g. Hz).
        window : string or tuple of string and parameter values
            Desired window to use. See `scipy.signal.get_window` for a list
            of windows and required parameters. Default is "hamming". Ignored
            if `width` and `ripple` are given, b/c then ``kaiserord`` is used
            to build a Kaiser window.
        mode : str
            'lowpass', 'highpass', 'bandpass', 'bandstop'
        """
        if ntaps is None:
            assert [ripple, width] != [None]*2, ("ntaps is None, we need "
                "ripple and width for a Kaiser window")
            self.ntaps, self.beta = kaiserord(float(ripple),
                                              float(width) / nyq)
            window = ('kaiser', self.beta)
        else:
            self.ntaps = ntaps
        self.window = window
        if mode == 'lowpass':
            pass_zero = True
        elif mode == 'highpass':
            pass_zero = False
        elif mode == 'bandpass':
            pass_zero = False
            assert len(cutoff) == 2
        elif mode == 'bandstop':
            pass_zero = True
            assert len(cutoff) == 2
            # a band-stop filter needs an odd number of taps
            if self.ntaps % 2 == 0:
                self.ntaps += 1
        else:
            raise Exception('unknown mode')
        self.taps = firwin(numtaps=self.ntaps, cutoff=cutoff,
                           window=self.window, nyq=nyq, pass_zero=pass_zero,
                           width=width)
        w,h = freqz(self.taps)
        self.w = (w/np.pi)*nyq
        self.h = h
    def __call__(self, x, axis=-1):
        """Apply filter to signal.

        Parameters
        ----------
        x : 1d array
        axis : int
        """
        return lfilter(self.taps, 1.0, x, axis=axis)
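
# Added usage sketch (`_demo_firfilter` is a hypothetical helper with made-up
# parameter values): build a Kaiser-window low-pass filter via ripple+width
# and apply it; the 40 Hz component should be strongly damped while the 2 Hz
# one passes. Follows the scipy FIR filter cookbook referenced above.
def _demo_firfilter():
    dt = 0.01                                    # sample spacing -> nyq = 50 Hz
    t = np.arange(0.0, 5.0, dt)
    x = np.sin(2*np.pi*2*t) + 0.3*np.sin(2*np.pi*40*t)
    f = FIRFilter(cutoff=10.0, nyq=0.5/dt, ripple=60, width=5.0,
                  mode='lowpass')
    return t, x, f(x)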