1 #! /usr/bin/env python2
3 from itertools import *
8 class Translate(object):
9 "set up once, consumes an I/Q stream, returns an I/Q stream"
10 def __init__(self, num, den):
11 angles = [(a*tau*num/den) % tau for a in range(den)]
12 fir = [complex(math.cos(a), math.sin(a)) for a in angles]
13 self.looping_fir = cycle(fir)
14 def __call__(self, stream):
15 return numpy.array([s1*s2 for s1,s2 in izip(self.looping_fir, stream)])
17 class Downsample(object):
18 "set up once, consumes an I/Q stream, returns an I/Q stream"
20 def __init__(self, scale):
23 self.window = numpy.hanning(scale * 2)
24 self.window = self.window / sum(self.window)
25 def __call__(self, stream):
26 prev_off = self.offset
27 self.offset = self.scale - ((len(stream) + self.offset) % self.scale)
28 # bad edges, does 60x more math than needed
29 stream2 = numpy.convolve(stream, self.window)
30 return stream2[prev_off::self.scale]
32 class DownsampleFloat(object):
33 # poor quality, but good temporal accuracy
34 # uses triangle window
35 def __init__(self, scale):
38 def __call__(self, stream):
40 # should be using more numpy magic
43 for x in numpy.arange(self.offset, len(stream), scale):
45 window = numpy.concatenate((numpy.arange(1-frac, scale),
46 numpy.arange(int(scale)+frac-1, 0, -1)))
47 window = window / sum(window)
48 start = x - len(window)//2
50 start = min(start, len(stream)-len(window))
51 c = sum(stream[start : start+len(window)] * window)
53 return numpy.array(stream2)
55 class Upsample(object):
56 # use minimal power interpolation?
57 def __init__(self, scale):
60 def __call__(self, stream):
61 xp = range(len(stream))
62 x2 = numpy.arange(self.offset, len(stream), 1.0/self.scale)
63 self.offset = (len(stream) + self.offset) % self.scale
64 reals = numpy.interp(x2, xp, stream.real)
65 imags = numpy.interp(x2, xp, stream.imag)
66 return numpy.array([complex(*ri) for ri in zip(reals, imags)])
68 class Bandpass(object):
69 "set up once, consumes an I/Q stream, returns an I/Q stream"
70 def __init__(self, center_fc, center_bw, pass_fc, pass_bw):
71 # some errors from dropping the fractional parts of the ratio
72 # either optimize tuning, scale up, or use floating point
73 ratio = (center_fc - pass_fc) / center_bw
74 self.translate = Translate(ratio * 1024, 1024)
75 self.downsample = Downsample(int(center_bw/pass_bw))
76 def __call__(self, stream):
77 return self.downsample(self.translate(stream))
79 # check license on matplotlib code
80 # chop out as much slow junk as possible
82 # http://mail.scipy.org/pipermail/numpy-discussion/2003-January/014298.html
83 # http://cleaver.cnx.rice.edu/eggs_directory/obspy.signal/obspy.signal/obspy/signal/freqattributes.py
84 # /usr/lib/python2.7/site-packages/matplotlib/mlab.py
87 "Return x: no detrending"
90 def window_hanning(x):
91 "return x times the hanning window of len(x)"
92 return numpy.hanning(len(x))*x
94 def psd(x, NFFT=256, Fs=2, Fc=0, detrend=detrend_none, window=window_hanning,
95 noverlap=0, pad_to=None, sides='default', scale_by_freq=True):
96 Pxx,freqs = csd(x, x, NFFT, Fs, detrend, window, noverlap, pad_to, sides,
98 return Pxx.real, freqs + Fc
100 def csd(x, y, NFFT=256, Fs=2, detrend=detrend_none, window=window_hanning,
101 noverlap=0, pad_to=None, sides='default', scale_by_freq=True):
102 Pxy, freqs, t = _spectral_helper(x, y, NFFT, Fs, detrend, window,
103 noverlap, pad_to, sides, scale_by_freq)
105 if len(Pxy.shape) == 2 and Pxy.shape[1]>1:
106 Pxy = Pxy.mean(axis=1)
110 #This is a helper function that implements the commonality between the
111 #psd, csd, and spectrogram. It is *NOT* meant to be used outside of mlab
112 def _spectral_helper(x, y, NFFT=256, Fs=2, detrend=detrend_none,
113 window=window_hanning, noverlap=0, pad_to=None, sides='default',
115 #The checks for if y is x are so that we can use the same function to
116 #implement the core of psd(), csd(), and spectrogram() without doing
117 #extra calculations. We return the unaveraged Pxy, freqs, and t.
120 #Make sure we're dealing with a numpy array. If y and x were the same
121 #object to start with, keep them that way
128 # zero pad x and y up to NFFT if they are shorter than NFFT
131 x = numpy.resize(x, (NFFT,))
134 if not same_data and len(y)<NFFT:
136 y = numpy.resize(y, (NFFT,))
142 # For real x, ignore the negative frequencies unless told otherwise
143 if (sides == 'default' and numpy.iscomplexobj(x)) or sides == 'twosided':
146 elif sides in ('default', 'onesided'):
147 numFreqs = pad_to//2 + 1
150 raise ValueError("sides must be one of: 'default', 'onesided', or "
153 #if cbook.iterable(window):
154 if type(window) != type(lambda:0):
155 assert(len(window) == NFFT)
158 windowVals = window(numpy.ones((NFFT,), x.dtype))
160 step = NFFT - noverlap
161 ind = numpy.arange(0, len(x) - NFFT + 1, step)
163 Pxy = numpy.zeros((numFreqs, n), numpy.complex_)
165 # do the ffts of the slices
167 thisX = x[ind[i]:ind[i]+NFFT]
168 thisX = windowVals * detrend(thisX)
169 fx = numpy.fft.fft(thisX, n=pad_to)
174 thisY = y[ind[i]:ind[i]+NFFT]
175 thisY = windowVals * detrend(thisY)
176 fy = numpy.fft.fft(thisY, n=pad_to)
177 Pxy[:,i] = numpy.conjugate(fx[:numFreqs]) * fy[:numFreqs]
179 # Scale the spectrum by the norm of the window to compensate for
180 # windowing loss; see Bendat & Piersol Sec 11.5.2.
181 Pxy /= (numpy.abs(windowVals)**2).sum()
183 # Also include scaling factors for one-sided densities and dividing by the
184 # sampling frequency, if desired. Scale everything, except the DC component
185 # and the NFFT/2 component:
186 Pxy[1:-1] *= scaling_factor
188 # MATLAB divides by the sampling frequency so that density function
189 # has units of dB/Hz and can be integrated by the plotted frequency
190 # values. Perform the same scaling here.
194 t = 1./Fs * (ind + NFFT / 2.)
195 freqs = float(Fs) / pad_to * numpy.arange(numFreqs)
197 if (numpy.iscomplexobj(x) and sides == 'default') or sides == 'twosided':
198 # center the frequency range at zero
199 freqs = numpy.concatenate((freqs[numFreqs//2:] - Fs, freqs[:numFreqs//2]))
200 Pxy = numpy.concatenate((Pxy[numFreqs//2:, :], Pxy[:numFreqs//2, :]), 0)