Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for multi-antennas #122

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ default_language_version:
python: python3.8
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v4.6.0
hooks:
- id: check-ast
types: [file, python]
Expand All @@ -18,7 +18,7 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.2
rev: v3.16.0
hooks:
- id: pyupgrade
args: ["--py38-plus"]
Expand All @@ -30,12 +30,12 @@ repos:
types: [file, python]
args: ["--profile", "black", "--filter-files", "--gitignore"]
- repo: https://github.com/psf/black
rev: 24.3.0
rev: 24.4.2
hooks:
- id: black
types: [file, python]
- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.39.0
rev: v0.41.0
hooks:
- id: markdownlint
types: [file, markdown]
Expand Down
4 changes: 2 additions & 2 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def run(self, iqdata: np.ndarray) -> list:
retrieve the processed results. The order is [FFT, PVT, PFP, APD].
"""
# Filter IQ and place it in the object store
iqdata = ray.put(sosfilt(self.iir_sos, iqdata))
iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata))
# Compute PSD, PVT, PFP, and APD concurrently.
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
Expand Down Expand Up @@ -490,7 +490,7 @@ def __init__(self, parameters: dict):
self.iir_sb_edge_Hz,
self.sample_rate_Hz,
)
self.iir_numerators, self.iir_denominators = sos2tf(self.iir_sos)
self.iir_numerators, self.iir_denominators = sos2tf(sos=self.iir_sos)

# Remove IIR parameters which aren't needed after filter generation
for key in [
Expand Down
8 changes: 6 additions & 2 deletions scos_actions/actions/calibrate_y_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,12 @@ def calibrate(self, params: dict):
# Estimate of IIR filter ENBW does NOT account for passband ripple in sensor transfer function!
enbw_hz = self.iir_enbw_hz
logger.debug("Applying IIR filter to IQ captures")
noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"])
noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"])
noise_on_data = sosfilt(
sos=self.iir_sos, x=noise_on_measurement_result["data"]
)
noise_off_data = sosfilt(
sos=self.iir_sos, x=noise_off_measurement_result["data"]
)
else:
logger.debug("Skipping IIR filtering")
# Get ENBW from sensor calibration
Expand Down
22 changes: 22 additions & 0 deletions scos_actions/calibration/tests/test_differential_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,34 @@ def setup_differential_calibration_file(self, tmp_path: Path):
"calibration_reference": "antenna input",
"calibration_data": {3555e6: 11.5},
}
dict_to_json_multiple_antenna = {
"calibration_parameters": ["rf_path", "frequency"],
"calibration_reference": "antenna input",
"calibration_data": {
"antenna1": {3555e6: 11.5},
"antenna2": {3555e6: 21.5},
},
}
self.valid_file_path = tmp_path / "sample_diff_cal.json"
self.valid_file_path_multiple_antenna = (
tmp_path / "sample_diff_cal_multiple_antenna.json"
)
self.invalid_file_path = tmp_path / "sample_diff_cal_invalid.json"

self.sample_diff_cal = DifferentialCalibration(
file_path=self.valid_file_path, **dict_to_json
)
self.sample_diff_cal_multiple_antenna = DifferentialCalibration(
file_path=self.valid_file_path_multiple_antenna,
**dict_to_json_multiple_antenna,
)

with open(self.valid_file_path, "w") as f:
f.write(json.dumps(dict_to_json))

with open(self.valid_file_path_multiple_antenna, "w") as f:
f.write(json.dumps(dict_to_json_multiple_antenna))

dict_to_json.pop("calibration_reference", None)

with open(self.invalid_file_path, "w") as f:
Expand All @@ -35,6 +53,10 @@ def test_from_json(self):
"""Check from_json functionality with valid and invalid dummy data."""
diff_cal = DifferentialCalibration.from_json(self.valid_file_path)
assert diff_cal == self.sample_diff_cal
diff_cal_multiple_antenna = DifferentialCalibration.from_json(
self.valid_file_path_multiple_antenna
)
assert diff_cal_multiple_antenna == self.sample_diff_cal_multiple_antenna
with pytest.raises(Exception):
_ = DifferentialCalibration.from_json(self.invalid_file_path)

Expand Down
4 changes: 2 additions & 2 deletions scos_actions/signal_processing/fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_fft(
time_data *= fft_window

# Take the FFT
complex_fft = sp_fft(time_data, norm=norm, workers=workers)
complex_fft = sp_fft(x=time_data, norm=norm, workers=workers)

# Shift the frequencies if desired
if shift:
Expand Down Expand Up @@ -129,7 +129,7 @@ def get_fft_window(window_type: str, window_length: int) -> np.ndarray:
window_type = "hann"

# Get window samples
window = get_window(window_type, window_length, fftbins=True)
window = get_window(window=window_type, Nx=window_length, fftbins=True)

# Return the window
return window
Expand Down
40 changes: 28 additions & 12 deletions scos_actions/signal_processing/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,23 @@ def generate_elliptic_iir_low_pass_filter(
+ f"edge frequency {pb_edge_Hz} Hz."
)
ord, wn = ellipord(
pb_edge_Hz, sb_edge_Hz, gpass_dB, gstop_dB, False, sample_rate_Hz
wp=pb_edge_Hz,
ws=sb_edge_Hz,
gpass=gpass_dB,
gstop=gstop_dB,
analog=False,
fs=sample_rate_Hz,
)
sos = ellip(
N=ord,
rp=gpass_dB,
rs=gstop_dB,
Wn=wn,
btype="lowpass",
analog=False,
output="sos",
fs=sample_rate_Hz,
)
sos = ellip(ord, gpass_dB, gstop_dB, wn, "lowpass", False, "sos", sample_rate_Hz)
return sos


Expand All @@ -62,14 +76,16 @@ def generate_fir_low_pass_filter(
:param sample_rate_Hz: Sampling rate, in Hz.
:return: Coeffiecients of the FIR low pass filter.
"""
ord, beta = kaiserord(attenuation_dB, width_Hz / (0.5 * sample_rate_Hz))
ord, beta = kaiserord(
ripple=attenuation_dB, width=width_Hz / (0.5 * sample_rate_Hz)
)
taps = firwin(
ord + 1,
cutoff_Hz,
width_Hz,
("kaiser", beta),
"lowpass",
True,
numtaps=ord + 1,
cutoff=cutoff_Hz,
width=width_Hz,
window=("kaiser", beta),
pass_zero="lowpass",
scale=True,
fs=sample_rate_Hz,
)
return taps
Expand All @@ -91,7 +107,7 @@ def get_iir_frequency_response(
The second is the array containing the frequency response values, which
are complex values in linear units.
"""
w, h = sosfreqz(sos, worN, whole=True, fs=sample_rate_Hz)
w, h = sosfreqz(sos=sos, worN=worN, whole=True, fs=sample_rate_Hz)
return w, h


Expand All @@ -110,7 +126,7 @@ def get_iir_phase_response(
frequencies, in Hz, for which the phase response was calculated.
The second is the array containing the phase response values, in radians.
"""
w, h = sosfreqz(sos, worN, whole=False, fs=sample_rate_Hz)
w, h = sosfreqz(sos=sos, worN=worN, whole=False, fs=sample_rate_Hz)
angles = np.unwrap(np.angle(h))
return w, angles

Expand Down Expand Up @@ -156,6 +172,6 @@ def is_stable(sos: np.ndarray) -> bool:
:param sos: Second-order sections representation of the IIR filter.
:return: True if the filter is stable, False if not.
"""
_, poles, _ = sos2zpk(sos)
_, poles, _ = sos2zpk(sos=sos)
stable = all([p < 1 for p in np.square(np.abs(poles))])
return stable
12 changes: 6 additions & 6 deletions scos_actions/signal_processing/tests/test_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_get_fft():
fft_size = 1024
num_ffts = 5
signal_amplitude = 500
window = get_window("flattop", fft_size, True)
window = get_window(window="flattop", Nx=fft_size, fftbins=True)
window_acf = window_amplitude_correction(window)

# Generated signal is constant: the FFT should be zero in all bins except
Expand Down Expand Up @@ -175,7 +175,7 @@ def test_get_fft_window():
# generated using SciPy
for w_type in supported_window_types:
win = fft.get_fft_window(w_type, 1024)
true_win = get_window(w_type, 1024, True)
true_win = get_window(window=w_type, Nx=1024, fftbins=True)
assert isinstance(win, np.ndarray)
assert win.size == 1024
assert np.array_equal(win, true_win)
Expand All @@ -193,10 +193,10 @@ def test_get_fft_window():
# Check that input formatting works as expected
for bad_w_type, true_w_type in window_alt_format_map.items():
win = fft.get_fft_window(bad_w_type, 1024)
true_win = get_window(true_w_type, 1024, True)
true_win = get_window(window=true_w_type, Nx=1024, fftbins=True)
assert np.array_equal(win, true_win)
with pytest.raises(ValueError, match="Unknown window type."):
_ = get_window(bad_w_type, 1024, True)
_ = get_window(window=bad_w_type, Nx=1024, fftbins=True)


def test_get_fft_window_correction():
Expand All @@ -216,7 +216,7 @@ def test_get_fft_window_correction():

# Function under test should raise a ValueError for invalid correction type
bad_correction_type = ["amp", "e", "both", "other"]
test_good_window = get_window("flattop", 1024, True)
test_good_window = get_window(window="flattop", Nx=1024, fftbins=True)
for ct in bad_correction_type:
with pytest.raises(ValueError, match=f"Invalid window correction type: {ct}"):
_ = fft.get_fft_window_correction(test_good_window, ct)
Expand Down Expand Up @@ -253,7 +253,7 @@ def test_get_fft_enbw():
fft_size = 1024
sample_rate__Hz = 14e6
for w_type in window_types:
window = get_window(w_type, fft_size, True)
window = get_window(window=w_type, Nx=fft_size, fftbins=True)
true_fft_bin_enbw__Hz = fft_bin_enbw(window, sample_rate__Hz)
test_fft_bin_enbw__Hz = fft.get_fft_enbw(window, sample_rate__Hz)
assert isinstance(test_fft_bin_enbw__Hz, float)
Expand Down
31 changes: 24 additions & 7 deletions scos_actions/signal_processing/tests/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ def sr(): # Sample rate, Hz

@pytest.fixture
def example_sos(gpass, gstop, pb, sb, sr):
o, w = ellipord(pb, sb, gpass, gstop, False, sr)
return ellip(o, gpass, gstop, w, "lowpass", False, "sos", sr)
o, w = ellipord(wp=pb, ws=sb, gpass=gpass, gstop=gstop, analog=False, fs=sr)
return ellip(
N=o,
rp=gpass,
rs=gstop,
Wn=w,
btype="lowpass",
analog=False,
output="sos",
fs=sr,
)


def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb, sr):
Expand All @@ -56,8 +65,16 @@ def test_generate_elliptic_iir_low_pass_filter(example_sos, gpass, gstop, pb, sb
def test_generate_fir_low_pass_filter():
# Same approach as above for IIR: basically duplicate the functionality here
att, wid, xoff, sr = 10, 100, 1000, 10e3
o, b = kaiserord(att, wid / (0.5 * sr))
true_taps = firwin(o + 1, xoff, wid, ("kaiser", b), "lowpass", True, fs=sr)
o, b = kaiserord(ripple=att, width=wid / (0.5 * sr))
true_taps = firwin(
numtaps=o + 1,
cutoff=xoff,
width=wid,
window=("kaiser", b),
pass_zero="lowpass",
scale=True,
fs=sr,
)
test_taps = filtering.generate_fir_low_pass_filter(att, wid, xoff, sr)
assert isinstance(test_taps, np.ndarray)
assert test_taps.shape == (o + 1,)
Expand All @@ -66,7 +83,7 @@ def test_generate_fir_low_pass_filter():

def test_get_iir_frequency_response(example_sos, pb, sb, sr):
for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]:
true_w, true_h = sosfreqz(example_sos, worN, True, sr)
true_w, true_h = sosfreqz(sos=example_sos, worN=worN, whole=True, fs=sr)
test_w, test_h = filtering.get_iir_frequency_response(example_sos, worN, sr)
if isinstance(worN, int):
assert all(len(x) == worN for x in [test_w, test_h])
Expand All @@ -78,7 +95,7 @@ def test_get_iir_frequency_response(example_sos, pb, sb, sr):

def test_get_iir_phase_response(example_sos, pb, sb, sr):
for worN in [100, np.linspace(pb - 500, sb + 500, 3050)]:
true_w, h = sosfreqz(example_sos, worN, False, sr)
true_w, h = sosfreqz(sos=example_sos, worN=worN, whole=False, fs=sr)
true_angles = np.unwrap(np.angle(h))
test_w, test_angles = filtering.get_iir_phase_response(example_sos, worN, sr)
if isinstance(worN, int):
Expand All @@ -103,6 +120,6 @@ def test_is_stable(example_sos):
stable_test = filtering.is_stable(example_sos)
assert isinstance(stable_test, bool)
assert stable_test is True
_, poles, _ = sos2zpk(example_sos)
_, poles, _ = sos2zpk(sos=example_sos)
stable_true = all([p < 1 for p in np.square(np.abs(poles))])
assert stable_true == stable_test
Loading