diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f6d8b0db..90d2f9e4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ default_language_version: python: python3.8 repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v4.6.0 hooks: - id: check-ast types: [file, python] @@ -18,7 +18,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v3.15.2 + rev: v3.16.0 hooks: - id: pyupgrade args: ["--py38-plus"] @@ -30,12 +30,12 @@ repos: types: [file, python] args: ["--profile", "black", "--filter-files", "--gitignore"] - repo: https://github.com/psf/black - rev: 24.3.0 + rev: 24.4.2 hooks: - id: black types: [file, python] - repo: https://github.com/igorshubovych/markdownlint-cli - rev: v0.39.0 + rev: v0.41.0 hooks: - id: markdownlint types: [file, markdown] diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index d7b54c99..2b8e0af9 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -434,7 +434,7 @@ def run(self, iqdata: np.ndarray) -> list: retrieve the processed results. The order is [FFT, PVT, PFP, APD]. """ # Filter IQ and place it in the object store - iqdata = ray.put(sosfilt(self.iir_sos, iqdata)) + iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata)) # Compute PSD, PVT, PFP, and APD concurrently. # Do not wait until they finish. Yield references to their results. yield [worker.run.remote(iqdata) for worker in self.workers] @@ -490,7 +490,7 @@ def __init__(self, parameters: dict): self.iir_sb_edge_Hz, self.sample_rate_Hz, ) - self.iir_numerators, self.iir_denominators = sos2tf(self.iir_sos) + self.iir_numerators, self.iir_denominators = sos2tf(sos=self.iir_sos) # Remove IIR parameters which aren't needed after filter generation for key in [ diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py index 636596f0..c8ca666b 100644 --- a/scos_actions/actions/calibrate_y_factor.py +++ b/scos_actions/actions/calibrate_y_factor.py @@ -307,8 +307,12 @@ def calibrate(self, params: dict): # Estimate of IIR filter ENBW does NOT account for passband ripple in sensor transfer function! enbw_hz = self.iir_enbw_hz logger.debug("Applying IIR filter to IQ captures") - noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"]) - noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"]) + noise_on_data = sosfilt( + sos=self.iir_sos, x=noise_on_measurement_result["data"] + ) + noise_off_data = sosfilt( + sos=self.iir_sos, x=noise_off_measurement_result["data"] + ) else: logger.debug("Skipping IIR filtering") # Get ENBW from sensor calibration diff --git a/scos_actions/calibration/tests/test_differential_calibration.py b/scos_actions/calibration/tests/test_differential_calibration.py index 5c9c80ca..dd53f7f4 100644 --- a/scos_actions/calibration/tests/test_differential_calibration.py +++ b/scos_actions/calibration/tests/test_differential_calibration.py @@ -16,16 +16,34 @@ def setup_differential_calibration_file(self, tmp_path: Path): "calibration_reference": "antenna input", "calibration_data": {3555e6: 11.5}, } + dict_to_json_multiple_antenna = { + "calibration_parameters": ["rf_path", "frequency"], + "calibration_reference": "antenna input", + "calibration_data": { + "antenna1": {3555e6: 11.5}, + "antenna2": {3555e6: 21.5}, + }, + } self.valid_file_path = tmp_path / "sample_diff_cal.json" + self.valid_file_path_multiple_antenna = ( + tmp_path / "sample_diff_cal_multiple_antenna.json" + ) self.invalid_file_path = tmp_path / "sample_diff_cal_invalid.json" self.sample_diff_cal = DifferentialCalibration( file_path=self.valid_file_path, **dict_to_json ) + self.sample_diff_cal_multiple_antenna = DifferentialCalibration( + file_path=self.valid_file_path_multiple_antenna, + **dict_to_json_multiple_antenna, + ) with open(self.valid_file_path, "w") as f: f.write(json.dumps(dict_to_json)) + with open(self.valid_file_path_multiple_antenna, "w") as f: + f.write(json.dumps(dict_to_json_multiple_antenna)) + dict_to_json.pop("calibration_reference", None) with open(self.invalid_file_path, "w") as f: @@ -35,6 +53,10 @@ def test_from_json(self): """Check from_json functionality with valid and invalid dummy data.""" diff_cal = DifferentialCalibration.from_json(self.valid_file_path) assert diff_cal == self.sample_diff_cal + diff_cal_multiple_antenna = DifferentialCalibration.from_json( + self.valid_file_path_multiple_antenna + ) + assert diff_cal_multiple_antenna == self.sample_diff_cal_multiple_antenna with pytest.raises(Exception): _ = DifferentialCalibration.from_json(self.invalid_file_path) diff --git a/scos_actions/signal_processing/fft.py b/scos_actions/signal_processing/fft.py index e61da7cb..4f563a1b 100644 --- a/scos_actions/signal_processing/fft.py +++ b/scos_actions/signal_processing/fft.py @@ -97,7 +97,7 @@ def get_fft( time_data *= fft_window # Take the FFT - complex_fft = sp_fft(time_data, norm=norm, workers=workers) + complex_fft = sp_fft(x=time_data, norm=norm, workers=workers) # Shift the frequencies if desired if shift: @@ -129,7 +129,7 @@ def get_fft_window(window_type: str, window_length: int) -> np.ndarray: window_type = "hann" # Get window samples - window = get_window(window_type, window_length, fftbins=True) + window = get_window(window=window_type, Nx=window_length, fftbins=True) # Return the window return window diff --git a/scos_actions/signal_processing/filtering.py b/scos_actions/signal_processing/filtering.py index 25048219..ae6bb527 100644 --- a/scos_actions/signal_processing/filtering.py +++ b/scos_actions/signal_processing/filtering.py @@ -36,9 +36,23 @@ def generate_elliptic_iir_low_pass_filter( + f"edge frequency {pb_edge_Hz} Hz." ) ord, wn = ellipord( - pb_edge_Hz, sb_edge_Hz, gpass_dB, gstop_dB, False, sample_rate_Hz + wp=pb_edge_Hz, + ws=sb_edge_Hz, + gpass=gpass_dB, + gstop=gstop_dB, + analog=False, + fs=sample_rate_Hz, + ) + sos = ellip( + N=ord, + rp=gpass_dB, + rs=gstop_dB, + Wn=wn, + btype="lowpass", + analog=False, + output="sos", + fs=sample_rate_Hz, ) - sos = ellip(ord, gpass_dB, gstop_dB, wn, "lowpass", False, "sos", sample_rate_Hz) return sos @@ -62,14 +76,16 @@ def generate_fir_low_pass_filter( :param sample_rate_Hz: Sampling rate, in Hz. :return: Coeffiecients of the FIR low pass filter. """ - ord, beta = kaiserord(attenuation_dB, width_Hz / (0.5 * sample_rate_Hz)) + ord, beta = kaiserord( + ripple=attenuation_dB, width=width_Hz / (0.5 * sample_rate_Hz) + ) taps = firwin( - ord + 1, - cutoff_Hz, - width_Hz, - ("kaiser", beta), - "lowpass", - True, + numtaps=ord + 1, + cutoff=cutoff_Hz, + width=width_Hz, + window=("kaiser", beta), + pass_zero="lowpass", + scale=True, fs=sample_rate_Hz, ) return taps @@ -91,7 +107,7 @@ def get_iir_frequency_response( The second is the array containing the frequency response values, which are complex values in linear units. """ - w, h = sosfreqz(sos, worN, whole=True, fs=sample_rate_Hz) + w, h = sosfreqz(sos=sos, worN=worN, whole=True, fs=sample_rate_Hz) return w, h @@ -110,7 +126,7 @@ def get_iir_phase_response( frequencies, in Hz, for which the phase response was calculated. The second is the array containing the phase response values, in radians. """ - w, h = sosfreqz(sos, worN, whole=False, fs=sample_rate_Hz) + w, h = sosfreqz(sos=sos, worN=worN, whole=False, fs=sample_rate_Hz) angles = np.unwrap(np.angle(h)) return w, angles @@ -156,6 +172,6 @@ def is_stable(sos: np.ndarray) -> bool: :param sos: Second-order sections representation of the IIR filter. :return: True if the filter is stable, False if not. """ - _, poles, _ = sos2zpk(sos) + _, poles, _ = sos2zpk(sos=sos) stable = all([p < 1 for p in np.square(np.abs(poles))]) return stable diff --git a/scos_actions/signal_processing/tests/test_fft.py b/scos_actions/signal_processing/tests/test_fft.py index 1ea7458c..00997f9e 100644 --- a/scos_actions/signal_processing/tests/test_fft.py +++ b/scos_actions/signal_processing/tests/test_fft.py @@ -55,7 +55,7 @@ def test_get_fft(): fft_size = 1024 num_ffts = 5 signal_amplitude = 500 - window = get_window("flattop", fft_size, True) + window = get_window(window="flattop", Nx=fft_size, fftbins=True) window_acf = window_amplitude_correction(window) # Generated signal is constant: the FFT should be zero in all bins except @@ -175,7 +175,7 @@ def test_get_fft_window(): # generated using SciPy for w_type in supported_window_types: win = fft.get_fft_window(w_type, 1024) - true_win = get_window(w_type, 1024, True) + true_win = get_window(window=w_type, Nx=1024, fftbins=True) assert isinstance(win, np.ndarray) assert win.size == 1024 assert np.array_equal(win, true_win) @@ -193,10 +193,10 @@ def test_get_fft_window(): # Check that input formatting works as expected for bad_w_type, true_w_type in window_alt_format_map.items(): win = fft.get_fft_window(bad_w_type, 1024) - true_win = get_window(true_w_type, 1024, True) + true_win = get_window(window=true_w_type, Nx=1024, fftbins=True) assert np.array_equal(win, true_win) with pytest.raises(ValueError, match="Unknown window type."): - _ = get_window(bad_w_type, 1024, True) + _ = get_window(window=bad_w_type, Nx=1024, fftbins=True) def test_get_fft_window_correction(): @@ -216,7 +216,7 @@ def test_get_fft_window_correction(): # Function under test should raise a ValueError for invalid correction type bad_correction_type = ["amp", "e", "both", "other"] - test_good_window = get_window("flattop", 1024, True) + test_good_window = get_window(window="flattop", Nx=1024, fftbins=True) for ct in bad_correction_type: with pytest.raises(ValueError, match=f"Invalid window correction type: {ct}"): _ = fft.get_fft_window_correction(test_good_window, ct) @@ -253,7 +253,7 @@ def test_get_fft_enbw(): fft_size = 1024 sample_rate__Hz = 14e6 for w_type in window_types: - window = get_window(w_type, fft_size, True) + window = get_window(window=w_type, Nx=fft_size, fftbins=True) true_fft_bin_enbw__Hz = fft_bin_enbw(window, sample_rate__Hz) test_fft_bin_enbw__Hz = fft.get_fft_enbw(window, sample_rate__Hz) assert isinstance(test_fft_bin_enbw__Hz, float) diff --git a/scos_actions/signal_processing/tests/test_filtering.py b/scos_actions/signal_processing/tests/test_filtering.py index 0df7b845..9592c7c7 100644 --- a/scos_actions/signal_processing/tests/test_filtering.py +++ b/scos_actions/signal_processing/tests/test_filtering.py @@ -40,8 +40,17 @@ def sr(): # Sample rate, Hz @pytest.fixture def example_sos(gpass, gstop, pb, sb, sr): - o, w = ellipord(pb, sb, gpass, gstop, False, sr) - return ellip(o, gpass, gstop, w, "lowpass", False, "sos", sr) + o, w = ellipord(wp=pb, ws=sb, gpass=gpass, gstop=gstop, analog=False, fs=sr) + return ellip( + N=o, + rp=gpass, + rs=gstop, + Wn=w, + btype="lowpass", + analog=False, + output="sos", + fs=sr, + ) def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb, sr): @@ -56,8 +65,16 @@ def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb def test_generate_fir_low_pass_filter(): # Same approach as above for IIR: basically duplicate the functionality here att, wid, xoff, sr = 10, 100, 1000, 10e3 - o, b = kaiserord(att, wid / (0.5 * sr)) - true_taps = firwin(o + 1, xoff, wid, ("kaiser", b), "lowpass", True, fs=sr) + o, b = kaiserord(ripple=att, width=wid / (0.5 * sr)) + true_taps = firwin( + numtaps=o + 1, + cutoff=xoff, + width=wid, + window=("kaiser", b), + pass_zero="lowpass", + scale=True, + fs=sr, + ) test_taps = filtering.generate_fir_low_pass_filter(att, wid, xoff, sr) assert isinstance(test_taps, np.ndarray) assert test_taps.shape == (o + 1,) @@ -66,7 +83,7 @@ def test_generate_fir_low_pass_filter(): def test_get_iir_frequency_response(example_sos, pb, sb, sr): for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]: - true_w, true_h = sosfreqz(example_sos, worN, True, sr) + true_w, true_h = sosfreqz(sos=example_sos, worN=worN, whole=True, fs=sr) test_w, test_h = filtering.get_iir_frequency_response(example_sos, worN, sr) if isinstance(worN, int): assert all(len(x) == worN for x in [test_w, test_h]) @@ -78,7 +95,7 @@ def test_get_iir_frequency_response(example_sos, pb, sb, sr): def test_get_iir_phase_response(example_sos, pb, sb, sr): for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]: - true_w, h = sosfreqz(example_sos, worN, False, sr) + true_w, h = sosfreqz(sos=example_sos, worN=worN, whole=False, fs=sr) true_angles = np.unwrap(np.angle(h)) test_w, test_angles = filtering.get_iir_phase_response(example_sos, worN, sr) if isinstance(worN, int): @@ -103,6 +120,6 @@ def test_is_stable(example_sos): stable_test = filtering.is_stable(example_sos) assert isinstance(stable_test, bool) assert stable_test is True - _, poles, _ = sos2zpk(example_sos) + _, poles, _ = sos2zpk(sos=example_sos) stable_true = all([p < 1 for p in np.square(np.abs(poles))]) assert stable_true == stable_test