diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index d7b54c99..2b8e0af9 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -434,7 +434,7 @@ def run(self, iqdata: np.ndarray) -> list: retrieve the processed results. The order is [FFT, PVT, PFP, APD]. """ # Filter IQ and place it in the object store - iqdata = ray.put(sosfilt(self.iir_sos, iqdata)) + iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata)) # Compute PSD, PVT, PFP, and APD concurrently. # Do not wait until they finish. Yield references to their results. yield [worker.run.remote(iqdata) for worker in self.workers] @@ -490,7 +490,7 @@ def __init__(self, parameters: dict): self.iir_sb_edge_Hz, self.sample_rate_Hz, ) - self.iir_numerators, self.iir_denominators = sos2tf(self.iir_sos) + self.iir_numerators, self.iir_denominators = sos2tf(sos=self.iir_sos) # Remove IIR parameters which aren't needed after filter generation for key in [ diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py index 636596f0..c8ca666b 100644 --- a/scos_actions/actions/calibrate_y_factor.py +++ b/scos_actions/actions/calibrate_y_factor.py @@ -307,8 +307,12 @@ def calibrate(self, params: dict): # Estimate of IIR filter ENBW does NOT account for passband ripple in sensor transfer function! enbw_hz = self.iir_enbw_hz logger.debug("Applying IIR filter to IQ captures") - noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"]) - noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"]) + noise_on_data = sosfilt( + sos=self.iir_sos, x=noise_on_measurement_result["data"] + ) + noise_off_data = sosfilt( + sos=self.iir_sos, x=noise_off_measurement_result["data"] + ) else: logger.debug("Skipping IIR filtering") # Get ENBW from sensor calibration diff --git a/scos_actions/signal_processing/fft.py b/scos_actions/signal_processing/fft.py index e61da7cb..4f563a1b 100644 --- a/scos_actions/signal_processing/fft.py +++ b/scos_actions/signal_processing/fft.py @@ -97,7 +97,7 @@ def get_fft( time_data *= fft_window # Take the FFT - complex_fft = sp_fft(time_data, norm=norm, workers=workers) + complex_fft = sp_fft(x=time_data, norm=norm, workers=workers) # Shift the frequencies if desired if shift: @@ -129,7 +129,7 @@ def get_fft_window(window_type: str, window_length: int) -> np.ndarray: window_type = "hann" # Get window samples - window = get_window(window_type, window_length, fftbins=True) + window = get_window(window=window_type, Nx=window_length, fftbins=True) # Return the window return window diff --git a/scos_actions/signal_processing/filtering.py b/scos_actions/signal_processing/filtering.py index 75f60698..ae6bb527 100644 --- a/scos_actions/signal_processing/filtering.py +++ b/scos_actions/signal_processing/filtering.py @@ -43,7 +43,16 @@ def generate_elliptic_iir_low_pass_filter( analog=False, fs=sample_rate_Hz, ) - sos = ellip(ord, gpass_dB, gstop_dB, wn, "lowpass", False, "sos", sample_rate_Hz) + sos = ellip( + N=ord, + rp=gpass_dB, + rs=gstop_dB, + Wn=wn, + btype="lowpass", + analog=False, + output="sos", + fs=sample_rate_Hz, + ) return sos diff --git a/scos_actions/signal_processing/tests/test_fft.py b/scos_actions/signal_processing/tests/test_fft.py index 1ea7458c..00997f9e 100644 --- a/scos_actions/signal_processing/tests/test_fft.py +++ b/scos_actions/signal_processing/tests/test_fft.py @@ -55,7 +55,7 @@ def test_get_fft(): fft_size = 1024 num_ffts = 5 signal_amplitude = 500 - window = get_window("flattop", fft_size, True) + window = get_window(window="flattop", Nx=fft_size, fftbins=True) window_acf = window_amplitude_correction(window) # Generated signal is constant: the FFT should be zero in all bins except @@ -175,7 +175,7 @@ def test_get_fft_window(): # generated using SciPy for w_type in supported_window_types: win = fft.get_fft_window(w_type, 1024) - true_win = get_window(w_type, 1024, True) + true_win = get_window(window=w_type, Nx=1024, fftbins=True) assert isinstance(win, np.ndarray) assert win.size == 1024 assert np.array_equal(win, true_win) @@ -193,10 +193,10 @@ def test_get_fft_window(): # Check that input formatting works as expected for bad_w_type, true_w_type in window_alt_format_map.items(): win = fft.get_fft_window(bad_w_type, 1024) - true_win = get_window(true_w_type, 1024, True) + true_win = get_window(window=true_w_type, Nx=1024, fftbins=True) assert np.array_equal(win, true_win) with pytest.raises(ValueError, match="Unknown window type."): - _ = get_window(bad_w_type, 1024, True) + _ = get_window(window=bad_w_type, Nx=1024, fftbins=True) def test_get_fft_window_correction(): @@ -216,7 +216,7 @@ def test_get_fft_window_correction(): # Function under test should raise a ValueError for invalid correction type bad_correction_type = ["amp", "e", "both", "other"] - test_good_window = get_window("flattop", 1024, True) + test_good_window = get_window(window="flattop", Nx=1024, fftbins=True) for ct in bad_correction_type: with pytest.raises(ValueError, match=f"Invalid window correction type: {ct}"): _ = fft.get_fft_window_correction(test_good_window, ct) @@ -253,7 +253,7 @@ def test_get_fft_enbw(): fft_size = 1024 sample_rate__Hz = 14e6 for w_type in window_types: - window = get_window(w_type, fft_size, True) + window = get_window(window=w_type, Nx=fft_size, fftbins=True) true_fft_bin_enbw__Hz = fft_bin_enbw(window, sample_rate__Hz) test_fft_bin_enbw__Hz = fft.get_fft_enbw(window, sample_rate__Hz) assert isinstance(test_fft_bin_enbw__Hz, float) diff --git a/scos_actions/signal_processing/tests/test_filtering.py b/scos_actions/signal_processing/tests/test_filtering.py index 0df7b845..9592c7c7 100644 --- a/scos_actions/signal_processing/tests/test_filtering.py +++ b/scos_actions/signal_processing/tests/test_filtering.py @@ -40,8 +40,17 @@ def sr(): # Sample rate, Hz @pytest.fixture def example_sos(gpass, gstop, pb, sb, sr): - o, w = ellipord(pb, sb, gpass, gstop, False, sr) - return ellip(o, gpass, gstop, w, "lowpass", False, "sos", sr) + o, w = ellipord(wp=pb, ws=sb, gpass=gpass, gstop=gstop, analog=False, fs=sr) + return ellip( + N=o, + rp=gpass, + rs=gstop, + Wn=w, + btype="lowpass", + analog=False, + output="sos", + fs=sr, + ) def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb, sr): @@ -56,8 +65,16 @@ def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb def test_generate_fir_low_pass_filter(): # Same approach as above for IIR: basically duplicate the functionality here att, wid, xoff, sr = 10, 100, 1000, 10e3 - o, b = kaiserord(att, wid / (0.5 * sr)) - true_taps = firwin(o + 1, xoff, wid, ("kaiser", b), "lowpass", True, fs=sr) + o, b = kaiserord(ripple=att, width=wid / (0.5 * sr)) + true_taps = firwin( + numtaps=o + 1, + cutoff=xoff, + width=wid, + window=("kaiser", b), + pass_zero="lowpass", + scale=True, + fs=sr, + ) test_taps = filtering.generate_fir_low_pass_filter(att, wid, xoff, sr) assert isinstance(test_taps, np.ndarray) assert test_taps.shape == (o + 1,) @@ -66,7 +83,7 @@ def test_generate_fir_low_pass_filter(): def test_get_iir_frequency_response(example_sos, pb, sb, sr): for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]: - true_w, true_h = sosfreqz(example_sos, worN, True, sr) + true_w, true_h = sosfreqz(sos=example_sos, worN=worN, whole=True, fs=sr) test_w, test_h = filtering.get_iir_frequency_response(example_sos, worN, sr) if isinstance(worN, int): assert all(len(x) == worN for x in [test_w, test_h]) @@ -78,7 +95,7 @@ def test_get_iir_frequency_response(example_sos, pb, sb, sr): def test_get_iir_phase_response(example_sos, pb, sb, sr): for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]: - true_w, h = sosfreqz(example_sos, worN, False, sr) + true_w, h = sosfreqz(sos=example_sos, worN=worN, whole=False, fs=sr) true_angles = np.unwrap(np.angle(h)) test_w, test_angles = filtering.get_iir_phase_response(example_sos, worN, sr) if isinstance(worN, int): @@ -103,6 +120,6 @@ def test_is_stable(example_sos): stable_test = filtering.is_stable(example_sos) assert isinstance(stable_test, bool) assert stable_test is True - _, poles, _ = sos2zpk(example_sos) + _, poles, _ = sos2zpk(sos=example_sos) stable_true = all([p < 1 for p in np.square(np.abs(poles))]) assert stable_true == stable_test