diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9618961d..12ef9bfc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,7 +18,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v3.15.0 + rev: v3.15.1 hooks: - id: pyupgrade args: ["--py38-plus"] @@ -30,12 +30,12 @@ repos: types: [file, python] args: ["--profile", "black", "--filter-files", "--gitignore"] - repo: https://github.com/psf/black - rev: 23.12.1 + rev: 24.2.0 hooks: - id: black types: [file, python] - repo: https://github.com/igorshubovych/markdownlint-cli - rev: v0.38.0 + rev: v0.39.0 hooks: - id: markdownlint types: [file, markdown] diff --git a/sample_debug.py b/sample_debug.py index cc04ae29..3cf415fe 100644 --- a/sample_debug.py +++ b/sample_debug.py @@ -2,6 +2,7 @@ This is a sample file showing how an action be created and called for debugging purposes using a mock signal analyzer. """ + import json from scos_actions.actions.acquire_single_freq_fft import SingleFrequencyFftAcquisition diff --git a/scos_actions/__init__.py b/scos_actions/__init__.py index 9e17e009..6dcf770e 100644 --- a/scos_actions/__init__.py +++ b/scos_actions/__init__.py @@ -1 +1 @@ -__version__ = "8.0.1" +__version__ = "9.0.0" diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 20a684da..bd2a8d82 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -23,6 +23,7 @@ import logging import lzma import platform +import ray import sys from enum import EnumMeta from time import perf_counter @@ -30,7 +31,6 @@ import numpy as np import psutil -import ray from environs import Env from its_preselector import __version__ as PRESELECTOR_API_VERSION from scipy.signal import sos2tf, sosfilt @@ -109,7 +109,6 @@ FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE) FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy") IMPEDANCE_OHMS = 50.0 -DATA_REFERENCE_POINT = "noise source output" NUM_ACTORS = 3 # Number of ray actors to initialize # Create power detectors @@ -422,7 +421,7 @@ def __init__(self, params: dict, iir_sos: np.ndarray): ] del params - def run(self, iqdata: np.ndarray) -> list: + def run(self, iqdata: np.ndarray): """ Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results. @@ -449,6 +448,7 @@ class NasctnSeaDataProduct(Action): def __init__(self, parameters: dict): super().__init__(parameters) # Assume preselector is present + self.total_channel_data_length = None rf_path_name = utils.get_parameter(RF_PATH, self.parameters) self.rf_path = {self.PRESELECTOR_PATH_KEY: rf_path_name} @@ -506,6 +506,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): action_start_tic = perf_counter() # Ray should have already been initialized within scos-sensor, # but check and initialize just in case. + if not ray.is_initialized(): logger.info("Initializing ray.") logger.info("Set RAY_INIT=true to avoid initializing within " + __name__) @@ -525,8 +526,6 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): self.iteration_params, ) self.create_global_sensor_metadata(self.sensor) - self.create_global_data_product_metadata() - # Initialize remote supervisor actors for IQ processing tic = perf_counter() # This uses iteration_params[0] because @@ -538,10 +537,15 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - dp_procs, cpu_speed = [], [] + dp_procs, cpu_speed, reference_points = [], [], [] capture_tic = perf_counter() + for i, parameters in enumerate(self.iteration_params): measurement_result = self.capture_iq(parameters) + if i == 0: + self.create_global_data_product_metadata( + measurement_result["reference"] + ) # Start data product processing but do not block next IQ capture tic = perf_counter() @@ -552,16 +556,22 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") # Create capture segment with channel-specific metadata before sigan is reconfigured - tic = perf_counter() self.create_capture_segment(i, measurement_result) - toc = perf_counter() - logger.debug(f"Created capture metadata in {toc-tic:.2f} s") + # Query CPU speed for later averaging in diagnostics metadata cpu_speed.append(get_current_cpu_clock_speed()) + # Append list of data reference points; later we require these to be identical + reference_points.append(measurement_result["reference"]) capture_toc = perf_counter() logger.debug( f"Collected all IQ data and started all processing in {capture_toc-capture_tic:.2f} s" ) + # Create data product metadata: requires all data reference points + # to be identical. + assert ( + len(set(reference_points)) == 1 + ), "Channel data were scaled to different reference points. Cannot build metadata." + # Collect processed data product results all_data, max_max_ch_pwrs, med_mean_ch_pwrs, mean_ch_pwrs, median_ch_pwrs = ( [], @@ -630,14 +640,11 @@ def capture_iq(self, params: dict) -> dict: nskip = utils.get_parameter(NUM_SKIP, params) num_samples = int(params[SAMPLE_RATE] * duration_ms * 1e-3) # Collect IQ data - measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, nskip + measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_params=params ) # Store some metadata with the IQ measurement_result.update(params) - measurement_result[ - "sensor_cal" - ] = self.sensor.signal_analyzer.sensor_calibration_data toc = perf_counter() logger.debug( f"IQ Capture ({duration_ms} ms @ {(params[FREQUENCY]/1e6):.1f} MHz) completed in {toc-tic:.2f} s." @@ -978,7 +985,7 @@ def test_required_components(self): trigger_api_restart.send(sender=self.__class__) return None - def create_global_data_product_metadata(self) -> None: + def create_global_data_product_metadata(self, data_products_reference: str) -> None: p = self.parameters num_iq_samples = int(p[SAMPLE_RATE] * p[DURATION_MS] * 1e-3) iir_obj = ntia_algorithm.DigitalFilter( @@ -1023,7 +1030,7 @@ def create_global_data_product_metadata(self) -> None: x_step=[p[SAMPLE_RATE] / FFT_SIZE], y_units="dBm/Hz", processing=[dft_obj.id], - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Results of statistical detectors (max, mean, median, 25th_percentile, 75th_percentile, " + "90th_percentile, 95th_percentile, 99th_percentile, 99.9th_percentile, 99.99th_percentile) " @@ -1043,7 +1050,7 @@ def create_global_data_product_metadata(self) -> None: x_stop=[pvt_x_axis__s[-1]], x_step=[pvt_x_axis__s[1] - pvt_x_axis__s[0]], y_units="dBm", - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Max- and mean-detected channel power vs. time, with " + f"an integration time of {p[TD_BIN_SIZE_MS]} ms. " @@ -1070,7 +1077,7 @@ def create_global_data_product_metadata(self) -> None: x_stop=[pfp_x_axis__s[-1]], x_step=[pfp_x_axis__s[1] - pfp_x_axis__s[0]], y_units="dBm", - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Channelized periodic frame power statistics reported over" + f" a {p[PFP_FRAME_PERIOD_MS]} ms frame period, with frame resolution" @@ -1093,6 +1100,7 @@ def create_global_data_product_metadata(self) -> None: y_start=[apd_y_axis__dBm[0]], y_stop=[apd_y_axis__dBm[-1]], y_step=[apd_y_axis__dBm[1] - apd_y_axis__dBm[0]], + reference=data_products_reference, description=( f"Estimate of the APD, using a {p[APD_BIN_SIZE_DB]} dB " + "bin size for amplitude values. The data payload includes" @@ -1111,6 +1119,7 @@ def create_global_data_product_metadata(self) -> None: + pfp_length * len(PFP_M3_DETECTOR) * 2 + apd_graph.length ) + logger.debug(f"Total channel length:{self.total_channel_data_length}") def create_capture_segment( self, @@ -1126,11 +1135,15 @@ def create_capture_segment( duration=measurement_result[DURATION_MS], overload=measurement_result["overload"], sensor_calibration=ntia_sensor.Calibration( - datetime=measurement_result["sensor_cal"]["datetime"], - gain=round(measurement_result["sensor_cal"]["gain"], 3), - noise_figure=round(measurement_result["sensor_cal"]["noise_figure"], 3), - temperature=round(measurement_result["sensor_cal"]["temperature"], 1), - reference=DATA_REFERENCE_POINT, + datetime=self.sensor.sensor_calibration_data["datetime"], + gain=round(measurement_result["applied_calibration"]["gain"], 3), + noise_figure=round( + measurement_result["applied_calibration"]["noise_figure"], 3 + ), + temperature=round( + self.sensor.sensor_calibration_data["temperature"], 1 + ), + reference=measurement_result["reference"], ), sigan_settings=ntia_sensor.SiganSettings( reference_level=self.sensor.signal_analyzer.reference_level, diff --git a/scos_actions/actions/acquire_single_freq_fft.py b/scos_actions/actions/acquire_single_freq_fft.py index cf31ae8f..25f0b3f3 100644 --- a/scos_actions/actions/acquire_single_freq_fft.py +++ b/scos_actions/actions/acquire_single_freq_fft.py @@ -91,7 +91,6 @@ from numpy import float32, ndarray from scos_actions.actions.interfaces.measurement_action import MeasurementAction -from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.metadata.structs import ntia_algorithm from scos_actions.signal_processing.fft import ( get_fft, @@ -153,10 +152,6 @@ def __init__(self, parameters: dict): self.classification = get_parameter(CLASSIFICATION, self.parameters) self.cal_adjust = get_parameter(CAL_ADJUST, self.parameters) assert isinstance(self.cal_adjust, bool) - if self.cal_adjust: - self.data_reference = "calibration terminal" - else: - self.data_reference = "signal analyzer input" # FFT setup self.fft_detector = create_statistical_detector( "M4sDetector", ["min", "max", "mean", "median", "sample"] @@ -169,7 +164,7 @@ def __init__(self, parameters: dict): def execute(self, schedule_entry: dict, task_id: int) -> dict: # Acquire IQ data and generate M4S result measurement_result = self.acquire_data( - self.num_samples, self.nskip, self.cal_adjust + self.num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters ) # Actual sample rate may differ from configured value sample_rate_Hz = measurement_result["sample_rate"] @@ -178,9 +173,9 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict: # Save measurement results measurement_result["data"] = m4s_result measurement_result.update(self.parameters) - measurement_result[ - "calibration_datetime" - ] = self.sensor.signal_analyzer.sensor_calibration_data["datetime"] + # measurement_result["calibration_datetime"] = ( + # self.sensor.sensor_calibration_data["datetime"] + # ) measurement_result["task_id"] = task_id measurement_result["classification"] = self.classification @@ -270,7 +265,7 @@ def create_metadata(self, measurement_result: dict, recording: int = None) -> No x_stop=[frequencies[-1]], x_step=[frequencies[1] - frequencies[0]], y_units="dBm", - reference=self.data_reference, + reference=measurement_result["reference"], description=( "Results of min, max, mean, and median statistical detectors, " + f"along with a random sampling, from a set of {self.nffts} " diff --git a/scos_actions/actions/acquire_single_freq_tdomain_iq.py b/scos_actions/actions/acquire_single_freq_tdomain_iq.py index fb505090..8ddaa11a 100644 --- a/scos_actions/actions/acquire_single_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_single_freq_tdomain_iq.py @@ -37,7 +37,6 @@ from scos_actions import utils from scos_actions.actions.interfaces.measurement_action import MeasurementAction -from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.utils import get_parameter logger = logging.getLogger(__name__) @@ -84,14 +83,16 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict: # Use the sigan's actual reported instead of requested sample rate sample_rate = self.sensor.signal_analyzer.sample_rate num_samples = int(sample_rate * self.duration_ms * 1e-3) - measurement_result = self.acquire_data(num_samples, self.nskip, self.cal_adjust) + measurement_result = self.acquire_data( + num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters + ) end_time = utils.get_datetime_str_now() measurement_result.update(self.parameters) measurement_result["end_time"] = end_time measurement_result["task_id"] = task_id - measurement_result[ - "calibration_datetime" - ] = self.sensor.signal_analyzer.sensor_calibration_data["datetime"] + # measurement_result["calibration_datetime"] = ( + # self.sensor.sensor_calibration_data["datetime"] + # ) measurement_result["classification"] = self.classification sigan_settings = self.get_sigan_settings(measurement_result) logger.debug(f"sigan settings:{sigan_settings}") diff --git a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py index 39575f8b..6de6c5a9 100644 --- a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py @@ -100,7 +100,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): cal_adjust = get_parameter(CAL_ADJUST, measurement_params) sample_rate = self.sensor.signal_analyzer.sample_rate num_samples = int(sample_rate * duration_ms * 1e-3) - measurement_result = super().acquire_data(num_samples, nskip, cal_adjust) + measurement_result = super().acquire_data( + num_samples, nskip, cal_adjust, cal_params=measurement_params + ) measurement_result.update(measurement_params) end_time = utils.get_datetime_str_now() measurement_result["end_time"] = end_time @@ -117,22 +119,18 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): overload=measurement_result["overload"], sigan_settings=sigan_settings, ) - sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data - sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data - if sigan_cal is not None: - if "1db_compression_point" in sigan_cal: - sigan_cal["compression_point"] = sigan_cal.pop( - "1db_compression_point" - ) - capture_segment.sigan_calibration = ntia_sensor.Calibration(**sigan_cal) + sensor_cal = self.sensor.sensor_calibration_data if sensor_cal is not None: if "1db_compression_point" in sensor_cal: sensor_cal["compression_point"] = sensor_cal.pop( "1db_compression_point" ) - capture_segment.sensor_calibration = ntia_sensor.Calibration( - **sensor_cal - ) + if "reference" not in sensor_cal: + # If the calibration data already includes this, don't overwrite + sensor_cal["reference"] = measurement_result["reference"] + capture_segment.sensor_calibration = ntia_sensor.Calibration( + **sensor_cal + ) measurement_result["capture_segment"] = capture_segment self.create_metadata(measurement_result, recording_id) diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py index 15cf9fd9..af00dcbe 100644 --- a/scos_actions/actions/calibrate_y_factor.py +++ b/scos_actions/actions/calibrate_y_factor.py @@ -17,7 +17,13 @@ # - SCOS Markdown Editor: https://ntia.github.io/scos-md-editor/ # r"""Perform a Y-Factor Calibration. -Supports calibration of gain and noise figure for one or more channels. +Supports calculation of gain and noise figure for one or more channels using the +Y-Factor method. Results are written to the file specified by the environment +variable ``ONBOARD_CALIBRATION_FILE``. If the sensor already has a sensor calibration +object, it is used as the starting point, and copied to a new onboard calibration object +which is updated by this action. The sensor object's sensor calibration will be set to +the updated onboard calibration object after this action is run. + For each center frequency, sets the preselector to the noise diode path, turns noise diode on, performs a mean power measurement, turns the noise diode off and performs another mean power measurement. The mean power on and mean power off @@ -71,13 +77,16 @@ import logging import os import time +from pathlib import Path import numpy as np +from environs import Env from scipy.constants import Boltzmann from scipy.signal import sosfilt from scos_actions import utils from scos_actions.actions.interfaces.action import Action +from scos_actions.calibration.sensor_calibration import SensorCalibration from scos_actions.hardware.sensor import Sensor from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS from scos_actions.signal_processing.calibration import ( @@ -92,9 +101,10 @@ from scos_actions.signal_processing.power_analysis import calculate_power_watts from scos_actions.signal_processing.unit_conversion import convert_watts_to_dBm from scos_actions.signals import trigger_api_restart -from scos_actions.utils import ParameterException, get_parameter +from scos_actions.utils import ParameterException, get_datetime_str_now, get_parameter logger = logging.getLogger(__name__) +env = Env() # Define parameter keys RF_PATH = Action.PRESELECTOR_PATH_KEY @@ -112,6 +122,7 @@ IIR_RESP_FREQS = "iir_num_response_frequencies" CAL_SOURCE_IDX = "cal_source_idx" TEMP_SENSOR_IDX = "temp_sensor_idx" +REFERENCE_POINT = "reference_point" class YFactorCalibration(Action): @@ -196,6 +207,44 @@ def __init__(self, parameters: dict): def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): """This is the entrypoint function called by the scheduler.""" self.sensor = sensor + + # Prepare the sensor calibration object. + assert all( + self.iteration_params[0][REFERENCE_POINT] == p[REFERENCE_POINT] + for p in self.iteration_params + ), f"All iterations must use the same '{REFERENCE_POINT}' setting" + onboard_cal_reference = self.iteration_params[0][REFERENCE_POINT] + + if self.sensor.sensor_calibration is None: + # Create a new sensor calibration object and attach it to the sensor. + # The calibration parameters will be set to the sigan parameters used + # in the action YAML parameters. + logger.debug(f"Creating a new onboard cal object for the sensor.") + cal_params = [k for k in self.iteration_params if k in SIGAN_SETTINGS_KEYS] + cal_data = dict() + last_cal_datetime = get_datetime_str_now() + clock_rate_lookup_by_sample_rate = [] + sensor_uid = "Sensor calibration file not provided" + self.sensor.sensor_calibration = SensorCalibration( + calibration_parameters=cal_params, + calibration_data=cal_data, + calibration_reference=onboard_cal_reference, + file_path=Path(env("ONBOARD_CALIBRATION_FILE")), + last_calibration_datetime=last_cal_datetime, + clock_rate_lookup_by_sample_rate=clock_rate_lookup_by_sample_rate, + sensor_uid=sensor_uid, + ) + elif self.sensor.sensor_calibration.file_path == env( + "ONBOARD_CALIBRATION_FILE" + ): + # Already using an onboard cal file. + logger.debug("Onboard calibration file already in use. Continuing.") + else: + # Sensor calibration file exists. Change it to an onboard cal file + logger.debug("Making new onboard cal file from existing sensor cal") + self.sensor.sensor_calibration.calibration_reference = onboard_cal_reference + self.sensor.sensor_calibration.file_path = env("ONBOARD_CALIBRATION_FILE") + self.test_required_components() detail = "" @@ -205,6 +254,8 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): detail += self.calibrate(p) else: detail += os.linesep + self.calibrate(p) + # Save results to onboard calibration file + self.sensor.sensor_calibration.to_json() return detail def calibrate(self, params: dict): @@ -228,10 +279,8 @@ def calibrate(self, params: dict): # Get noise diode on IQ logger.debug("Acquiring IQ samples with noise diode ON") - noise_on_measurement_result = ( - self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, num_samples_skip=nskip, cal_adjust=False - ) + noise_on_measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_adjust=False ) sample_rate = noise_on_measurement_result["sample_rate"] @@ -242,10 +291,8 @@ def calibrate(self, params: dict): # Get noise diode off IQ logger.debug("Acquiring IQ samples with noise diode OFF") - noise_off_measurement_result = ( - self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, num_samples_skip=nskip, cal_adjust=False - ) + noise_off_measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_adjust=False ) assert ( sample_rate == noise_off_measurement_result["sample_rate"] @@ -259,24 +306,22 @@ def calibrate(self, params: dict): noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"]) noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"]) else: - if self.sensor.signal_analyzer.sensor_calibration.is_default: - raise Exception( - "Calibrations without IIR filter cannot be performed with default calibration." - ) - logger.debug("Skipping IIR filtering") # Get ENBW from sensor calibration - assert set( - self.sensor.signal_analyzer.sensor_calibration.calibration_parameters - ) <= set( + assert set(self.sensor.sensor_calibration.calibration_parameters) <= set( sigan_params.keys() ), f"Action parameters do not include all required calibration parameters" cal_args = [ sigan_params[k] - for k in self.sensor.signal_analyzer.sensor_calibration.calibration_parameters + for k in self.sensor.sensor_calibration.calibration_parameters ] - self.sensor.signal_analyzer.recompute_sensor_calibration_data(cal_args) - enbw_hz = self.sensor.signal_analyzer.sensor_calibration_data["enbw"] + self.sensor.recompute_sensor_calibration_data(cal_args) + if "enbw" not in self.sensor.sensor_calibration_data: + raise Exception( + "Unable to perform Y-Factor calibration without IIR filtering when no" + " ENBW is provided in the sensor calibration file." + ) + enbw_hz = self.sensor.sensor_calibration_data["enbw"] noise_on_data = noise_on_measurement_result["data"] noise_off_data = noise_off_measurement_result["data"] @@ -294,7 +339,7 @@ def calibrate(self, params: dict): ) # Update sensor calibration with results - self.sensor.signal_analyzer.sensor_calibration.update( + self.sensor.sensor_calibration.update( sigan_params, utils.get_datetime_str_now(), gain, noise_figure, temp_c ) diff --git a/scos_actions/actions/interfaces/measurement_action.py b/scos_actions/actions/interfaces/measurement_action.py index 88a0a239..c8fb1847 100644 --- a/scos_actions/actions/interfaces/measurement_action.py +++ b/scos_actions/actions/interfaces/measurement_action.py @@ -52,8 +52,7 @@ def create_capture_segment( overload=overload, sigan_settings=sigan_settings, ) - sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data - sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data + sensor_cal = self.sensor.sensor_calibration_data # Rename compression point keys if they exist # then set calibration metadata if it exists if sensor_cal is not None: @@ -62,10 +61,6 @@ def create_capture_segment( "1db_compression_point" ) capture_segment.sensor_calibration = ntia_sensor.Calibration(**sensor_cal) - if sigan_cal is not None: - if "1db_compression_point" in sigan_cal: - sigan_cal["compression_point"] = sigan_cal.pop("1db_compression_point") - capture_segment.sigan_calibration = ntia_sensor.Calibration(**sigan_cal) return capture_segment def create_metadata( @@ -160,17 +155,22 @@ def send_signals(self, task_id, metadata, measurement_data): ) def acquire_data( - self, num_samples: int, nskip: int = 0, cal_adjust: bool = True + self, + num_samples: int, + nskip: int = 0, + cal_adjust: bool = True, + cal_params: Optional[dict] = None, ) -> dict: logger.debug( f"Acquiring {num_samples} IQ samples, skipping the first {nskip} samples" + f" and {'' if cal_adjust else 'not '}applying gain adjustment based" + " on calibration data" ) - measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples( + measurement_result = self.sensor.acquire_time_domain_samples( num_samples, num_samples_skip=nskip, cal_adjust=cal_adjust, + cal_params=cal_params, ) return measurement_result diff --git a/scos_actions/actions/tests/test_acquire_single_freq_fft.py b/scos_actions/actions/tests/test_acquire_single_freq_fft.py index 0419d7cc..e4d9c9fb 100644 --- a/scos_actions/actions/tests/test_acquire_single_freq_fft.py +++ b/scos_actions/actions/tests/test_acquire_single_freq_fft.py @@ -1,7 +1,6 @@ from scos_actions.actions.tests.utils import check_metadata_fields from scos_actions.discover import test_actions as actions -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import measurement_action_completed SINGLE_FREQUENCY_FFT_ACQUISITION = { @@ -29,9 +28,8 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_single_frequency_m4s_action"] assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) action( - sensor=sensor, + sensor=MockSensor(), schedule_entry=SINGLE_FREQUENCY_FFT_ACQUISITION, task_id=1, ) @@ -84,11 +82,3 @@ def callback(sender, **kwargs): ] ] ) - - -def test_num_samples_skip(): - action = actions["test_single_frequency_m4s_action"] - assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - action(sensor, SINGLE_FREQUENCY_FFT_ACQUISITION, 1) - assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"] diff --git a/scos_actions/actions/tests/test_monitor_sigan.py b/scos_actions/actions/tests/test_monitor_sigan.py index 9a00ea71..38085efd 100644 --- a/scos_actions/actions/tests/test_monitor_sigan.py +++ b/scos_actions/actions/tests/test_monitor_sigan.py @@ -1,6 +1,6 @@ from scos_actions.discover import test_actions as actions +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor from scos_actions.signals import trigger_api_restart MONITOR_SIGAN_SCHEDULE = { @@ -23,10 +23,10 @@ def callback(sender, **kwargs): action = actions["test_monitor_sigan"] mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = False - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == True # signal sent - mock_sigan._is_available = True + sensor.signal_analyzer._is_available = True def test_monitor_sigan_not_healthy(): @@ -40,7 +40,7 @@ def callback(sender, **kwargs): action = actions["test_monitor_sigan"] mock_sigan = MockSignalAnalyzer() mock_sigan.times_to_fail_recv = 6 - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == True # signal sent @@ -57,6 +57,6 @@ def callback(sender, **kwargs): mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = True mock_sigan.set_times_to_fail_recv(0) - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == False # signal not sent diff --git a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py index 67d86495..774ce91e 100644 --- a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py +++ b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py @@ -2,8 +2,8 @@ from scos_actions.actions.tests.utils import check_metadata_fields from scos_actions.discover import test_actions as actions +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor from scos_actions.signals import measurement_action_completed SINGLE_TIMEDOMAIN_IQ_ACQUISITION = { @@ -31,7 +31,7 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_single_frequency_iq_action"] assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) + sensor = MockSensor() action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) assert _data.any() assert _metadata @@ -62,16 +62,7 @@ def test_required_components(): action = actions["test_single_frequency_m4s_action"] mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = False - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) with pytest.raises(RuntimeError): action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) mock_sigan._is_available = True - - -def test_num_samples_skip(): - action = actions["test_single_frequency_iq_action"] - assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) - action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) - assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"] diff --git a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py index e06920cc..2183d0bf 100644 --- a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py +++ b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py @@ -1,6 +1,5 @@ from scos_actions.discover import test_actions as actions -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import measurement_action_completed SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION = { @@ -34,8 +33,7 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_multi_frequency_iq_action"] assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor() action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1) for i in range(_count): assert _datas[i].any() @@ -43,21 +41,3 @@ def callback(sender, **kwargs): assert _task_ids[i] == 1 assert _recording_ids[i] == i + 1 assert _count == 10 - - -def test_num_samples_skip(): - action = actions["test_multi_frequency_iq_action"] - assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) - action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1) - if isinstance(action.parameters["nskip"], list): - assert ( - action.sensor.signal_analyzer._num_samples_skip - == action.parameters["nskip"][-1] - ) - else: - assert ( - action.sensor.signal_analyzer._num_samples_skip - == action.parameters["nskip"] - ) diff --git a/scos_actions/actions/tests/test_sync_gps.py b/scos_actions/actions/tests/test_sync_gps.py index ed7b2691..0a98c154 100644 --- a/scos_actions/actions/tests/test_sync_gps.py +++ b/scos_actions/actions/tests/test_sync_gps.py @@ -4,9 +4,7 @@ import pytest from scos_actions.discover import test_actions -from scos_actions.hardware.mocks.mock_gps import MockGPS -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import location_action_completed SYNC_GPS = { @@ -30,9 +28,7 @@ def callback(sender, **kwargs): location_action_completed.connect(callback) action = test_actions["test_sync_gps"] - sensor = Sensor( - signal_analyzer=MockSignalAnalyzer(), capabilities={}, gps=MockGPS() - ) + sensor = MockSensor() if sys.platform == "linux": action(sensor, SYNC_GPS, 1) assert _latitude diff --git a/scos_actions/calibration/calibration.py b/scos_actions/calibration/calibration.py deleted file mode 100644 index 2506d825..00000000 --- a/scos_actions/calibration/calibration.py +++ /dev/null @@ -1,214 +0,0 @@ -import json -import logging -from dataclasses import dataclass -from pathlib import Path -from typing import Dict, List, Union - -from scos_actions.signal_processing.calibration import CalibrationException - -logger = logging.getLogger(__name__) - - -@dataclass -class Calibration: - last_calibration_datetime: str - calibration_parameters: List[str] - calibration_data: dict - clock_rate_lookup_by_sample_rate: List[Dict[str, float]] - is_default: bool - file_path: Path - - def __post_init__(self): - # Convert key names in calibration_data to strings - # This means that formatting will always match between - # native types provided in Python and data loaded from JSON - self.calibration_data = json.loads(json.dumps(self.calibration_data)) - - def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]: - """Find the clock rate (Hz) using the given sample_rate (samples per second)""" - for mapping in self.clock_rate_lookup_by_sample_rate: - if mapping["sample_rate"] == sample_rate: - return mapping["clock_frequency"] - return sample_rate - - def get_calibration_dict(self, cal_params: List[Union[float, int, bool]]) -> dict: - """ - Get calibration data closest to the specified parameter values. - - :param cal_params: List of calibration parameter values. For example, - if ``calibration_parameters`` are ``["sample_rate", "gain"]``, - then the input to this method could be ``["15360000.0", "40"]``. - :return: The calibration data corresponding to the input parameter values. - """ - - cal_data = self.calibration_data - for i, setting_value in enumerate(cal_params): - setting = self.calibration_parameters[i] - logger.debug(f"Looking up calibration for {setting} at {setting_value}") - cal_data = filter_by_parameter(cal_data, setting_value) - logger.debug(f"Got calibration data: {cal_data}") - - return cal_data - - def update( - self, - params: dict, - calibration_datetime_str: str, - gain_dB: float, - noise_figure_dB: float, - temp_degC: float, - ) -> None: - """ - Update the calibration data by overwriting or adding an entry. - - This method updates the instance variables of the ``Calibration`` - object and additionally writes these changes to the specified - output file. - - :param params: Parameters used for calibration. This must include - entries for all of the ``Calibration.calibration_parameters`` - Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` - :param calibration_datetime_str: Calibration datetime string, - as returned by ``scos_actions.utils.get_datetime_str_now()`` - :param gain_dB: Gain value from calibration, in dB. - :param noise_figure_dB: Noise figure value for calibration, in dB. - :param temp_degC: Temperature at calibration time, in degrees Celsius. - :param file_path: File path for saving the updated calibration data. - :raises Exception: - """ - cal_data = self.calibration_data - self.last_calibration_datetime = calibration_datetime_str - if len(self.calibration_parameters) == 0: - self.calibration_parameters = list(params.keys()) - # Ensure all required calibration parameters were used - elif not set(params.keys()) >= set(self.calibration_parameters): - raise Exception( - "Not enough parameters specified to update calibration.\n" - + f"Required parameters are {self.calibration_parameters}" - ) - - # Get calibration entry by parameters used - for parameter in self.calibration_parameters: - value = str(params[parameter]).lower() - logger.debug(f"Updating calibration at {parameter} = {value}") - try: - cal_data = cal_data[value] - except KeyError: - logger.debug( - f"Creating required calibration data field for {parameter} = {value}" - ) - cal_data[value] = {} - cal_data = cal_data[value] - - # Update calibration data - cal_data.update( - { - "datetime": calibration_datetime_str, - "gain": gain_dB, - "noise_figure": noise_figure_dB, - "temperature": temp_degC, - } - ) - - # Write updated calibration data to file - cal_dict = { - "last_calibration_datetime": self.last_calibration_datetime, - "calibration_parameters": self.calibration_parameters, - "clock_rate_lookup_by_sample_rate": self.clock_rate_lookup_by_sample_rate, - "calibration_data": self.calibration_data, - } - with open(self.file_path, "w") as outfile: - outfile.write(json.dumps(cal_dict)) - - -def load_from_json(fname: Path, is_default: bool) -> Calibration: - """ - Load a calibration from a JSON file. - - The JSON file must contain top-level fields: - ``last_calibration_datetime`` - ``calibration_parameters`` - ``calibration_data`` - ``clock_rate_lookup_by_sample_rate`` - - :param fname: The ``Path`` to the JSON calibration file. - :param is_default: If True, the loaded calibration file - is treated as the default calibration file. - :raises Exception: If the provided file does not include - the required keys. - :return: The ``Calibration`` object generated from the file. - """ - with open(fname) as file: - calibration = json.load(file) - # Check that the required fields are in the dict - required_keys = { - "last_calibration_datetime", - "calibration_data", - "clock_rate_lookup_by_sample_rate", - "calibration_parameters", - } - - if not calibration.keys() >= required_keys: - raise Exception( - "Loaded calibration dictionary is missing required fields." - + f"Existing fields: {set(calibration.keys())}\n" - + f"Required fields: {required_keys}\n" - ) - # Create and return the Calibration object - return Calibration( - calibration["last_calibration_datetime"], - calibration["calibration_parameters"], - calibration["calibration_data"], - calibration["clock_rate_lookup_by_sample_rate"], - is_default, - fname, - ) - - -def filter_by_parameter(calibrations: dict, value: Union[float, int, bool]) -> dict: - """ - Select a certain element by the value of a top-level key in a dictionary. - - This method should be recursively called to select calibration - data matching a set of calibration parameters. The ordering of - nested dictionaries should match the ordering of the required - calibration parameters in the calibration file. - - If ``value`` is a float or bool, ``str(value).lower()`` is used - as the dictionary key. If ``value`` is an int, and the previous - approach does not work, ``str(float(value))`` is attempted. This - allows for value ``1`` to match a key ``"1.0"``. - - :param calibrations: Calibration data dictionary. - :param value: The parameter value for filtering. This value should - exist as a top-level key in ``calibrations``. - :raises CalibrationException: If ``value`` cannot be matched to a - top-level key in ``calibrations``, or if ``calibrations`` is not - a dict. - :return: The value of ``calibrations[value]``, which should be a dict. - """ - try: - filtered_data = calibrations.get(str(value).lower(), None) - if filtered_data is None and isinstance(value, int): - # Try equivalent float for ints, i.e., match "1.0" to 1 - filtered_data = calibrations.get(str(float(value)), None) - if filtered_data is None and isinstance(value, float) and value.is_integer(): - # Check for, e.g., key '25' if value is '25.0' - filtered_data = calibrations.get(str(int(value)), None) - if filtered_data is None: - raise KeyError - else: - return filtered_data - except AttributeError as e: - # calibrations does not have ".get()" - # Generally means that calibrations is None or not a dict - msg = f"Provided calibration data is not a dict: {calibrations}" - raise CalibrationException(msg) - except KeyError as e: - msg = ( - f"Could not locate calibration data at {value}" - + f"\nAttempted lookup using key '{str(value).lower()}'" - + f"{f'and {float(value)}' if isinstance(value, int) else ''}" - + f"\nUsing calibration data: {calibrations}" - ) - raise CalibrationException(msg) diff --git a/scos_actions/calibration/differential_calibration.py b/scos_actions/calibration/differential_calibration.py new file mode 100644 index 00000000..ac518737 --- /dev/null +++ b/scos_actions/calibration/differential_calibration.py @@ -0,0 +1,35 @@ +""" +Dataclass implementation for "differential calibration" handling. + +A differential calibration provides loss values which represent excess loss +between the ``SensorCalibration.calibration_reference`` reference point and +another reference point. A typical usage would be for calibrating out measured +cable losses which exist between the antenna and the Y-factor calibration terminal. +At present, this is measured manually using a calibration probe consisting of a +calibrated noise source and a programmable attenuator. + +The ``DifferentialCalibration.calibration_data`` entries should be dictionaries +containing the key ``"loss"`` and a corresponding value in decibels (dB). A positive +value of ``"loss"`` indicates a LOSS going FROM ``DifferentialCalibration.calibration_reference`` +TO ``SensorCalibration.calibration_reference``. +""" + +from dataclasses import dataclass + +from scos_actions.calibration.interfaces.calibration import Calibration + + +@dataclass +class DifferentialCalibration(Calibration): + def update(self): + """ + SCOS Sensor should not update differential calibration files. + + Instead, these should be generated through an external calibration + process. This class should only be used to read JSON files, and never + to update their entries. Therefore, no ``update`` method is implemented. + + If, at some point in the future, functionality is added to automate these + calibrations, this function may be implemented. + """ + raise NotImplementedError diff --git a/scos_actions/calibration/interfaces/__init__.py b/scos_actions/calibration/interfaces/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scos_actions/calibration/interfaces/calibration.py b/scos_actions/calibration/interfaces/calibration.py new file mode 100644 index 00000000..011e56b5 --- /dev/null +++ b/scos_actions/calibration/interfaces/calibration.py @@ -0,0 +1,132 @@ +""" +TODO +""" + +import dataclasses +import json +import logging +from abc import abstractmethod +from pathlib import Path +from typing import List, get_origin + +from scos_actions.calibration.utils import ( + CalibrationParametersMissingException, + filter_by_parameter, +) + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class Calibration: + calibration_parameters: List[str] + calibration_data: dict + calibration_reference: str + file_path: Path + + def __post_init__(self): + self._validate_fields() + # Convert key names in data to strings + # This means that formatting will always match between + # native types provided in Python and data loaded from JSON + self.calibration_data = json.loads(json.dumps(self.calibration_data)) + + def _validate_fields(self) -> None: + """Loosely check that the input types are as expected.""" + for f_name, f_def in self.__dataclass_fields__.items(): + # Note that nested types are not checked: i.e., "List[str]" + # will surely be a list, but may not be filled with strings. + f_type = get_origin(f_def.type) or f_def.type + actual_value = getattr(self, f_name) + if not isinstance(actual_value, f_type): + c_name = self.__class__.__name__ + actual_type = type(actual_value) + raise TypeError( + f"{c_name} field {f_name} must be {f_type}, not {actual_type}" + ) + + def get_calibration_dict(self, params: dict) -> dict: + """ + Get calibration data entry at the specified parameter values. + + :param params: Parameters used for calibration. This must include + entries for all of the ``Calibration.calibration_parameters`` + Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` + :return: The calibration data corresponding to the input parameter values. + """ + # Check that input includes all required calibration parameters + if not set(params.keys()) >= set(self.calibration_parameters): + raise CalibrationParametersMissingException( + params, self.calibration_parameters + ) + cal_data = self.calibration_data + for p_name in self.calibration_parameters: + p_value = params[p_name] + logger.debug(f"Looking up calibration data at {p_name}={p_value}") + cal_data = filter_by_parameter(cal_data, p_value) + + logger.debug(f"Got calibration data: {cal_data}") + + return cal_data + + @abstractmethod + def update(self): + """Update the calibration data""" + raise NotImplementedError + + @classmethod + def from_json(cls, fname: Path): + """ + Load a calibration from a JSON file. + + The JSON file must contain top-level fields + with names identical to the dataclass fields for + the class being constructed. + + :param fname: The ``Path`` to the JSON calibration file. + :raises Exception: If the provided file does not include + the required keys. + :return: The ``Calibration`` object generated from the file. + """ + with open(fname) as file: + calibration = json.load(file) + cal_file_keys = set(calibration.keys()) + + # Check that only the required fields are in the dict + required_keys = {f.name for f in dataclasses.fields(cls)} + required_keys -= {"file_path"} # not required in JSON + if cal_file_keys == required_keys: + pass + elif cal_file_keys >= required_keys: + extra_keys = cal_file_keys - required_keys + logger.warning( + f"Loaded calibration file contains fields which will be ignored: {extra_keys}" + ) + for k in extra_keys: + calibration.pop(k, None) + else: + raise Exception( + "Loaded calibration dictionary is missing required fields.\n" + + f"Existing fields: {cal_file_keys}\n" + + f"Required fields: {required_keys}\n" + + f"Missing fields: {required_keys - cal_file_keys}" + ) + + # Create and return the Calibration object + return cls(file_path=fname, **calibration) + + def to_json(self) -> None: + """ + Save the calibration to a JSON file. + + The JSON file will be located at ``self.file_path`` and will + contain a copy of ``self.__dict__``, except for the ``file_path`` + key/value pair. This includes all dataclass fields, with their + parameter names as JSON key names. + """ + dict_to_json = self.__dict__.copy() + # Remove keys which should not save to JSON + dict_to_json.pop("file_path", None) + with open(self.file_path, "w") as outfile: + outfile.write(json.dumps(dict_to_json)) + logger.debug("Finished updating calibration file.") diff --git a/scos_actions/calibration/sensor_calibration.py b/scos_actions/calibration/sensor_calibration.py new file mode 100644 index 00000000..0330b944 --- /dev/null +++ b/scos_actions/calibration/sensor_calibration.py @@ -0,0 +1,130 @@ +""" +TODO +""" + +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Dict, List, Union + +from environs import Env + +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.utils import CalibrationEntryMissingException +from scos_actions.utils import parse_datetime_iso_format_str + +logger = logging.getLogger(__name__) + + +@dataclass +class SensorCalibration(Calibration): + last_calibration_datetime: str + clock_rate_lookup_by_sample_rate: List[Dict[str, float]] + sensor_uid: str + + def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]: + """Find the clock rate (Hz) using the given sample_rate (samples per second)""" + for mapping in self.clock_rate_lookup_by_sample_rate: + if mapping["sample_rate"] == sample_rate: + return mapping["clock_frequency"] + return sample_rate + + def update( + self, + params: dict, + calibration_datetime_str: str, + gain_dB: float, + noise_figure_dB: float, + temp_degC: float, + ) -> None: + """ + Update the calibration data by overwriting or adding an entry. + + This updates the instance variables of the ``SensorCalibration`` + object and additionally writes these changes to the specified + output file. + + :param params: Parameters used for calibration. This must include + entries for all of the ``Calibration.calibration_parameters`` + Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` + :param calibration_datetime_str: Calibration datetime string, + as returned by ``scos_actions.utils.get_datetime_str_now()`` + :param gain_dB: Gain value from calibration, in dB. + :param noise_figure_dB: Noise figure value for calibration, in dB. + :param temp_degC: Temperature at calibration time, in degrees Celsius. + :param file_path: File path for saving the updated calibration data. + """ + logger.debug("Updating calibration file.") + try: + # Get existing calibration data entry which will be updated + data_entry = self.get_calibration_dict(params) + except CalibrationEntryMissingException: + # Existing entry does not exist for these parameters. Make one. + data_entry = self.calibration_data + for p_name in self.calibration_parameters: + p_val = params[p_name] + try: + data_entry = data_entry[p_val] + except KeyError: + logger.debug( + f"Creating calibration data field for {p_name}={p_val}" + ) + data_entry[p_val] = {} + data_entry = data_entry[p_val] + except Exception as e: + logger.exception("Failed to update calibration data.") + raise e + + # Update last calibration datetime + self.last_calibration_datetime = calibration_datetime_str + + # Update calibration data entry (updates entry in self.calibration_data) + data_entry.update( + { + "datetime": calibration_datetime_str, + "gain": gain_dB, + "noise_figure": noise_figure_dB, + "temperature": temp_degC, + } + ) + + # Write updated calibration data to file + self.to_json() + + def expired(self) -> bool: + env = Env() + time_limit = env.int("CALIBRATION_EXPIRATION_LIMIT", default=None) + logger.debug("Checking if calibration has expired.") + if time_limit is None: + return False + elif self.calibration_data is None: + return True + elif len(self.calibration_data) == 0: + return True + else: + now_string = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" + now = parse_datetime_iso_format_str(now_string) + cal_data = self.calibration_data + return has_expired_cal_data(cal_data, now, time_limit) + + +def has_expired_cal_data(cal_data: dict, now: datetime, time_limit: int) -> bool: + expired = False + if "datetime" in cal_data: + expired = expired or date_expired(cal_data, now, time_limit) + + for key, value in cal_data.items(): + if isinstance(value, dict): + expired = expired or has_expired_cal_data(value, now, time_limit) + return expired + + +def date_expired(cal_data: dict, now: datetime, time_limit: int): + cal_datetime = parse_datetime_iso_format_str(cal_data["datetime"]) + elapsed = now - cal_datetime + if elapsed.total_seconds() > time_limit: + logger.debug( + f"Calibration {cal_data} has expired at {elapsed.total_seconds()} seconds old." + ) + return True + return False diff --git a/scos_actions/calibration/tests/test_calibration.py b/scos_actions/calibration/tests/test_calibration.py index 9e503acd..20c160b8 100644 --- a/scos_actions/calibration/tests/test_calibration.py +++ b/scos_actions/calibration/tests/test_calibration.py @@ -1,322 +1,125 @@ -"""Test aspects of ScaleFactors.""" +"""Test the Calibration base dataclass.""" -import datetime +import dataclasses import json -import random -from copy import deepcopy -from math import isclose from pathlib import Path +from typing import List import pytest -from scos_actions.calibration.calibration import ( - Calibration, - filter_by_parameter, - load_from_json, -) -from scos_actions.signal_processing.calibration import CalibrationException -from scos_actions.tests.resources.utils import easy_gain -from scos_actions.utils import get_datetime_str_now, parse_datetime_iso_format_str +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import SensorCalibration +from scos_actions.calibration.tests.utils import recursive_check_keys -class TestCalibrationFile: - # Ensure we load the test file - setup_complete = False - - def rand_index(self, l): - """Get a random index for a list""" - return random.randint(0, len(l) - 1) - - def check_duplicate(self, sr, f, g): - """Check if a set of points was already tested""" - for pt in self.pytest_points: - duplicate_f = f == pt["frequency"] - duplicate_g = g == pt["setting_value"] - duplicate_sr = sr == pt["sample_rate"] - if duplicate_f and duplicate_g and duplicate_sr: - return True - - def run_pytest_point(self, sr, f, g, reason, sr_m=False, f_m=False, g_m=False): - """Test the calculated value against the algorithm - Parameters: - sr, f, g -> Set values for the mock USRP - reason: Test case string for failure reference - sr_m, f_m, g_m -> Set values to use when calculating the expected value - May differ in from actual set points in edge cases - such as tuning in divisions or uncalibrated sample rate""" - # Check that the setup was completed - assert self.setup_complete, "Setup was not completed" - - # If this point was tested before, skip it (triggering a new one) - if self.check_duplicate(sr, f, g): - return False - - # If the point doesn't have modified inputs, use the algorithm ones - if not f_m: - f_m = f - if not g_m: - g_m = g - if not sr_m: - sr_m = sr - - # Calculate what the scale factor should be - calc_gain_sigan = easy_gain(sr_m, f_m, g_m) - - # Get the scale factor from the algorithm - interp_cal_data = self.sample_cal.get_calibration_dict([sr, f, g]) - interp_gain_siggan = interp_cal_data["gain"] - - # Save the point so we don't duplicate - self.pytest_points.append( - { - "sample_rate": int(sr), - "frequency": f, - "setting_value": g, - "gain": calc_gain_sigan, - "test": reason, - } - ) - - # Check if the point was calculated correctly - tolerance = 1e-5 - msg = "Scale factor not correctly calculated!\r\n" - msg = f"{msg} Expected value: {calc_gain_sigan}\r\n" - msg = f"{msg} Calculated value: {interp_gain_siggan}\r\n" - msg = f"{msg} Tolerance: {tolerance}\r\n" - msg = f"{msg} Test: {reason}\r\n" - msg = f"{msg} Sample Rate: {sr / 1e6}({sr_m / 1e6})\r\n" - msg = f"{msg} Frequency: {f / 1e6}({f_m / 1e6})\r\n" - msg = f"{msg} Gain: {g}({g_m})\r\n" - msg = ( - "{} Formula: -1 * (Gain - Frequency[GHz] - Sample Rate[MHz])\r\n".format( - msg - ) - ) - if not isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance): - interp_cal_data = self.sample_cal.get_calibration_dict([sr, f, g]) - - assert isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance), msg - return True - +class TestBaseCalibration: @pytest.fixture(autouse=True) - def setup_calibration_file(self, tmpdir): - """Create the dummy calibration file in the pytest temp directory""" - - # Only setup once - if self.setup_complete: - return - - # Create and save the temp directory and file - self.tmpdir = tmpdir.strpath - self.calibration_file = "{}".format(tmpdir.join("dummy_cal_file.json")) - - # Setup variables - self.dummy_noise_figure = 10 - self.dummy_compression = -20 - self.test_repeat_times = 3 - - # Sweep variables - self.sample_rates = [10e6, 15.36e6, 40e6] - self.gain_min = 40 - self.gain_max = 60 - self.gain_step = 10 - gains = list(range(self.gain_min, self.gain_max, self.gain_step)) + [ - self.gain_max - ] - self.frequency_min = 1000000000 - self.frequency_max = 3400000000 - self.frequency_step = 200000000 - frequencies = list( - range(self.frequency_min, self.frequency_max, self.frequency_step) - ) + [self.frequency_max] - frequencies = sorted(frequencies) - - # Start with blank cal data dicts + def setup_calibration_file(self, tmp_path: Path): + """Create a dummy calibration file in the pytest temp directory.""" + # Create some dummy calibration data + self.cal_params = ["frequency", "gain"] + self.frequencies = [3555e6, 3565e6, 3575e6] + self.gains = [10.0, 20.0, 30.0] cal_data = {} - - # Add the simple stuff to new cal format - cal_data["last_calibration_datetime"] = get_datetime_str_now() - cal_data["sensor_uid"] = "SAMPLE_CALIBRATION" - - # Add SR/CF lookup table - cal_data["clock_rate_lookup_by_sample_rate"] = [] - for sr in self.sample_rates: - cr = sr - while cr <= 40e6: - cr *= 2 - cr /= 2 - cal_data["clock_rate_lookup_by_sample_rate"].append( - {"sample_rate": int(sr), "clock_frequency": int(cr)} - ) - - # Create the JSON architecture for the calibration data - cal_data["calibration_data"] = {} - cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"] - for k in range(len(self.sample_rates)): - cal_data_f = {} - for i in range(len(frequencies)): - cal_data_g = {} - for j in range(len(gains)): - # Create the scale factor that ensures easy interpolation - gain_sigan = easy_gain( - self.sample_rates[k], frequencies[i], gains[j] - ) - - # Create the data point - cal_data_point = { - "gain": gain_sigan, - "noise_figure": self.dummy_noise_figure, - "1dB_compression_point": self.dummy_compression, - } - - # Add the generated dicts to the parent lists - cal_data_g[gains[j]] = deepcopy(cal_data_point) - cal_data_f[frequencies[i]] = deepcopy(cal_data_g) - - cal_data["calibration_data"][self.sample_rates[k]] = deepcopy(cal_data_f) - - # Write the new json file - with open(self.calibration_file, "w+") as file: - json.dump(cal_data, file, indent=4) - - # Load the data back in - self.sample_cal = load_from_json(self.calibration_file, False) - - # Create a list of previous points to ensure that we don't repeat - self.pytest_points = [] - - # Create sweep lists for test points - self.srs = self.sample_rates - self.gi_s = list(range(self.gain_min, self.gain_max, self.gain_step)) - self.fi_s = list( - range(self.frequency_min, self.frequency_max, self.frequency_step) + for frequency in self.frequencies: + cal_data[frequency] = {} + for gain in self.gains: + cal_data[frequency][gain] = { + "gain": gain * 1.1, + "noise_figure": gain / 5.0, + "1dB_compression_point": -50 + gain, + } + self.cal_data = cal_data + self.dummy_file_path = tmp_path / "dummy_cal.json" + self.dummy_default_file_path = tmp_path / "dummy_default_cal.json" + + self.sample_cal = Calibration( + calibration_parameters=self.cal_params, + calibration_data=self.cal_data, + calibration_reference="testing", + file_path=self.dummy_file_path, ) - self.g_s = self.gi_s + [self.gain_max] - self.f_s = self.fi_s + [self.frequency_max] - - # Don't repeat test setup - self.setup_complete = True - - def test_filter_by_parameter_out_of_range(self): - calibrations = {200.0: {"some_cal_data"}, 300.0: {"more cal data"}} - with pytest.raises(CalibrationException) as e_info: - cal = filter_by_parameter(calibrations, 400.0) - assert ( - e_info.value.args[0] - == f"Could not locate calibration data at 400.0" - + f"\nAttempted lookup using key '400.0'" - + f"\nUsing calibration data: {calibrations}" - ) - def test_filter_by_parameter_in_range_requires_match(self): - calibrations = { - 200.0: {"Gain": "Gain at 200.0"}, - 300.0: {"Gain": "Gain at 300.0"}, - } - with pytest.raises(CalibrationException) as e_info: - cal = filter_by_parameter(calibrations, 150.0) - assert e_info.value.args[0] == ( - f"Could not locate calibration data at 150.0" - + f"\nAttempted lookup using key '150.0'" - + f"\nUsing calibration data: {calibrations}" - ) - - def test_get_calibration_dict_exact_match_lookup(self): - calibration_datetime = datetime.datetime.now() - calibration_params = ["sample_rate", "frequency"] - calibration_data = { - 100.0: {200.0: {"NF": "NF at 100, 200", "Gain": "Gain at 100, 200"}}, - 200.0: {100.0: {"NF": "NF at 200, 100", "Gain": "Gain at 200, 100"}}, - } - clock_rate_lookup_by_sample_rate = {} - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - Path(""), + self.sample_default_cal = Calibration( + calibration_parameters=self.cal_params, + calibration_data=self.cal_data, + calibration_reference="testing", + file_path=self.dummy_default_file_path, ) - cal_data = cal.get_calibration_dict([100.0, 200.0]) - assert cal_data["NF"] == "NF at 100, 200" - def test_get_calibration_dict_within_range(self): - calibration_datetime = datetime.datetime.now() - calibration_params = calibration_params = ["sample_rate", "frequency"] - calibration_data = { - 100.0: {200: {"NF": "NF at 100, 200"}, 300.0: "Cal data at 100,300"}, - 200.0: {100.0: {"NF": "NF at 200, 100"}}, - } - clock_rate_lookup_by_sample_rate = {} - test_cal_path = Path("test_calibration.json") - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - test_cal_path, + def test_calibration_data_key_name_conversion(self): + """On post-init, all calibration_data key names should be converted to strings.""" + recursive_check_keys(self.sample_cal.calibration_data) + recursive_check_keys(self.sample_default_cal.calibration_data) + + def test_calibration_dataclass_fields(self): + """Check that the dataclass is set up as expected.""" + fields = {f.name: f.type for f in dataclasses.fields(Calibration)} + # Note: does not check field order + assert fields == { + "calibration_parameters": List[str], + "calibration_reference": str, + "calibration_data": dict, + "file_path": Path, + }, "Calibration class fields have changed" + + def test_field_validator(self): + """Check that the input field type validator works.""" + with pytest.raises(TypeError): + _ = Calibration([], {}, "", False, False) + with pytest.raises(TypeError): + _ = Calibration([], {}, "", 100, Path("")) + with pytest.raises(TypeError): + _ = Calibration([], {}, 5, False, Path("")) + with pytest.raises(TypeError): + _ = Calibration([], [10, 20], "", False, Path("")) + with pytest.raises(TypeError): + _ = Calibration({"test": 1}, {}, "", False, Path("")) + + def test_get_calibration_dict(self): + """Check the get_calibration_dict method with all dummy data.""" + for f in self.frequencies: + for g in self.gains: + assert json.loads( + json.dumps(self.cal_data[f][g]) + ) == self.sample_cal.get_calibration_dict({"frequency": f, "gain": g}) + + def test_to_and_from_json(self, tmp_path: Path): + """Test the ``from_json`` factory method.""" + # First save the calibration data to temporary files + self.sample_cal.to_json() + self.sample_default_cal.to_json() + # Then load and compare + assert self.sample_cal == Calibration.from_json(self.dummy_file_path) + assert self.sample_default_cal == Calibration.from_json( + self.dummy_default_file_path ) - with pytest.raises(CalibrationException) as e_info: - cal_data = cal.get_calibration_dict([100.0, 250.0]) - assert e_info.value.args[0] == ( - f"Could not locate calibration data at 250.0" - + f"\nAttempted lookup using key '250.0'" - + f"\nUsing calibration data: {cal.calibration_data}" - ) - def test_sf_bound_points(self): - """Test SF determination at boundary points""" - self.run_pytest_point( - self.srs[0], self.frequency_min, self.gain_min, "Testing boundary points" - ) - self.run_pytest_point( - self.srs[0], self.frequency_max, self.gain_max, "Testing boundary points" + # from_json should ignore extra keys in the loaded file, but not fail + # Test this by trying to load a SensorCalibration as a Calibration + sensor_cal = SensorCalibration( + self.sample_cal.calibration_parameters, + self.sample_cal.calibration_data, + "testing", + tmp_path / "testing.json", + "dt_str", + [], + "uid", ) - - def test_sf_no_interpolation_points(self): - """Test points without interpolation""" - for i in range(4 * self.test_repeat_times): - while True: - g = self.g_s[self.rand_index(self.g_s)] - f = self.f_s[self.rand_index(self.f_s)] - if self.run_pytest_point( - self.srs[0], f, g, "Testing no interpolation points" - ): - break - - def test_update(self): - calibration_datetime = get_datetime_str_now() - calibration_params = ["sample_rate", "frequency"] - calibration_data = {100.0: {200.0: {"noise_figure": 0, "gain": 0}}} - clock_rate_lookup_by_sample_rate = {} - test_cal_path = Path("test_calibration.json") - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - test_cal_path, - ) - action_params = {"sample_rate": 100.0, "frequency": 200.0} - update_time = get_datetime_str_now() - cal.update(action_params, update_time, 30.0, 5.0, 21) - cal_from_file = load_from_json(test_cal_path, False) - test_cal_path.unlink() - file_utc_time = parse_datetime_iso_format_str(cal.last_calibration_datetime) - cal_time_utc = parse_datetime_iso_format_str(update_time) - assert file_utc_time.year == cal_time_utc.year - assert file_utc_time.month == cal_time_utc.month - assert file_utc_time.day == cal_time_utc.day - assert file_utc_time.hour == cal_time_utc.hour - assert file_utc_time.minute == cal_time_utc.minute - assert cal.calibration_data["100.0"]["200.0"]["gain"] == 30.0 - assert cal.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 - assert cal_from_file.calibration_data["100.0"]["200.0"]["gain"] == 30.0 - assert cal_from_file.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 - - def test_filter_by_paramter_integer(self): - calibrations = {"200.0": {"some_cal_data"}, 300.0: {"more cal data"}} - filtered_data = filter_by_parameter(calibrations, 200) - assert filtered_data is calibrations["200.0"] + sensor_cal.to_json() + loaded_cal = Calibration.from_json(tmp_path / "testing.json") + loaded_cal.file_path = self.sample_cal.file_path # Force these to be the same + assert loaded_cal == self.sample_cal + + # from_json should fail if required fields are missing + # Create an incorrect JSON file + almost_a_cal = {"calibration_parameters": []} + with open(tmp_path / "almost_a_cal.json", "w") as outfile: + outfile.write(json.dumps(almost_a_cal)) + with pytest.raises(Exception): + almost = Calibration.from_json(tmp_path / "almost_a_cal.json") + + def test_update_not_implemented(self): + """Ensure the update abstract method is not implemented in the base class""" + with pytest.raises(NotImplementedError): + self.sample_cal.update() diff --git a/scos_actions/calibration/tests/test_differential_calibration.py b/scos_actions/calibration/tests/test_differential_calibration.py new file mode 100644 index 00000000..5c9c80ca --- /dev/null +++ b/scos_actions/calibration/tests/test_differential_calibration.py @@ -0,0 +1,44 @@ +"""Test the DifferentialCalibration dataclass.""" + +import json +from pathlib import Path + +import pytest + +from scos_actions.calibration.differential_calibration import DifferentialCalibration + + +class TestDifferentialCalibration: + @pytest.fixture(autouse=True) + def setup_differential_calibration_file(self, tmp_path: Path): + dict_to_json = { + "calibration_parameters": ["frequency"], + "calibration_reference": "antenna input", + "calibration_data": {3555e6: 11.5}, + } + self.valid_file_path = tmp_path / "sample_diff_cal.json" + self.invalid_file_path = tmp_path / "sample_diff_cal_invalid.json" + + self.sample_diff_cal = DifferentialCalibration( + file_path=self.valid_file_path, **dict_to_json + ) + + with open(self.valid_file_path, "w") as f: + f.write(json.dumps(dict_to_json)) + + dict_to_json.pop("calibration_reference", None) + + with open(self.invalid_file_path, "w") as f: + f.write(json.dumps(dict_to_json)) + + def test_from_json(self): + """Check from_json functionality with valid and invalid dummy data.""" + diff_cal = DifferentialCalibration.from_json(self.valid_file_path) + assert diff_cal == self.sample_diff_cal + with pytest.raises(Exception): + _ = DifferentialCalibration.from_json(self.invalid_file_path) + + def test_update_not_implemented(self): + """Check that the update method is not implemented.""" + with pytest.raises(NotImplementedError): + self.sample_diff_cal.update() diff --git a/scos_actions/calibration/tests/test_sensor_calibration.py b/scos_actions/calibration/tests/test_sensor_calibration.py new file mode 100644 index 00000000..01d70854 --- /dev/null +++ b/scos_actions/calibration/tests/test_sensor_calibration.py @@ -0,0 +1,394 @@ +"""Test the SensorCalibration dataclass.""" + +import dataclasses +import datetime +import json +import random +from copy import deepcopy +from math import isclose +from pathlib import Path +from typing import Dict, List + +import pytest + +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import ( + SensorCalibration, + has_expired_cal_data, +) +from scos_actions.calibration.tests.utils import recursive_check_keys +from scos_actions.calibration.utils import CalibrationException +from scos_actions.tests.resources.utils import easy_gain +from scos_actions.utils import get_datetime_str_now, parse_datetime_iso_format_str + + +class TestSensorCalibrationFile: + # Ensure we load the test file + setup_complete = False + + def rand_index(self, l): + """Get a random index for a list""" + return random.randint(0, len(l) - 1) + + def check_duplicate(self, sr, f, g): + """Check if a set of points was already tested""" + for pt in self.pytest_points: + duplicate_f = f == pt["frequency"] + duplicate_g = g == pt["setting_value"] + duplicate_sr = sr == pt["sample_rate"] + if duplicate_f and duplicate_g and duplicate_sr: + return True + + def run_pytest_point(self, sr, f, g, reason, sr_m=False, f_m=False, g_m=False): + """Test the calculated value against the algorithm + Parameters: + sr, f, g -> Set values for the mock USRP + reason: Test case string for failure reference + sr_m, f_m, g_m -> Set values to use when calculating the expected value + May differ in from actual set points in edge cases + such as tuning in divisions or uncalibrated sample rate""" + # Check that the setup was completed + assert self.setup_complete, "Setup was not completed" + + # If this point was tested before, skip it (triggering a new one) + if self.check_duplicate(sr, f, g): + return False + + # If the point doesn't have modified inputs, use the algorithm ones + if not f_m: + f_m = f + if not g_m: + g_m = g + if not sr_m: + sr_m = sr + + # Calculate what the scale factor should be + calc_gain_sigan = easy_gain(sr_m, f_m, g_m) + + # Get the scale factor from the algorithm + interp_cal_data = self.sample_cal.get_calibration_dict( + {"sample_rate": sr, "frequency": f, "gain": g} + ) + interp_gain_siggan = interp_cal_data["gain"] + + # Save the point so we don't duplicate + self.pytest_points.append( + { + "sample_rate": int(sr), + "frequency": f, + "setting_value": g, + "gain": calc_gain_sigan, + "test": reason, + } + ) + + # Check if the point was calculated correctly + tolerance = 1e-5 + msg = "Scale factor not correctly calculated!\r\n" + msg = f"{msg} Expected value: {calc_gain_sigan}\r\n" + msg = f"{msg} Calculated value: {interp_gain_siggan}\r\n" + msg = f"{msg} Tolerance: {tolerance}\r\n" + msg = f"{msg} Test: {reason}\r\n" + msg = f"{msg} Sample Rate: {sr / 1e6}({sr_m / 1e6})\r\n" + msg = f"{msg} Frequency: {f / 1e6}({f_m / 1e6})\r\n" + msg = f"{msg} Gain: {g}({g_m})\r\n" + msg = ( + "{} Formula: -1 * (Gain - Frequency[GHz] - Sample Rate[MHz])\r\n".format( + msg + ) + ) + if not isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance): + interp_cal_data = self.sample_cal.get_calibration_dict( + {"sample_rate": sr, "frequency": f, "gain": g} + ) + + assert isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance), msg + return True + + @pytest.fixture(autouse=True) + def setup_calibration_file(self, tmp_path: Path): + """Create the dummy calibration file in the pytest temp directory""" + + # Only setup once + if self.setup_complete: + return + + # Create and save the temp directory and file + self.calibration_file = tmp_path / "dummy_cal_file.json" + + # Setup variables + self.dummy_noise_figure = 10 + self.dummy_compression = -20 + self.test_repeat_times = 3 + + # Sweep variables + self.sample_rates = [10e6, 15.36e6, 40e6] + self.gain_min = 40 + self.gain_max = 60 + self.gain_step = 10 + gains = list(range(self.gain_min, self.gain_max, self.gain_step)) + [ + self.gain_max + ] + self.frequency_min = 1000000000 + self.frequency_max = 3400000000 + self.frequency_step = 200000000 + frequencies = list( + range(self.frequency_min, self.frequency_max, self.frequency_step) + ) + [self.frequency_max] + frequencies = sorted(frequencies) + + # Start with blank cal data dicts + cal_data = {} + + # Add the simple stuff to new cal format + cal_data["last_calibration_datetime"] = get_datetime_str_now() + cal_data["sensor_uid"] = "SAMPLE_CALIBRATION" + cal_data["calibration_reference"] = "TESTING" + + # Add SR/CF lookup table + cal_data["clock_rate_lookup_by_sample_rate"] = [] + for sr in self.sample_rates: + cr = sr + while cr <= 40e6: + cr *= 2 + cr /= 2 + cal_data["clock_rate_lookup_by_sample_rate"].append( + {"sample_rate": int(sr), "clock_frequency": int(cr)} + ) + + # Create the JSON architecture for the calibration data + cal_data["calibration_data"] = {} + cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"] + for k in range(len(self.sample_rates)): + cal_data_f = {} + for i in range(len(frequencies)): + cal_data_g = {} + for j in range(len(gains)): + # Create the scale factor that ensures easy interpolation + gain_sigan = easy_gain( + self.sample_rates[k], frequencies[i], gains[j] + ) + + # Create the data point + cal_data_point = { + "gain": gain_sigan, + "noise_figure": self.dummy_noise_figure, + "1dB_compression_point": self.dummy_compression, + } + + # Add the generated dicts to the parent lists + cal_data_g[gains[j]] = deepcopy(cal_data_point) + cal_data_f[frequencies[i]] = deepcopy(cal_data_g) + + cal_data["calibration_data"][self.sample_rates[k]] = deepcopy(cal_data_f) + + # Write the new json file + with open(self.calibration_file, "w+") as file: + json.dump(cal_data, file, indent=4) + + # Load the data back in + self.sample_cal = SensorCalibration.from_json(self.calibration_file) + + # Create a list of previous points to ensure that we don't repeat + self.pytest_points = [] + + # Create sweep lists for test points + self.srs = self.sample_rates + self.gi_s = list(range(self.gain_min, self.gain_max, self.gain_step)) + self.fi_s = list( + range(self.frequency_min, self.frequency_max, self.frequency_step) + ) + self.g_s = self.gi_s + [self.gain_max] + self.f_s = self.fi_s + [self.frequency_max] + + # Don't repeat test setup + self.setup_complete = True + + def test_calibration_data_key_name_conversion(self): + """On post-init, all calibration_data key names should be converted to strings.""" + recursive_check_keys(self.sample_cal.calibration_data) + + def test_sensor_calibration_dataclass_fields(self): + """Check that the dataclass fields are as expected.""" + fields = { + f.name: f.type + for f in dataclasses.fields(SensorCalibration) + if f not in dataclasses.fields(Calibration) + } + # Note: does not check field order + assert fields == { + "last_calibration_datetime": str, + "clock_rate_lookup_by_sample_rate": List[Dict[str, float]], + "sensor_uid": str, + } + + def test_field_validator(self): + """Check that the input field type validator works.""" + # only check fields not inherited from Calibration base class + with pytest.raises(TypeError): + _ = SensorCalibration([], {}, False, Path(""), "dt_str", [], 10) + with pytest.raises(TypeError): + _ = SensorCalibration([], {}, False, Path(""), "dt_str", {}, "uid") + with pytest.raises(TypeError): + _ = SensorCalibration( + [], {}, False, Path(""), datetime.datetime.now(), [], "uid" + ) + + def test_get_clock_rate(self): + """Test the get_clock_rate method""" + # Test getting a clock rate by sample rate + assert self.sample_cal.get_clock_rate(10e6) == 40e6 + # If there isn't an entry, the sample rate should be returned + assert self.sample_cal.get_clock_rate(-999) == -999 + + def test_get_calibration_dict_exact_match_lookup(self): + calibration_datetime = get_datetime_str_now() + calibration_params = ["sample_rate", "frequency"] + calibration_data = { + 100.0: {200.0: {"NF": "NF at 100, 200", "Gain": "Gain at 100, 200"}}, + 200.0: {100.0: {"NF": "NF at 200, 100", "Gain": "Gain at 200, 100"}}, + } + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=Path(""), + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + cal_data = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 200.0}) + assert cal_data["NF"] == "NF at 100, 200" + + def test_get_calibration_dict_within_range(self): + calibration_datetime = get_datetime_str_now() + calibration_params = calibration_params = ["sample_rate", "frequency"] + calibration_data = { + 100.0: {200: {"NF": "NF at 100, 200"}, 300.0: "Cal data at 100,300"}, + 200.0: {100.0: {"NF": "NF at 200, 100"}}, + } + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=Path("test_calibration.json"), + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + with pytest.raises(CalibrationException) as e_info: + _ = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 250.0}) + assert e_info.value.args[0] == ( + f"Could not locate calibration data at 250.0" + + f"\nAttempted lookup using key '250.0' and 250.0" + + f"\nUsing calibration data: {cal.calibration_data['100.0']}" + ) + + def test_sf_bound_points(self): + """Test SF determination at boundary points""" + self.run_pytest_point( + self.srs[0], self.frequency_min, self.gain_min, "Testing boundary points" + ) + self.run_pytest_point( + self.srs[0], self.frequency_max, self.gain_max, "Testing boundary points" + ) + + def test_sf_no_interpolation_points(self): + """Test points without interpolation""" + for i in range(4 * self.test_repeat_times): + while True: + g = self.g_s[self.rand_index(self.g_s)] + f = self.f_s[self.rand_index(self.f_s)] + if self.run_pytest_point( + self.srs[0], f, g, "Testing no interpolation points" + ): + break + + def test_update(self): + calibration_datetime = get_datetime_str_now() + calibration_params = ["sample_rate", "frequency"] + calibration_data = {100.0: {200.0: {"noise_figure": 0, "gain": 0}}} + test_cal_path = Path("test_calibration.json") + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=test_cal_path, + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + action_params = {"sample_rate": 100.0, "frequency": 200.0} + update_time = get_datetime_str_now() + cal.update(action_params, update_time, 30.0, 5.0, 21) + cal_from_file = SensorCalibration.from_json(test_cal_path) + test_cal_path.unlink() + file_utc_time = parse_datetime_iso_format_str(cal.last_calibration_datetime) + cal_time_utc = parse_datetime_iso_format_str(update_time) + assert file_utc_time.year == cal_time_utc.year + assert file_utc_time.month == cal_time_utc.month + assert file_utc_time.day == cal_time_utc.day + assert file_utc_time.hour == cal_time_utc.hour + assert file_utc_time.minute == cal_time_utc.minute + assert cal.calibration_data["100.0"]["200.0"]["gain"] == 30.0 + assert cal.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 + assert cal_from_file.calibration_data["100.0"]["200.0"]["gain"] == 30.0 + assert cal_from_file.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 + + def test_has_expired_cal_data_not_expired(self): + cal_date = "2024-03-14T15:48:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date}, + }, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) + assert expired == False + + def test_has_expired_cal_data_expired(self): + cal_date = "2024-03-14T15:48:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date}, + }, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 30 + ) + assert expired == True + + def test_has_expired_cal_data_multipledates_expired(self): + cal_date_1 = "2024-03-14T15:48:38.039Z" + cal_date_2 = "2024-03-14T15:40:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date_1}, + }, + "true": {"datetime": cal_date_2}, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) + assert expired == True + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date_2}, + }, + "true": {"datetime": cal_date_1}, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) diff --git a/scos_actions/calibration/tests/test_utils.py b/scos_actions/calibration/tests/test_utils.py new file mode 100644 index 00000000..15ab29cf --- /dev/null +++ b/scos_actions/calibration/tests/test_utils.py @@ -0,0 +1,42 @@ +import pytest + +from scos_actions.calibration.utils import CalibrationException, filter_by_parameter + + +class TestCalibrationUtils: + def test_filter_by_parameter_out_of_range(self): + calibrations = {200.0: {"some_cal_data"}, 300.0: {"more cal data"}} + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, 400.0) + assert ( + e_info.value.args[0] + == f"Could not locate calibration data at 400.0" + + f"\nAttempted lookup using key '400.0' and 400.0" + + f"\nUsing calibration data: {calibrations}" + ) + + def test_filter_by_parameter_in_range_requires_match(self): + calibrations = { + 200.0: {"Gain": "Gain at 200.0"}, + 300.0: {"Gain": "Gain at 300.0"}, + } + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, 150.0) + assert e_info.value.args[0] == ( + f"Could not locate calibration data at 150.0" + + f"\nAttempted lookup using key '150.0' and 150.0" + + f"\nUsing calibration data: {calibrations}" + ) + + def test_filter_by_paramter_integer(self): + calibrations = {"200.0": {"some_cal_data"}, 300.0: {"more cal data"}} + filtered_data = filter_by_parameter(calibrations, 200) + assert filtered_data is calibrations["200.0"] + + def test_filter_by_parameter_type_error(self): + calibrations = [300.0, 400.0] + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, 300.0) + assert e_info.value.args[0] == ( + f"Provided calibration data is not a dict: {calibrations}" + ) diff --git a/scos_actions/calibration/tests/utils.py b/scos_actions/calibration/tests/utils.py new file mode 100644 index 00000000..a2ad6e51 --- /dev/null +++ b/scos_actions/calibration/tests/utils.py @@ -0,0 +1,10 @@ +"""Utility functions used for scos_sensor.calibration unit tests.""" + + +def recursive_check_keys(d: dict): + """Recursively checks a dict to check that all keys are strings""" + for k, v in d.items(): + if isinstance(v, dict): + recursive_check_keys(v) + else: + assert isinstance(k, str) diff --git a/scos_actions/calibration/utils.py b/scos_actions/calibration/utils.py new file mode 100644 index 00000000..fa977653 --- /dev/null +++ b/scos_actions/calibration/utils.py @@ -0,0 +1,76 @@ +from typing import Union + + +class CalibrationException(Exception): + """Basic exception handling for calibration functions.""" + + def __init__(self, msg): + super().__init__(msg) + + +class CalibrationEntryMissingException(CalibrationException): + """Raised when filter_by_parameter cannot locate calibration data.""" + + def __init__(self, msg): + super().__init__(msg) + + +class CalibrationParametersMissingException(CalibrationException): + """Raised when a dictionary does not contain all calibration parameters as keys.""" + + def __init__(self, provided_dict: dict, required_keys: list): + msg = ( + "Missing required parameters to lookup calibration data.\n" + + f"Required parameters are {required_keys}\n" + + f"Provided parameters are {list(provided_dict.keys())}" + ) + super().__init__(msg) + + +def filter_by_parameter(calibrations: dict, value: Union[float, int, bool]) -> dict: + """ + Select a certain element by the value of a top-level key in a dictionary. + + This method should be recursively called to select calibration + data matching a set of calibration parameters. The ordering of + nested dictionaries should match the ordering of the required + calibration parameters in the calibration file. + + If ``value`` is a float or bool, ``str(value).lower()`` is used + as the dictionary key. If ``value`` is an int, and the previous + approach does not work, ``str(float(value))`` is attempted. This + allows for value ``1`` to match a key ``"1.0"``. + + :param calibrations: Calibration data dictionary. + :param value: The parameter value for filtering. This value should + exist as a top-level key in ``calibrations``. + :raises CalibrationException: If ``value`` cannot be matched to a + top-level key in ``calibrations``, or if ``calibrations`` is not + a dict. + :return: The value of ``calibrations[value]``, which should be a dict. + """ + try: + filtered_data = calibrations.get(str(value).lower(), None) + if filtered_data is None and isinstance(value, int): + # Try equivalent float for ints, i.e., match "1.0" to 1 + filtered_data = calibrations.get(str(float(value)), None) + if filtered_data is None and isinstance(value, float) and value.is_integer(): + # Check for, e.g., key '25' if value is '25.0' + filtered_data = calibrations.get(str(int(value)), None) + if filtered_data is None: + raise KeyError + else: + return filtered_data + except AttributeError: + # calibrations does not have ".get()" + # Generally means that calibrations is None or not a dict + msg = f"Provided calibration data is not a dict: {calibrations}" + raise CalibrationException(msg) + except KeyError: + msg = ( + f"Could not locate calibration data at {value}" + + f"\nAttempted lookup using key '{str(value).lower()}'" + + f"{f' and {float(value)}' if isinstance(value, float) and value.is_integer() else ''}" + + f"\nUsing calibration data: {calibrations}" + ) + raise CalibrationEntryMissingException(msg) diff --git a/scos_actions/hardware/mocks/mock_sensor.py b/scos_actions/hardware/mocks/mock_sensor.py new file mode 100644 index 00000000..529919de --- /dev/null +++ b/scos_actions/hardware/mocks/mock_sensor.py @@ -0,0 +1,59 @@ +import logging + +from scos_actions.hardware.mocks.mock_gps import MockGPS +from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer +from scos_actions.hardware.sensor import Sensor +from scos_actions.utils import get_datetime_str_now + +_mock_sensor_cal_data = { + "datetime": get_datetime_str_now(), + "gain": 0, + "enbw": None, + "noise_figure": None, + "1db_compression_point": None, + "temperature": 26.85, +} + +_mock_differential_cal_data = {"loss": 10.0} + +_mock_capabilities = {"sensor": {}} + +_mock_location = {"x": -999, "y": -999, "z": -999, "description": "Testing"} + +logger = logging.getLogger(__name__) + + +class MockSensor(Sensor): + def __init__( + self, + signal_analyzer=MockSignalAnalyzer(), + gps=MockGPS(), + preselector=None, + switches={}, + location=_mock_location, + capabilities=_mock_capabilities, + sensor_cal=None, + differential_cal=None, + ): + if (sensor_cal is not None) or (differential_cal is not None): + logger.warning( + "Calibration object provided to mock sensor will not be used to query calibration data." + ) + super().__init__( + signal_analyzer=signal_analyzer, + gps=gps, + preselector=preselector, + switches=switches, + location=location, + capabilities=capabilities, + sensor_cal=sensor_cal, + differential_cal=differential_cal, + ) + + @property + def sensor_calibration_data(self): + return _mock_sensor_cal_data + + @property + def differential_calibration_data(self): + return _mock_differential_cal_data diff --git a/scos_actions/hardware/mocks/mock_sigan.py b/scos_actions/hardware/mocks/mock_sigan.py index 49687574..b4394e06 100644 --- a/scos_actions/hardware/mocks/mock_sigan.py +++ b/scos_actions/hardware/mocks/mock_sigan.py @@ -1,4 +1,5 @@ """Mock a signal analyzer for testing.""" + import logging from collections import namedtuple from typing import Optional @@ -6,7 +7,6 @@ import numpy as np from scos_actions import __version__ as SCOS_ACTIONS_VERSION -from scos_actions.calibration.calibration import Calibration from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface from scos_actions.utils import get_datetime_str_now @@ -28,31 +28,11 @@ class MockSignalAnalyzer(SignalAnalyzerInterface): def __init__( self, - sensor_cal: Optional[Calibration] = None, - sigan_cal: Optional[Calibration] = None, switches: Optional[dict] = None, randomize_values: bool = False, ): - super().__init__(sensor_cal, sigan_cal, switches) - # Define the default calibration dicts - self.DEFAULT_SIGAN_CALIBRATION = { - "datetime": get_datetime_str_now(), - "gain": 0, # Defaults to gain setting - "enbw": None, # Defaults to sample rate - "noise_figure": 0, - "1db_compression_point": 100, - "temperature": 26.85, - } - - self.DEFAULT_SENSOR_CALIBRATION = { - "datetime": get_datetime_str_now(), - "gain": 0, # Defaults to sigan gain - "enbw": None, # Defaults to sigan enbw - "noise_figure": None, # Defaults to sigan noise figure - "1db_compression_point": None, # Defaults to sigan compression + preselector gain - "temperature": 26.85, - } - self.auto_dc_offset = False + super().__init__(switches) + self._model = "Mock Signal Analyzer" self._frequency = 700e6 self._sample_rate = 10e6 self.clock_rate = 40e6 @@ -60,8 +40,6 @@ def __init__( self._attenuation = 0 self._preamp_enable = False self._reference_level = -30 - self._overload = False - self._capture_time = None self._is_available = True self._plugin_version = SCOS_ACTIONS_VERSION self._firmware_version = "1.2.3" @@ -73,8 +51,6 @@ def __init__( self.times_failed_recv = 0 self.randomize_values = randomize_values - self.sensor_calibration_data = self.DEFAULT_SENSOR_CALIBRATION - self.sigan_calibration_data = self.DEFAULT_SIGAN_CALIBRATION @property def is_available(self): @@ -84,14 +60,6 @@ def is_available(self): def plugin_version(self): return self._plugin_version - @property - def firmware_version(self): - return self._firmware_version - - @property - def api_version(self): - return self._api_version - @property def sample_rate(self): return self._sample_rate @@ -147,81 +115,47 @@ def connect(self): pass def acquire_time_domain_samples( - self, num_samples, num_samples_skip=0, retries=5, cal_adjust=True - ): + self, num_samples: int, num_samples_skip: int = 0 + ) -> dict: logger.warning("Using mock signal analyzer!") - self.sigan_overload = False - self._capture_time = None - self._num_samples_skip = num_samples_skip + overload = False + capture_time = None # Try to acquire the samples - max_retries = retries data = [] - while True: - if self.times_failed_recv < self.times_to_fail_recv: - self.times_failed_recv += 1 - data = np.ones(0, dtype=np.complex64) - else: - self._capture_time = get_datetime_str_now() - if self.randomize_values: - i = np.random.normal(0.5, 0.5, num_samples) - q = np.random.normal(0.5, 0.5, num_samples) - rand_iq = np.empty(num_samples, dtype=np.complex64) - rand_iq.real = i - rand_iq.imag = q - data = rand_iq - else: - data = np.ones(num_samples, dtype=np.complex64) - - data_len = len(data) - if not len(data) == num_samples: - if retries > 0: - msg = "Signal analyzer error: requested {} samples, but got {}." - logger.warning(msg.format(num_samples + num_samples_skip, data_len)) - logger.warning(f"Retrying {retries} more times.") - retries = retries - 1 - else: - err = "Failed to acquire correct number of samples " - err += f"{max_retries} times in a row." - raise RuntimeError(err) + if self.times_failed_recv < self.times_to_fail_recv: + self.times_failed_recv += 1 + data = np.ones(0, dtype=np.complex64) + else: + capture_time = get_datetime_str_now() + if self.randomize_values: + i = np.random.normal(0.5, 0.5, num_samples) + q = np.random.normal(0.5, 0.5, num_samples) + rand_iq = np.empty(num_samples, dtype=np.complex64) + rand_iq.real = i + rand_iq.imag = q + data = rand_iq else: - logger.debug(f"Successfully acquired {num_samples} samples.") - return { - "data": data, - "overload": self._overload, - "frequency": self._frequency, - "gain": self._gain, - "attenuation": self._attenuation, - "preamp_enable": self._preamp_enable, - "reference_level": self._reference_level, - "sample_rate": self._sample_rate, - "capture_time": self._capture_time, - } + data = np.ones(num_samples, dtype=np.complex64) + + if (data_len := len(data)) != num_samples: + err = "Failed to acquire correct number of samples: " + err += f"got {data_len} instead of {num_samples}" + raise RuntimeError(err) + else: + logger.debug(f"Successfully acquired {num_samples} samples.") + return { + "data": data, + "overload": overload, + "frequency": self._frequency, + "gain": self._gain, + "attenuation": self._attenuation, + "preamp_enable": self._preamp_enable, + "reference_level": self._reference_level, + "sample_rate": self._sample_rate, + "capture_time": capture_time, + } def set_times_to_fail_recv(self, n): self.times_to_fail_recv = n self.times_failed_recv = 0 - - @property - def last_calibration_time(self): - return get_datetime_str_now() - - def update_calibration(self, params): - pass - - def recompute_sensor_calibration_data(self, cal_args: list) -> None: - if self.sensor_calibration is not None: - self.sensor_calibration_data.update( - self._sensor_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sensor calibration does not exist.") - - def recompute_sigan_calibration_data(self, cal_args: list) -> None: - """Set the sigan calibration data based on the current tuning""" - if self.sigan_calibration is not None: - self.sigan_calibration_data.update( - self.sigan_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sigan calibration does not exist.") diff --git a/scos_actions/hardware/sensor.py b/scos_actions/hardware/sensor.py index b9e14806..08dbcd9c 100644 --- a/scos_actions/hardware/sensor.py +++ b/scos_actions/hardware/sensor.py @@ -2,13 +2,19 @@ import hashlib import json import logging -from typing import Dict, Optional +from typing import Any, Dict, List, Optional from its_preselector.preselector import Preselector from its_preselector.web_relay import WebRelay -from .gps_iface import GPSInterface -from .sigan_iface import SignalAnalyzerInterface +from scos_actions.calibration.differential_calibration import DifferentialCalibration +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import SensorCalibration +from scos_actions.hardware.gps_iface import GPSInterface +from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface +from scos_actions.utils import convert_string_to_millisecond_iso_format + +logger = logging.getLogger(__name__) class Sensor: @@ -33,6 +39,8 @@ def __init__( preselector: Optional[Preselector] = None, switches: Optional[Dict[str, WebRelay]] = {}, location: Optional[dict] = None, + sensor_cal: Optional[SensorCalibration] = None, + differential_cal: Optional[DifferentialCalibration] = None, ): self.signal_analyzer = signal_analyzer self.gps = gps @@ -40,15 +48,19 @@ def __init__( self.switches = switches self.location = location self.capabilities = capabilities + self._sensor_calibration_data = {} + self._sensor_calibration = sensor_cal + self._differential_calibration_data = {} + self._differential_calibration = differential_cal # There is no setter for start_time property self._start_time = datetime.datetime.utcnow() @property - def signal_analyzer(self) -> SignalAnalyzerInterface: + def signal_analyzer(self) -> Optional[SignalAnalyzerInterface]: return self._signal_analyzer @signal_analyzer.setter - def signal_analyzer(self, sigan: SignalAnalyzerInterface): + def signal_analyzer(self, sigan: Optional[SignalAnalyzerInterface]): self._signal_analyzer = sigan @property @@ -173,5 +185,182 @@ def has_configurable_preselector(self) -> bool: return False @property - def start_time(self): + def start_time(self) -> datetime.datetime: return self._start_time + + @property + def sensor_calibration(self) -> Optional[SensorCalibration]: + return self._sensor_calibration + + @sensor_calibration.setter + def sensor_calibration(self, cal: Optional[SensorCalibration]): + self._sensor_calibration = cal + + @property + def differential_calibration(self) -> Optional[DifferentialCalibration]: + return self._differential_calibration + + @differential_calibration.setter + def differential_calibration(self, cal: Optional[DifferentialCalibration]): + self._differential_calibration = cal + + @property + def last_calibration_time(self) -> str: + """A datetime string for the most recent sensor calibration.""" + return convert_string_to_millisecond_iso_format( + self.sensor_calibration.last_calibration_datetime + ) + + @property + def sensor_calibration_data(self) -> Dict[str, Any]: + """Sensor calibration data for the current sensor settings.""" + return self._sensor_calibration_data + + @property + def differential_calibration_data(self) -> Dict[str, float]: + """Differential calibration data for the current sensor settings.""" + return self._differential_calibration_data + + def recompute_calibration_data(self, params: dict) -> None: + """ + Set the differential_calibration_data and sensor_calibration_data + based on the specified ``params``. + """ + recomputed = False + if self.differential_calibration is not None: + self._differential_calibration_data.update( + self.differential_calibration.get_calibration_dict(params) + ) + recomputed = True + else: + logger.debug("No differential calibration available to recompute") + + if self.sensor_calibration is not None: + self._sensor_calibration_data.update( + self.sensor_calibration.get_calibration_dict(params) + ) + recomputed = True + else: + logger.debug("No sensor calibration available to recompute") + + if not recomputed: + logger.warning("Failed to recompute calibration data") + + def acquire_time_domain_samples( + self, + num_samples: int, + num_samples_skip: int = 0, + retries: int = 5, + cal_adjust: bool = True, + cal_params: Optional[dict] = None, + ) -> dict: + """ + Acquire time-domain IQ samples from the signal analyzer. + + Signal analyzer settings, preselector state, etc. should already be + set before calling this function. + + Gain adjustment can be applied to acquired samples using ``cal_adjust``. + If ``True``, the samples acquired from the signal analyzer will be + scaled based on the calibrated ``gain`` value in the ``SensorCalibration``, + if one exists for this sensor, and "calibration terminal" will be the value + of the "reference" key in the returned dict. + + :param num_samples: Number of samples to acquire + :param num_samples_skip: Number of samples to skip + :param retries: Maximum number of retries on failure + :param cal_adjust: If True, use available calibration data to scale the samples. + :param cal_params: A dictionary with keys for all of the calibration parameters. + May contain additional keys. Example: ``{"sample_rate": 14000000.0, "gain": 10.0}`` + Must be specified if ``cal_adjust`` is ``True``. Otherwise, ignored. + :return: dictionary containing data, sample_rate, frequency, capture_time, etc + :raises Exception: If the sample acquisition fails, or the sensor has + no signal analyzer. + """ + logger.debug("***********************************\n") + logger.debug("Sensor.acquire_time_domain_samples starting") + logger.debug(f"Number of retries = {retries}") + if self.differential_calibration is not None: + logger.debug( + f"USING DIFF. CAL: {self.differential_calibration.calibration_data}" + ) + if self.sensor_calibration is not None: + logger.debug( + f"USING SENSOR CAL: {self.sensor_calibration.calibration_data}" + ) + logger.debug("*************************************\n") + + max_retries = retries + # Acquire samples from signal analyzer + if self.signal_analyzer is not None: + while True: + try: + measurement_result = ( + self.signal_analyzer.acquire_time_domain_samples( + num_samples, num_samples_skip + ) + ) + break + except Exception as e: + retries -= 1 + logger.info("Error while acquiring samples from signal analyzer.") + if retries == 0: + logger.exception( + "Failed to acquire samples from signal analyzer. " + + f"Tried {max_retries} times." + ) + raise e + else: + msg = "Failed to acquire samples: sensor has no signal analyzer" + logger.error(msg) + raise Exception(msg) + + # Apply gain adjustment based on calibration + if cal_adjust: + if cal_params is None: + raise ValueError( + "Data scaling cannot occur without specified calibration parameters." + ) + if self.sensor_calibration is not None: + logger.debug("Scaling samples using calibration data") + self.recompute_calibration_data(cal_params) + calibrated_gain__db = self.sensor_calibration_data["gain"] + calibrated_nf__db = self.sensor_calibration_data["noise_figure"] + logger.debug(f"Using sensor gain: {calibrated_gain__db} dB") + measurement_result["reference"] = ( + self.sensor_calibration.calibration_reference + ) + if self.differential_calibration is not None: + # Also apply differential calibration correction + differential_loss = self.differential_calibration_data["loss"] + logger.debug(f"Using differential loss: {differential_loss} dB") + calibrated_gain__db -= differential_loss + calibrated_nf__db += differential_loss + measurement_result["reference"] = ( + self.differential_calibration.calibration_reference + ) + else: + # No differential calibration exists + logger.debug("No differential calibration was applied") + + linear_gain = 10.0 ** (calibrated_gain__db / 20.0) + logger.debug(f"Applying total gain of {calibrated_gain__db}") + measurement_result["data"] /= linear_gain + + # Metadata: record the gain and noise figure based on the actual + # scaling which was used. + measurement_result["applied_calibration"] = { + "gain": calibrated_gain__db, + "noise_figure": calibrated_nf__db, + } + else: + # No sensor calibration exists + msg = "Unable to scale samples without sensor calibration data" + logger.error(msg) + raise Exception(msg) + else: + # Set the data reference in the measurement_result + measurement_result["reference"] = "signal analyzer input" + measurement_result["applied_calibration"] = None + + return measurement_result diff --git a/scos_actions/hardware/sigan_iface.py b/scos_actions/hardware/sigan_iface.py index 3b211113..8b6d207c 100644 --- a/scos_actions/hardware/sigan_iface.py +++ b/scos_actions/hardware/sigan_iface.py @@ -5,9 +5,7 @@ from its_preselector.web_relay import WebRelay -from scos_actions.calibration.calibration import Calibration from scos_actions.hardware.utils import power_cycle_sigan -from scos_actions.utils import convert_string_to_millisecond_iso_format logger = logging.getLogger(__name__) @@ -26,24 +24,13 @@ class SignalAnalyzerInterface(ABC): def __init__( self, - sensor_cal: Optional[Calibration] = None, - sigan_cal: Optional[Calibration] = None, switches: Optional[Dict[str, WebRelay]] = None, ): - self.sensor_calibration_data = {} - self.sigan_calibration_data = {} - self._sensor_calibration = sensor_cal - self._sigan_calibration = sigan_cal self._model = "Unknown" + self._api_version = "Unknown" + self._firmware_version = "Unknown" self.switches = switches - @property - def last_calibration_time(self) -> str: - """Returns the last calibration time from calibration data.""" - return convert_string_to_millisecond_iso_format( - self.sensor_calibration.last_calibration_datetime - ) - @property @abstractmethod def is_available(self) -> bool: @@ -59,28 +46,25 @@ def plugin_version(self) -> str: @property def firmware_version(self) -> str: """Returns the version of the signal analyzer firmware.""" - return "Unknown" + return self._firmware_version @property def api_version(self) -> str: """Returns the version of the underlying signal analyzer API.""" - return "Unknown" + return self._api_version @abstractmethod def acquire_time_domain_samples( self, num_samples: int, num_samples_skip: int = 0, - retries: int = 5, - cal_adjust: bool = True, ) -> dict: """ - Acquire time domain IQ samples + Acquire time domain IQ samples, scaled to Volts at + the signal analyzer input. :param num_samples: Number of samples to acquire :param num_samples_skip: Number of samples to skip - :param retries: Maximum number of retries on failure - :param cal_adjust: If True, scale IQ samples based on calibration data. :return: dictionary containing data, sample_rate, frequency, capture_time, etc """ pass @@ -98,9 +82,7 @@ def healthy(self, num_samples: int = 56000) -> bool: if not self.is_available: return False try: - measurement_result = self.acquire_time_domain_samples( - num_samples, cal_adjust=False - ) + measurement_result = self.acquire_time_domain_samples(num_samples) data = measurement_result["data"] except Exception as e: logger.exception("Unable to acquire samples from device.") @@ -136,25 +118,6 @@ def power_cycle_and_connect(self, sleep_time: float = 2.0) -> None: ) return - def recompute_sensor_calibration_data(self, cal_args: list) -> None: - self.sensor_calibration_data = {} - if self.sensor_calibration is not None: - self.sensor_calibration_data.update( - self.sensor_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sensor calibration does not exist.") - - def recompute_sigan_calibration_data(self, cal_args: list) -> None: - self.sigan_calibration_data = {} - """Set the sigan calibration data based on the current tuning""" - if self.sigan_calibration is not None: - self.sigan_calibration_data.update( - self.sigan_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sigan calibration does not exist.") - def get_status(self) -> dict: return {"model": self._model, "healthy": self.healthy()} @@ -165,19 +128,3 @@ def model(self) -> str: @model.setter def model(self, value: str): self._model = value - - @property - def sensor_calibration(self) -> Calibration: - return self._sensor_calibration - - @sensor_calibration.setter - def sensor_calibration(self, cal: Calibration): - self._sensor_calibration = cal - - @property - def sigan_calibration(self) -> Calibration: - return self._sigan_calibration - - @sigan_calibration.setter - def sigan_calibration(self, cal: Calibration): - self._sigan_calibration = cal diff --git a/scos_actions/hardware/tests/test_sensor.py b/scos_actions/hardware/tests/test_sensor.py index 216d626a..1c3eac95 100644 --- a/scos_actions/hardware/tests/test_sensor.py +++ b/scos_actions/hardware/tests/test_sensor.py @@ -1,74 +1,122 @@ +import datetime + +import pytest from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay from its_preselector.web_relay_preselector import WebRelayPreselector from scos_actions.hardware.mocks.mock_gps import MockGPS +from scos_actions.hardware.mocks.mock_sensor import ( + MockSensor, + _mock_capabilities, + _mock_differential_cal_data, + _mock_location, + _mock_sensor_cal_data, +) from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor -def test_sensor(): - sensor = Sensor( - signal_analyzer=MockSignalAnalyzer(), capabilities={}, gps=MockGPS() +@pytest.fixture +def mock_sensor(): + sensor = MockSensor() + return sensor + + +def test_mock_sensor_defaults(mock_sensor): + assert isinstance(mock_sensor.signal_analyzer, MockSignalAnalyzer) + assert isinstance(mock_sensor.gps, MockGPS) + assert mock_sensor.preselector is None + assert mock_sensor.switches == {} + assert mock_sensor.location == _mock_location + assert mock_sensor.capabilities == _mock_capabilities + assert mock_sensor.sensor_calibration is None + assert mock_sensor.differential_calibration is None + assert mock_sensor.has_configurable_preselector is False + assert mock_sensor.has_configurable_preselector is False + assert mock_sensor.sensor_calibration_data == _mock_sensor_cal_data + assert mock_sensor.differential_calibration_data == _mock_differential_cal_data + assert isinstance(mock_sensor.start_time, datetime.datetime) + + +def test_set_get_sigan(mock_sensor): + mock_sigan = MockSignalAnalyzer() + mock_sensor.signal_analyzer = mock_sigan + assert mock_sensor.signal_analyzer == mock_sigan + + +def test_set_get_gps(mock_sensor): + mock_gps = MockGPS() + mock_sensor.gps = mock_gps + assert mock_sensor.gps == mock_gps + + +def test_set_get_preselector(mock_sensor): + mock_preselector = WebRelayPreselector( + {}, {"name": "mock_preselector", "base_url": "url"} ) - assert sensor is not None - assert sensor.signal_analyzer is not None - assert sensor.gps is not None + mock_sensor.preselector = mock_preselector + assert mock_sensor.preselector == mock_preselector + + +def test_set_get_switches(mock_sensor): + mock_switches = { + "mock": ControlByWebWebRelay({"name": "mock_switch", "base_url": "url"}) + } + mock_sensor.switches = mock_switches + assert mock_sensor.switches == mock_switches + + +def test_set_get_location(mock_sensor): + mock_location = {"x": 0, "y": 0, "z": 0, "description": "Test"} + mock_sensor.location = mock_location + assert mock_sensor.location == mock_location -def test_set_get_preselector(): - preselector = WebRelayPreselector({}, {"name": "preselector", "base_url": "url"}) - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.preselector = preselector - assert sensor.preselector == preselector +def test_set_get_capabilities(mock_sensor): + mock_capabilities = {"fake": "capabilities"} + mock_sensor.capabilities = mock_capabilities + assert mock_sensor.capabilities == mock_capabilities -def test_set_get_gps(): - gps = MockGPS() - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.gps = gps - assert sensor.gps == gps +def test_set_get_sensor_calibration(mock_sensor): + assert mock_sensor.sensor_calibration is None -def test_set_get_switches(): - switches = {"spu": ControlByWebWebRelay({"name": "spu", "base_url": "url"})} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.switches = switches - assert sensor.switches == switches +def test_set_get_differential_calibration(mock_sensor): + assert mock_sensor.differential_calibration is None -def test_has_configurable_preselector_in_capabilities(): +def test_has_configurable_preselector_in_capabilities(mock_sensor): capabilities = { "sensor": { "preselector": {"rf_paths": [{"name": "antenna"}, {"name": "noise_diode"}]} } } - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.has_configurable_preselector == True + mock_sensor.capabilities = capabilities + assert mock_sensor.has_configurable_preselector == True -def test_has_configurable_preselector_in_preselector(): - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.preselector = WebRelayPreselector( +def test_has_configurable_preselector_in_preselector(mock_sensor): + mock_sensor.preselector = WebRelayPreselector( {}, {"name": "preselector", "base_url": "url"} ) - sensor.preselector.rf_paths = [{"name": "antenna"}, {"name": "noise_diode"}] - assert sensor.has_configurable_preselector == True + mock_sensor.preselector.rf_paths = [{"name": "antenna"}, {"name": "noise_diode"}] + assert mock_sensor.has_configurable_preselector == True -def test_has_configurable_preselector_not_configurable(): +def test_has_configurable_preselector_not_configurable(mock_sensor): capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.has_configurable_preselector == False + mock_sensor.capabilities = capabilities + assert mock_sensor.has_configurable_preselector == False -def test_hash_set_when_not_present(): +def test_hash_set_when_not_present(mock_sensor): capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert "sensor_sha512" in sensor.capabilities["sensor"] - assert sensor.capabilities["sensor"]["sensor_sha512"] is not None + mock_sensor.capabilities = capabilities + assert "sensor_sha512" in mock_sensor.capabilities["sensor"] + assert mock_sensor.capabilities["sensor"]["sensor_sha512"] is not None -def test_hash_not_overwritten(): +def test_hash_not_overwritten(mock_sensor): capabilities = {"sensor": {"sensor_sha512": "some hash"}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.capabilities["sensor"]["sensor_sha512"] == "some hash" + mock_sensor.capabilities = capabilities + assert mock_sensor.capabilities["sensor"]["sensor_sha512"] == "some hash" diff --git a/scos_actions/hardware/tests/test_sigan.py b/scos_actions/hardware/tests/test_sigan.py index c82ffeec..4fce6562 100644 --- a/scos_actions/hardware/tests/test_sigan.py +++ b/scos_actions/hardware/tests/test_sigan.py @@ -1,8 +1,19 @@ +import pytest + from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -def test_sigan_default_cal(): +def test_mock_sigan(): sigan = MockSignalAnalyzer() - sigan.recompute_sensor_calibration_data([]) - sensor_cal = sigan.sensor_calibration_data - assert sensor_cal["gain"] == 0 + # Test default values are available as properties + assert sigan.model == sigan._model + assert sigan.frequency == sigan._frequency + assert sigan.sample_rate == sigan._sample_rate + assert sigan.gain == sigan._gain + assert sigan.attenuation == sigan._attenuation + assert sigan.preamp_enable == sigan._preamp_enable + assert sigan.reference_level == sigan._reference_level + assert sigan.is_available == sigan._is_available + assert sigan.plugin_version == sigan._plugin_version + assert sigan.firmware_version == sigan._firmware_version + assert sigan.api_version == sigan._api_version diff --git a/scos_actions/metadata/structs/capture.py b/scos_actions/metadata/structs/capture.py index 16f12280..abcff4fb 100644 --- a/scos_actions/metadata/structs/capture.py +++ b/scos_actions/metadata/structs/capture.py @@ -14,6 +14,8 @@ "duration": "ntia-sensor:duration", "overload": "ntia-sensor:overload", "sensor_calibration": "ntia-sensor:sensor_calibration", + # sigan_calibration is unused by SCOS Sensor but still defined + # in the ntia-sensor extension as of v2.0.0 "sigan_calibration": "ntia-sensor:sigan_calibration", "sigan_settings": "ntia-sensor:sigan_settings", } diff --git a/scos_actions/settings.py b/scos_actions/settings.py index fa63aaa1..7d45d179 100644 --- a/scos_actions/settings.py +++ b/scos_actions/settings.py @@ -1,5 +1,4 @@ import logging -from os import path from pathlib import Path from environs import Env diff --git a/scos_actions/signal_processing/calibration.py b/scos_actions/signal_processing/calibration.py index 785685d3..e1a4c847 100644 --- a/scos_actions/signal_processing/calibration.py +++ b/scos_actions/signal_processing/calibration.py @@ -3,29 +3,23 @@ import numpy as np from its_preselector.preselector import Preselector +from numpy.typing import NDArray from scipy.constants import Boltzmann +from scos_actions.calibration.utils import CalibrationException from scos_actions.signal_processing.unit_conversion import ( convert_celsius_to_fahrenheit, convert_celsius_to_kelvins, convert_dB_to_linear, convert_linear_to_dB, - convert_watts_to_dBm, ) logger = logging.getLogger(__name__) -class CalibrationException(Exception): - """Basic exception handling for calibration functions.""" - - def __init__(self, msg): - super().__init__(msg) - - def y_factor( - pwr_noise_on_watts: np.ndarray, - pwr_noise_off_watts: np.ndarray, + pwr_noise_on_watts: NDArray, + pwr_noise_off_watts: NDArray, enr_linear: float, enbw_hz: float, temp_kelvins: float = 300.0, @@ -49,16 +43,16 @@ def y_factor( :return: A tuple (noise_figure, gain) containing the calculated noise figure and gain, both in dB, from the Y-factor method. """ - mean_on_dBm = convert_watts_to_dBm(np.mean(pwr_noise_on_watts)) - mean_off_dBm = convert_watts_to_dBm(np.mean(pwr_noise_off_watts)) + mean_on_dBW = convert_linear_to_dB(np.mean(pwr_noise_on_watts)) + mean_off_dBW = convert_linear_to_dB(np.mean(pwr_noise_off_watts)) if logger.isEnabledFor(logging.DEBUG): logger.debug(f"ENR: {convert_linear_to_dB(enr_linear)} dB") logger.debug(f"ENBW: {enbw_hz} Hz") - logger.debug(f"Mean power on: {mean_on_dBm:.2f} dBm") - logger.debug(f"Mean power off: {mean_off_dBm:.2f} dBm") - y = convert_dB_to_linear(mean_on_dBm - mean_off_dBm) + logger.debug(f"Mean power on: {mean_on_dBW+30:.2f} dBm") + logger.debug(f"Mean power off: {mean_off_dBW+30:.2f} dBm") + y = convert_dB_to_linear(mean_on_dBW - mean_off_dBW) noise_factor = enr_linear / (y - 1.0) - gain_dB = mean_on_dBm - convert_watts_to_dBm( + gain_dB = mean_on_dBW - convert_linear_to_dB( Boltzmann * temp_kelvins * enbw_hz * (enr_linear + noise_factor) ) noise_figure_dB = convert_linear_to_dB(noise_factor) diff --git a/scos_actions/signal_processing/tests/test_calibration.py b/scos_actions/signal_processing/tests/test_calibration.py index c47a2a37..6b29b472 100644 --- a/scos_actions/signal_processing/tests/test_calibration.py +++ b/scos_actions/signal_processing/tests/test_calibration.py @@ -1,6 +1,7 @@ """ Unit test for scos_actions.signal_processing.calibration """ + import numpy as np from scipy.constants import Boltzmann diff --git a/scos_actions/signal_processing/tests/test_fft.py b/scos_actions/signal_processing/tests/test_fft.py index a848340f..1ea7458c 100644 --- a/scos_actions/signal_processing/tests/test_fft.py +++ b/scos_actions/signal_processing/tests/test_fft.py @@ -1,6 +1,7 @@ """ Unit test for scos_actions.signal_processing.fft """ + import numpy as np import pytest from scipy.signal import get_window diff --git a/scos_actions/signal_processing/tests/test_filtering.py b/scos_actions/signal_processing/tests/test_filtering.py index d33c37c6..0df7b845 100644 --- a/scos_actions/signal_processing/tests/test_filtering.py +++ b/scos_actions/signal_processing/tests/test_filtering.py @@ -5,6 +5,7 @@ tests mostly exist to ensure that tests will fail if substantial changes are made to the wrappers. """ + import numpy as np import pytest from scipy.signal import ellip, ellipord, firwin, kaiserord, sos2zpk, sosfreqz diff --git a/scos_actions/signal_processing/tests/test_power_analysis.py b/scos_actions/signal_processing/tests/test_power_analysis.py index 42e2db75..02aaf454 100644 --- a/scos_actions/signal_processing/tests/test_power_analysis.py +++ b/scos_actions/signal_processing/tests/test_power_analysis.py @@ -1,6 +1,7 @@ """ Unit test for scos_actions.signal_processing.power_analysis """ + from enum import EnumMeta import numpy as np diff --git a/scos_actions/signal_processing/tests/test_unit_conversion.py b/scos_actions/signal_processing/tests/test_unit_conversion.py index 4da61f86..fe404472 100644 --- a/scos_actions/signal_processing/tests/test_unit_conversion.py +++ b/scos_actions/signal_processing/tests/test_unit_conversion.py @@ -1,6 +1,7 @@ """ Unit test for scos_actions.signal_processing.unit_conversion """ + import numpy as np import pytest