diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 12ef9bfc..f6d8b0db 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,7 +18,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v3.15.1 + rev: v3.15.2 hooks: - id: pyupgrade args: ["--py38-plus"] @@ -30,7 +30,7 @@ repos: types: [file, python] args: ["--profile", "black", "--filter-files", "--gitignore"] - repo: https://github.com/psf/black - rev: 24.2.0 + rev: 24.3.0 hooks: - id: black types: [file, python] diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 4716e082..728b6b09 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -74,7 +74,11 @@ create_statistical_detector, ) from scos_actions.signals import measurement_action_completed, trigger_api_restart -from scos_actions.utils import convert_datetime_to_millisecond_iso_format, get_days_up +from scos_actions.utils import ( + convert_datetime_to_millisecond_iso_format, + get_days_up, + get_disk_usage, +) env = Env() logger = logging.getLogger(__name__) @@ -110,7 +114,6 @@ FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE) FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy") IMPEDANCE_OHMS = 50.0 -DATA_REFERENCE_POINT = "noise source output" NUM_ACTORS = 3 # Number of ray actors to initialize # Create power detectors @@ -451,6 +454,7 @@ class NasctnSeaDataProduct(Action): def __init__(self, parameters: dict): super().__init__(parameters) # Assume preselector is present + self.total_channel_data_length = None rf_path_name = utils.get_parameter(RF_PATH, self.parameters) self.rf_path = {self.PRESELECTOR_PATH_KEY: rf_path_name} @@ -508,6 +512,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): action_start_tic = perf_counter() # Ray should have already been initialized within scos-sensor, # but check and initialize just in case. + if not ray.is_initialized(): logger.info("Initializing ray.") logger.info("Set RAY_INIT=true to avoid initializing within " + __name__) @@ -527,8 +532,6 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): self.iteration_params, ) self.create_global_sensor_metadata(self.sensor) - self.create_global_data_product_metadata() - # Initialize remote supervisor actors for IQ processing tic = perf_counter() # This uses iteration_params[0] because @@ -540,10 +543,15 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - dp_procs, cpu_speed = [], [] + dp_procs, cpu_speed, reference_points = [], [], [] capture_tic = perf_counter() + for i, parameters in enumerate(self.iteration_params): measurement_result = self.capture_iq(parameters) + if i == 0: + self.create_global_data_product_metadata( + measurement_result["reference"] + ) # Start data product processing but do not block next IQ capture tic = perf_counter() @@ -554,16 +562,22 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") # Create capture segment with channel-specific metadata before sigan is reconfigured - tic = perf_counter() self.create_capture_segment(i, measurement_result) - toc = perf_counter() - logger.debug(f"Created capture metadata in {toc-tic:.2f} s") + # Query CPU speed for later averaging in diagnostics metadata cpu_speed.append(get_current_cpu_clock_speed()) + # Append list of data reference points; later we require these to be identical + reference_points.append(measurement_result["reference"]) capture_toc = perf_counter() logger.debug( f"Collected all IQ data and started all processing in {capture_toc-capture_tic:.2f} s" ) + # Create data product metadata: requires all data reference points + # to be identical. + assert ( + len(set(reference_points)) == 1 + ), "Channel data were scaled to different reference points. Cannot build metadata." + # Collect processed data product results all_data, max_max_ch_pwrs, med_mean_ch_pwrs, mean_ch_pwrs, median_ch_pwrs = ( [], @@ -639,15 +653,11 @@ def capture_iq(self, params: dict) -> dict: self.cal_adjust = utils.get_parameter(CAL_ADJUST, params) logger.debug(f"cal_adjust={self.cal_adjust}") # Collect IQ data - measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, nskip, cal_adjust=self.cal_adjust + measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_params=params, cal_adjust=self.cal_adjust ) # Store some metadata with the IQ measurement_result.update(params) - if self.cal_adjust: - measurement_result["sensor_cal"] = ( - self.sensor.signal_analyzer.sensor_calibration_data - ) toc = perf_counter() logger.debug( f"IQ Capture ({duration_ms} ms @ {(params[FREQUENCY]/1e6):.1f} MHz) completed in {toc-tic:.2f} s." @@ -788,6 +798,11 @@ def capture_diagnostics( cpu_diag["ssd_smart_data"] = ntia_diagnostics.SsdSmartData(**smart_data) except: logger.warning("Failed to get SSD SMART data") + try: # Disk usage + disk_usage = get_disk_usage() + cpu_diag["disk_usage"] = disk_usage + except: + logger.warning("Failed to get disk usage") # Get software versions software_diag = { @@ -989,7 +1004,7 @@ def test_required_components(self): trigger_api_restart.send(sender=self.__class__) return None - def create_global_data_product_metadata(self) -> None: + def create_global_data_product_metadata(self, data_products_reference: str) -> None: p = self.parameters num_iq_samples = int(p[SAMPLE_RATE] * p[DURATION_MS] * 1e-3) iir_obj = ntia_algorithm.DigitalFilter( @@ -1034,7 +1049,7 @@ def create_global_data_product_metadata(self) -> None: x_step=[p[SAMPLE_RATE] / FFT_SIZE], y_units="dBm/Hz", processing=[dft_obj.id], - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Results of statistical detectors (max, mean, median, 25th_percentile, 75th_percentile, " + "90th_percentile, 95th_percentile, 99th_percentile, 99.9th_percentile, 99.99th_percentile) " @@ -1054,7 +1069,7 @@ def create_global_data_product_metadata(self) -> None: x_stop=[pvt_x_axis__s[-1]], x_step=[pvt_x_axis__s[1] - pvt_x_axis__s[0]], y_units="dBm", - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Max- and mean-detected channel power vs. time, with " + f"an integration time of {p[TD_BIN_SIZE_MS]} ms. " @@ -1081,7 +1096,7 @@ def create_global_data_product_metadata(self) -> None: x_stop=[pfp_x_axis__s[-1]], x_step=[pfp_x_axis__s[1] - pfp_x_axis__s[0]], y_units="dBm", - reference=DATA_REFERENCE_POINT, + reference=data_products_reference, description=( "Channelized periodic frame power statistics reported over" + f" a {p[PFP_FRAME_PERIOD_MS]} ms frame period, with frame resolution" @@ -1104,6 +1119,7 @@ def create_global_data_product_metadata(self) -> None: y_start=[apd_y_axis__dBm[0]], y_stop=[apd_y_axis__dBm[-1]], y_step=[apd_y_axis__dBm[1] - apd_y_axis__dBm[0]], + reference=data_products_reference, description=( f"Estimate of the APD, using a {p[APD_BIN_SIZE_DB]} dB " + "bin size for amplitude values. The data payload includes" @@ -1122,6 +1138,7 @@ def create_global_data_product_metadata(self) -> None: + pfp_length * len(PFP_M3_DETECTOR) * 2 + apd_graph.length ) + logger.debug(f"Total channel length:{self.total_channel_data_length}") def create_capture_segment( self, @@ -1144,12 +1161,20 @@ def create_capture_segment( ) if self.cal_adjust: capture_segment.sensor_calibration = ntia_sensor.Calibration( - datetime=measurement_result["sensor_cal"]["datetime"], - gain=round(measurement_result["sensor_cal"]["gain"], 3), - noise_figure=round(measurement_result["sensor_cal"]["noise_figure"], 3), - temperature=round(measurement_result["sensor_cal"]["temperature"], 1), - reference=DATA_REFERENCE_POINT, + datetime=self.sensor.sensor_calibration_data["datetime"], + gain=round(measurement_result["applied_calibration"]["gain"], 3), + noise_figure=round( + measurement_result["applied_calibration"]["noise_figure"], 3 + ), + temperature=round( + self.sensor.sensor_calibration_data["temperature"], 1 + ), + reference=measurement_result["reference"], ) + if "compression_point" in measurement_result["applied_calibration"]: + capture_segment.sensor_calibration.compression_point = measurement_result[ + "applied_calibration" + ]["compression_point"] self.sigmf_builder.add_capture(capture_segment) def get_sigmf_builder( diff --git a/scos_actions/actions/acquire_single_freq_fft.py b/scos_actions/actions/acquire_single_freq_fft.py index daccf9d6..ac78d802 100644 --- a/scos_actions/actions/acquire_single_freq_fft.py +++ b/scos_actions/actions/acquire_single_freq_fft.py @@ -91,7 +91,6 @@ from numpy import float32, ndarray from scos_actions.actions.interfaces.measurement_action import MeasurementAction -from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.metadata.structs import ntia_algorithm from scos_actions.signal_processing.fft import ( get_fft, @@ -153,10 +152,6 @@ def __init__(self, parameters: dict): self.classification = get_parameter(CLASSIFICATION, self.parameters) self.cal_adjust = get_parameter(CAL_ADJUST, self.parameters) assert isinstance(self.cal_adjust, bool) - if self.cal_adjust: - self.data_reference = "calibration terminal" - else: - self.data_reference = "signal analyzer input" # FFT setup self.fft_detector = create_statistical_detector( "M4sDetector", ["min", "max", "mean", "median", "sample"] @@ -169,7 +164,7 @@ def __init__(self, parameters: dict): def execute(self, schedule_entry: dict, task_id: int) -> dict: # Acquire IQ data and generate M4S result measurement_result = self.acquire_data( - self.num_samples, self.nskip, self.cal_adjust + self.num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters ) # Actual sample rate may differ from configured value sample_rate_Hz = measurement_result["sample_rate"] @@ -184,13 +179,13 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict: # Build capture metadata sigan_settings = self.get_sigan_settings(measurement_result) logger.debug(f"sigan settings:{sigan_settings}") + measurement_result["duration_ms"] = round( + (self.num_samples / sample_rate_Hz) * 1000 + ) measurement_result["capture_segment"] = self.create_capture_segment( sample_start=0, - start_time=measurement_result["capture_time"], - center_frequency_Hz=self.frequency_Hz, - duration_ms=round((self.num_samples / sample_rate_Hz) * 1000), - overload=measurement_result["overload"], sigan_settings=sigan_settings, + measurement_result=measurement_result, ) return measurement_result @@ -267,7 +262,7 @@ def create_metadata(self, measurement_result: dict, recording: int = None) -> No x_stop=[frequencies[-1]], x_step=[frequencies[1] - frequencies[0]], y_units="dBm", - reference=self.data_reference, + reference=measurement_result["reference"], description=( "Results of min, max, mean, and median statistical detectors, " + f"along with a random sampling, from a set of {self.nffts} " diff --git a/scos_actions/actions/acquire_single_freq_tdomain_iq.py b/scos_actions/actions/acquire_single_freq_tdomain_iq.py index 8f359531..faa3ece9 100644 --- a/scos_actions/actions/acquire_single_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_single_freq_tdomain_iq.py @@ -37,7 +37,6 @@ from scos_actions import utils from scos_actions.actions.interfaces.measurement_action import MeasurementAction -from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.utils import get_parameter logger = logging.getLogger(__name__) @@ -84,7 +83,9 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict: # Use the sigan's actual reported instead of requested sample rate sample_rate = self.sensor.signal_analyzer.sample_rate num_samples = int(sample_rate * self.duration_ms * 1e-3) - measurement_result = self.acquire_data(num_samples, self.nskip, self.cal_adjust) + measurement_result = self.acquire_data( + num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters + ) end_time = utils.get_datetime_str_now() measurement_result.update(self.parameters) measurement_result["end_time"] = end_time @@ -94,11 +95,8 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict: logger.debug(f"sigan settings:{sigan_settings}") measurement_result["capture_segment"] = self.create_capture_segment( sample_start=0, - start_time=measurement_result["capture_time"], - center_frequency_Hz=self.frequency_Hz, - duration_ms=self.duration_ms, - overload=measurement_result["overload"], sigan_settings=sigan_settings, + measurement_result=measurement_result, ) return measurement_result diff --git a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py index cfd53a11..6fd4c685 100644 --- a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py @@ -100,7 +100,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): cal_adjust = get_parameter(CAL_ADJUST, measurement_params) sample_rate = self.sensor.signal_analyzer.sample_rate num_samples = int(sample_rate * duration_ms * 1e-3) - measurement_result = super().acquire_data(num_samples, nskip, cal_adjust) + measurement_result = super().acquire_data( + num_samples, nskip, cal_adjust, cal_params=measurement_params + ) measurement_result.update(measurement_params) end_time = utils.get_datetime_str_now() measurement_result["end_time"] = end_time @@ -108,31 +110,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): measurement_result["name"] = self.name measurement_result["classification"] = self.classification sigan_settings = self.get_sigan_settings(measurement_result) - capture_segment = CaptureSegment( - sample_start=0, - global_index=saved_samples, - frequency=measurement_params[FREQUENCY], - datetime=measurement_result["capture_time"], - duration=duration_ms, - overload=measurement_result["overload"], - sigan_settings=sigan_settings, + capture_segment = self.create_capture_segment( + 0, sigan_settings, measurement_result ) - sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data - sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data - if sigan_cal is not None: - if "1db_compression_point" in sigan_cal: - sigan_cal["compression_point"] = sigan_cal.pop( - "1db_compression_point" - ) - capture_segment.sigan_calibration = ntia_sensor.Calibration(**sigan_cal) - if sensor_cal is not None: - if "1db_compression_point" in sensor_cal: - sensor_cal["compression_point"] = sensor_cal.pop( - "1db_compression_point" - ) - capture_segment.sensor_calibration = ntia_sensor.Calibration( - **sensor_cal - ) measurement_result["capture_segment"] = capture_segment self.create_metadata(measurement_result, recording_id) diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py index 15cf9fd9..69fc5d5b 100644 --- a/scos_actions/actions/calibrate_y_factor.py +++ b/scos_actions/actions/calibrate_y_factor.py @@ -17,7 +17,13 @@ # - SCOS Markdown Editor: https://ntia.github.io/scos-md-editor/ # r"""Perform a Y-Factor Calibration. -Supports calibration of gain and noise figure for one or more channels. +Supports calculation of gain and noise figure for one or more channels using the +Y-Factor method. Results are written to the file specified by the environment +variable ``ONBOARD_CALIBRATION_FILE``. If the sensor already has a sensor calibration +object, it is used as the starting point, and copied to a new onboard calibration object +which is updated by this action. The sensor object's sensor calibration will be set to +the updated onboard calibration object after this action is run. + For each center frequency, sets the preselector to the noise diode path, turns noise diode on, performs a mean power measurement, turns the noise diode off and performs another mean power measurement. The mean power on and mean power off @@ -71,15 +77,18 @@ import logging import os import time +from pathlib import Path import numpy as np +from environs import Env from scipy.constants import Boltzmann from scipy.signal import sosfilt from scos_actions import utils from scos_actions.actions.interfaces.action import Action +from scos_actions.calibration.sensor_calibration import SensorCalibration from scos_actions.hardware.sensor import Sensor -from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS +from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface from scos_actions.signal_processing.calibration import ( get_linear_enr, get_temperature, @@ -92,9 +101,10 @@ from scos_actions.signal_processing.power_analysis import calculate_power_watts from scos_actions.signal_processing.unit_conversion import convert_watts_to_dBm from scos_actions.signals import trigger_api_restart -from scos_actions.utils import ParameterException, get_parameter +from scos_actions.utils import ParameterException, get_datetime_str_now, get_parameter logger = logging.getLogger(__name__) +env = Env() # Define parameter keys RF_PATH = Action.PRESELECTOR_PATH_KEY @@ -112,6 +122,7 @@ IIR_RESP_FREQS = "iir_num_response_frequencies" CAL_SOURCE_IDX = "cal_source_idx" TEMP_SENSOR_IDX = "temp_sensor_idx" +REFERENCE_POINT = "reference_point" class YFactorCalibration(Action): @@ -196,6 +207,51 @@ def __init__(self, parameters: dict): def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): """This is the entrypoint function called by the scheduler.""" self.sensor = sensor + + # Prepare the sensor calibration object. + assert all( + self.iteration_params[0][REFERENCE_POINT] == p[REFERENCE_POINT] + for p in self.iteration_params + ), f"All iterations must use the same '{REFERENCE_POINT}' setting" + onboard_cal_reference = self.iteration_params[0][REFERENCE_POINT] + + if self.sensor.sensor_calibration is None: + # Create a new sensor calibration object and attach it to the sensor. + # The calibration parameters will be set to the sigan parameters used + # in the action YAML parameters. + sensor_uid = self.sensor.capabilities["sensor"]["sensor_spec"]["id"] + logger.debug( + f"Creating a new onboard cal object for the sensor {sensor_uid}." + ) + cal_params = list( + self.get_sigan_params( + self.iteration_params[0], self.sensor.signal_analyzer + ).keys() + ) + logger.debug(f"cal_params: {cal_params}") + cal_data = dict() + last_cal_datetime = get_datetime_str_now() + clock_rate_lookup_by_sample_rate = [] + self.sensor.sensor_calibration = SensorCalibration( + calibration_parameters=cal_params, + calibration_data=cal_data, + calibration_reference=onboard_cal_reference, + file_path=Path(env("ONBOARD_CALIBRATION_FILE")), + last_calibration_datetime=last_cal_datetime, + clock_rate_lookup_by_sample_rate=clock_rate_lookup_by_sample_rate, + sensor_uid=sensor_uid, + ) + elif self.sensor.sensor_calibration.file_path == env( + "ONBOARD_CALIBRATION_FILE" + ): + # Already using an onboard cal file. + logger.debug("Onboard calibration file already in use. Continuing.") + else: + # Sensor calibration file exists. Change it to an onboard cal file + logger.debug("Making new onboard cal file from existing sensor cal") + self.sensor.sensor_calibration.calibration_reference = onboard_cal_reference + self.sensor.sensor_calibration.file_path = env("ONBOARD_CALIBRATION_FILE") + self.test_required_components() detail = "" @@ -228,10 +284,8 @@ def calibrate(self, params: dict): # Get noise diode on IQ logger.debug("Acquiring IQ samples with noise diode ON") - noise_on_measurement_result = ( - self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, num_samples_skip=nskip, cal_adjust=False - ) + noise_on_measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_adjust=False ) sample_rate = noise_on_measurement_result["sample_rate"] @@ -242,15 +296,14 @@ def calibrate(self, params: dict): # Get noise diode off IQ logger.debug("Acquiring IQ samples with noise diode OFF") - noise_off_measurement_result = ( - self.sensor.signal_analyzer.acquire_time_domain_samples( - num_samples, num_samples_skip=nskip, cal_adjust=False - ) + noise_off_measurement_result = self.sensor.acquire_time_domain_samples( + num_samples, nskip, cal_adjust=False ) assert ( sample_rate == noise_off_measurement_result["sample_rate"] ), "Sample rate mismatch" - sigan_params = {k: v for k, v in params.items() if k in SIGAN_SETTINGS_KEYS} + sigan_params = self.get_sigan_params(params, self.sensor.signal_analyzer) + logger.debug(f"sigan_params: {sigan_params}") # Apply IIR filtering to both captures if configured if self.iir_apply: # Estimate of IIR filter ENBW does NOT account for passband ripple in sensor transfer function! @@ -259,24 +312,22 @@ def calibrate(self, params: dict): noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"]) noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"]) else: - if self.sensor.signal_analyzer.sensor_calibration.is_default: - raise Exception( - "Calibrations without IIR filter cannot be performed with default calibration." - ) - logger.debug("Skipping IIR filtering") # Get ENBW from sensor calibration - assert set( - self.sensor.signal_analyzer.sensor_calibration.calibration_parameters - ) <= set( + assert set(self.sensor.sensor_calibration.calibration_parameters) <= set( sigan_params.keys() ), f"Action parameters do not include all required calibration parameters" cal_args = [ sigan_params[k] - for k in self.sensor.signal_analyzer.sensor_calibration.calibration_parameters + for k in self.sensor.sensor_calibration.calibration_parameters ] - self.sensor.signal_analyzer.recompute_sensor_calibration_data(cal_args) - enbw_hz = self.sensor.signal_analyzer.sensor_calibration_data["enbw"] + self.sensor.recompute_sensor_calibration_data(cal_args) + if "enbw" not in self.sensor.sensor_calibration_data: + raise Exception( + "Unable to perform Y-Factor calibration without IIR filtering when no" + " ENBW is provided in the sensor calibration file." + ) + enbw_hz = self.sensor.sensor_calibration_data["enbw"] noise_on_data = noise_on_measurement_result["data"] noise_off_data = noise_off_measurement_result["data"] @@ -294,7 +345,7 @@ def calibrate(self, params: dict): ) # Update sensor calibration with results - self.sensor.signal_analyzer.sensor_calibration.update( + self.sensor.sensor_calibration.update( sigan_params, utils.get_datetime_str_now(), gain, noise_figure, temp_c ) @@ -385,3 +436,11 @@ def test_required_components(self): raise RuntimeError(msg) if not self.sensor.signal_analyzer.healthy(): trigger_api_restart.send(sender=self.__class__) + + def get_sigan_params(self, params: dict, sigan: SignalAnalyzerInterface) -> dict: + sigan_params = {} + for k, v in params.items(): + if hasattr(sigan, k): + sigan_params[k] = v + + return sigan_params diff --git a/scos_actions/actions/interfaces/action.py b/scos_actions/actions/interfaces/action.py index 7d64fd98..cee001ae 100644 --- a/scos_actions/actions/interfaces/action.py +++ b/scos_actions/actions/interfaces/action.py @@ -4,7 +4,6 @@ from typing import Optional from scos_actions.hardware.sensor import Sensor -from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS from scos_actions.metadata.sigmf_builder import SigMFBuilder from scos_actions.metadata.structs import ntia_scos, ntia_sensor from scos_actions.utils import ParameterException, get_parameter @@ -51,13 +50,12 @@ def sensor(self, value: Sensor): self._sensor = value def configure_sigan(self, params: dict): - sigan_params = {k: v for k, v in params.items() if k in SIGAN_SETTINGS_KEYS} - for key, value in sigan_params.items(): + for key, value in params.items(): if hasattr(self.sensor.signal_analyzer, key): logger.debug(f"Applying setting to sigan: {key}: {value}") setattr(self.sensor.signal_analyzer, key, value) else: - logger.warning(f"Sigan does not have attribute {key}") + logger.debug(f"Sigan does not have attribute {key}") def configure_preselector(self, params: dict): preselector = self.sensor.preselector diff --git a/scos_actions/actions/interfaces/measurement_action.py b/scos_actions/actions/interfaces/measurement_action.py index 62f8fedd..e7ad088a 100644 --- a/scos_actions/actions/interfaces/measurement_action.py +++ b/scos_actions/actions/interfaces/measurement_action.py @@ -38,36 +38,46 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): def create_capture_segment( self, sample_start: int, - start_time: str, - center_frequency_Hz: float, - duration_ms: int, - overload: bool, sigan_settings: Optional[ntia_sensor.SiganSettings], + measurement_result: dict, ) -> CaptureSegment: capture_segment = CaptureSegment( sample_start=sample_start, - frequency=center_frequency_Hz, - datetime=start_time, - duration=duration_ms, - overload=overload, + frequency=measurement_result["frequency"], + datetime=measurement_result["capture_time"], + duration=measurement_result["duration_ms"], + overload=measurement_result["overload"], sigan_settings=sigan_settings, ) - sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data - sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data - # Rename compression point keys if they exist - # then set calibration metadata if it exists - if sensor_cal is not None: - if "1db_compression_point" in sensor_cal: - sensor_cal["compression_point"] = sensor_cal.pop( - "1db_compression_point" - ) - capture_segment.sensor_calibration = ntia_sensor.Calibration(**sensor_cal) - if sigan_cal is not None: - if "1db_compression_point" in sigan_cal: - sigan_cal["compression_point"] = sigan_cal.pop("1db_compression_point") - capture_segment.sigan_calibration = ntia_sensor.Calibration(**sigan_cal) + # Set calibration metadata if it exists + cal_meta = self.get_calibration(measurement_result) + if cal_meta is not None: + capture_segment.sensor_calibration = cal_meta return capture_segment + def get_calibration(self, measurement_result: dict) -> ntia_sensor.Calibration: + cal_meta = None + if ( + self.sensor.sensor_calibration_data is not None + and measurement_result["applied_calibration"] is not None + ): + cal_meta = ntia_sensor.Calibration( + datetime=self.sensor.sensor_calibration_data["datetime"], + gain=round(measurement_result["applied_calibration"]["gain"], 3), + noise_figure=round( + measurement_result["applied_calibration"]["noise_figure"], 3 + ), + temperature=round( + self.sensor.sensor_calibration_data["temperature"], 1 + ), + reference=measurement_result["reference"], + ) + if "compression_point" in measurement_result["applied_calibration"]: + cal_meta.compression_point = measurement_result["applied_calibration"][ + "compression_point" + ] + return cal_meta + def create_metadata( self, measurement_result: dict, @@ -154,17 +164,22 @@ def send_signals(self, task_id, metadata, measurement_data): ) def acquire_data( - self, num_samples: int, nskip: int = 0, cal_adjust: bool = True + self, + num_samples: int, + nskip: int = 0, + cal_adjust: bool = True, + cal_params: Optional[dict] = None, ) -> dict: logger.debug( f"Acquiring {num_samples} IQ samples, skipping the first {nskip} samples" + f" and {'' if cal_adjust else 'not '}applying gain adjustment based" + " on calibration data" ) - measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples( + measurement_result = self.sensor.acquire_time_domain_samples( num_samples, num_samples_skip=nskip, cal_adjust=cal_adjust, + cal_params=cal_params, ) return measurement_result diff --git a/scos_actions/actions/tests/test_acquire_single_freq_fft.py b/scos_actions/actions/tests/test_acquire_single_freq_fft.py index 0419d7cc..e4d9c9fb 100644 --- a/scos_actions/actions/tests/test_acquire_single_freq_fft.py +++ b/scos_actions/actions/tests/test_acquire_single_freq_fft.py @@ -1,7 +1,6 @@ from scos_actions.actions.tests.utils import check_metadata_fields from scos_actions.discover import test_actions as actions -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import measurement_action_completed SINGLE_FREQUENCY_FFT_ACQUISITION = { @@ -29,9 +28,8 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_single_frequency_m4s_action"] assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) action( - sensor=sensor, + sensor=MockSensor(), schedule_entry=SINGLE_FREQUENCY_FFT_ACQUISITION, task_id=1, ) @@ -84,11 +82,3 @@ def callback(sender, **kwargs): ] ] ) - - -def test_num_samples_skip(): - action = actions["test_single_frequency_m4s_action"] - assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - action(sensor, SINGLE_FREQUENCY_FFT_ACQUISITION, 1) - assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"] diff --git a/scos_actions/actions/tests/test_monitor_sigan.py b/scos_actions/actions/tests/test_monitor_sigan.py index 9a00ea71..38085efd 100644 --- a/scos_actions/actions/tests/test_monitor_sigan.py +++ b/scos_actions/actions/tests/test_monitor_sigan.py @@ -1,6 +1,6 @@ from scos_actions.discover import test_actions as actions +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor from scos_actions.signals import trigger_api_restart MONITOR_SIGAN_SCHEDULE = { @@ -23,10 +23,10 @@ def callback(sender, **kwargs): action = actions["test_monitor_sigan"] mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = False - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == True # signal sent - mock_sigan._is_available = True + sensor.signal_analyzer._is_available = True def test_monitor_sigan_not_healthy(): @@ -40,7 +40,7 @@ def callback(sender, **kwargs): action = actions["test_monitor_sigan"] mock_sigan = MockSignalAnalyzer() mock_sigan.times_to_fail_recv = 6 - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == True # signal sent @@ -57,6 +57,6 @@ def callback(sender, **kwargs): mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = True mock_sigan.set_times_to_fail_recv(0) - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) action(sensor, MONITOR_SIGAN_SCHEDULE, 1) assert _api_restart_triggered == False # signal not sent diff --git a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py index 67d86495..774ce91e 100644 --- a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py +++ b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py @@ -2,8 +2,8 @@ from scos_actions.actions.tests.utils import check_metadata_fields from scos_actions.discover import test_actions as actions +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor from scos_actions.signals import measurement_action_completed SINGLE_TIMEDOMAIN_IQ_ACQUISITION = { @@ -31,7 +31,7 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_single_frequency_iq_action"] assert action.description - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) + sensor = MockSensor() action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) assert _data.any() assert _metadata @@ -62,16 +62,7 @@ def test_required_components(): action = actions["test_single_frequency_m4s_action"] mock_sigan = MockSignalAnalyzer() mock_sigan._is_available = False - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor(signal_analyzer=mock_sigan) with pytest.raises(RuntimeError): action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) mock_sigan._is_available = True - - -def test_num_samples_skip(): - action = actions["test_single_frequency_iq_action"] - assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) - action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1) - assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"] diff --git a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py index e06920cc..2183d0bf 100644 --- a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py +++ b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py @@ -1,6 +1,5 @@ from scos_actions.discover import test_actions as actions -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import measurement_action_completed SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION = { @@ -34,8 +33,7 @@ def callback(sender, **kwargs): measurement_action_completed.connect(callback) action = actions["test_multi_frequency_iq_action"] assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) + sensor = MockSensor() action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1) for i in range(_count): assert _datas[i].any() @@ -43,21 +41,3 @@ def callback(sender, **kwargs): assert _task_ids[i] == 1 assert _recording_ids[i] == i + 1 assert _count == 10 - - -def test_num_samples_skip(): - action = actions["test_multi_frequency_iq_action"] - assert action.description - mock_sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=mock_sigan, capabilities={}) - action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1) - if isinstance(action.parameters["nskip"], list): - assert ( - action.sensor.signal_analyzer._num_samples_skip - == action.parameters["nskip"][-1] - ) - else: - assert ( - action.sensor.signal_analyzer._num_samples_skip - == action.parameters["nskip"] - ) diff --git a/scos_actions/actions/tests/test_sync_gps.py b/scos_actions/actions/tests/test_sync_gps.py index 024dbcee..0a98c154 100644 --- a/scos_actions/actions/tests/test_sync_gps.py +++ b/scos_actions/actions/tests/test_sync_gps.py @@ -4,9 +4,7 @@ import pytest from scos_actions.discover import test_actions -from scos_actions.hardware.mocks.mock_gps import MockGPS -from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor +from scos_actions.hardware.mocks.mock_sensor import MockSensor from scos_actions.signals import location_action_completed SYNC_GPS = { @@ -30,8 +28,7 @@ def callback(sender, **kwargs): location_action_completed.connect(callback) action = test_actions["test_sync_gps"] - sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=sigan, capabilities={}, gps=MockGPS(sigan)) + sensor = MockSensor() if sys.platform == "linux": action(sensor, SYNC_GPS, 1) assert _latitude diff --git a/scos_actions/calibration/calibration.py b/scos_actions/calibration/calibration.py deleted file mode 100644 index 2506d825..00000000 --- a/scos_actions/calibration/calibration.py +++ /dev/null @@ -1,214 +0,0 @@ -import json -import logging -from dataclasses import dataclass -from pathlib import Path -from typing import Dict, List, Union - -from scos_actions.signal_processing.calibration import CalibrationException - -logger = logging.getLogger(__name__) - - -@dataclass -class Calibration: - last_calibration_datetime: str - calibration_parameters: List[str] - calibration_data: dict - clock_rate_lookup_by_sample_rate: List[Dict[str, float]] - is_default: bool - file_path: Path - - def __post_init__(self): - # Convert key names in calibration_data to strings - # This means that formatting will always match between - # native types provided in Python and data loaded from JSON - self.calibration_data = json.loads(json.dumps(self.calibration_data)) - - def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]: - """Find the clock rate (Hz) using the given sample_rate (samples per second)""" - for mapping in self.clock_rate_lookup_by_sample_rate: - if mapping["sample_rate"] == sample_rate: - return mapping["clock_frequency"] - return sample_rate - - def get_calibration_dict(self, cal_params: List[Union[float, int, bool]]) -> dict: - """ - Get calibration data closest to the specified parameter values. - - :param cal_params: List of calibration parameter values. For example, - if ``calibration_parameters`` are ``["sample_rate", "gain"]``, - then the input to this method could be ``["15360000.0", "40"]``. - :return: The calibration data corresponding to the input parameter values. - """ - - cal_data = self.calibration_data - for i, setting_value in enumerate(cal_params): - setting = self.calibration_parameters[i] - logger.debug(f"Looking up calibration for {setting} at {setting_value}") - cal_data = filter_by_parameter(cal_data, setting_value) - logger.debug(f"Got calibration data: {cal_data}") - - return cal_data - - def update( - self, - params: dict, - calibration_datetime_str: str, - gain_dB: float, - noise_figure_dB: float, - temp_degC: float, - ) -> None: - """ - Update the calibration data by overwriting or adding an entry. - - This method updates the instance variables of the ``Calibration`` - object and additionally writes these changes to the specified - output file. - - :param params: Parameters used for calibration. This must include - entries for all of the ``Calibration.calibration_parameters`` - Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` - :param calibration_datetime_str: Calibration datetime string, - as returned by ``scos_actions.utils.get_datetime_str_now()`` - :param gain_dB: Gain value from calibration, in dB. - :param noise_figure_dB: Noise figure value for calibration, in dB. - :param temp_degC: Temperature at calibration time, in degrees Celsius. - :param file_path: File path for saving the updated calibration data. - :raises Exception: - """ - cal_data = self.calibration_data - self.last_calibration_datetime = calibration_datetime_str - if len(self.calibration_parameters) == 0: - self.calibration_parameters = list(params.keys()) - # Ensure all required calibration parameters were used - elif not set(params.keys()) >= set(self.calibration_parameters): - raise Exception( - "Not enough parameters specified to update calibration.\n" - + f"Required parameters are {self.calibration_parameters}" - ) - - # Get calibration entry by parameters used - for parameter in self.calibration_parameters: - value = str(params[parameter]).lower() - logger.debug(f"Updating calibration at {parameter} = {value}") - try: - cal_data = cal_data[value] - except KeyError: - logger.debug( - f"Creating required calibration data field for {parameter} = {value}" - ) - cal_data[value] = {} - cal_data = cal_data[value] - - # Update calibration data - cal_data.update( - { - "datetime": calibration_datetime_str, - "gain": gain_dB, - "noise_figure": noise_figure_dB, - "temperature": temp_degC, - } - ) - - # Write updated calibration data to file - cal_dict = { - "last_calibration_datetime": self.last_calibration_datetime, - "calibration_parameters": self.calibration_parameters, - "clock_rate_lookup_by_sample_rate": self.clock_rate_lookup_by_sample_rate, - "calibration_data": self.calibration_data, - } - with open(self.file_path, "w") as outfile: - outfile.write(json.dumps(cal_dict)) - - -def load_from_json(fname: Path, is_default: bool) -> Calibration: - """ - Load a calibration from a JSON file. - - The JSON file must contain top-level fields: - ``last_calibration_datetime`` - ``calibration_parameters`` - ``calibration_data`` - ``clock_rate_lookup_by_sample_rate`` - - :param fname: The ``Path`` to the JSON calibration file. - :param is_default: If True, the loaded calibration file - is treated as the default calibration file. - :raises Exception: If the provided file does not include - the required keys. - :return: The ``Calibration`` object generated from the file. - """ - with open(fname) as file: - calibration = json.load(file) - # Check that the required fields are in the dict - required_keys = { - "last_calibration_datetime", - "calibration_data", - "clock_rate_lookup_by_sample_rate", - "calibration_parameters", - } - - if not calibration.keys() >= required_keys: - raise Exception( - "Loaded calibration dictionary is missing required fields." - + f"Existing fields: {set(calibration.keys())}\n" - + f"Required fields: {required_keys}\n" - ) - # Create and return the Calibration object - return Calibration( - calibration["last_calibration_datetime"], - calibration["calibration_parameters"], - calibration["calibration_data"], - calibration["clock_rate_lookup_by_sample_rate"], - is_default, - fname, - ) - - -def filter_by_parameter(calibrations: dict, value: Union[float, int, bool]) -> dict: - """ - Select a certain element by the value of a top-level key in a dictionary. - - This method should be recursively called to select calibration - data matching a set of calibration parameters. The ordering of - nested dictionaries should match the ordering of the required - calibration parameters in the calibration file. - - If ``value`` is a float or bool, ``str(value).lower()`` is used - as the dictionary key. If ``value`` is an int, and the previous - approach does not work, ``str(float(value))`` is attempted. This - allows for value ``1`` to match a key ``"1.0"``. - - :param calibrations: Calibration data dictionary. - :param value: The parameter value for filtering. This value should - exist as a top-level key in ``calibrations``. - :raises CalibrationException: If ``value`` cannot be matched to a - top-level key in ``calibrations``, or if ``calibrations`` is not - a dict. - :return: The value of ``calibrations[value]``, which should be a dict. - """ - try: - filtered_data = calibrations.get(str(value).lower(), None) - if filtered_data is None and isinstance(value, int): - # Try equivalent float for ints, i.e., match "1.0" to 1 - filtered_data = calibrations.get(str(float(value)), None) - if filtered_data is None and isinstance(value, float) and value.is_integer(): - # Check for, e.g., key '25' if value is '25.0' - filtered_data = calibrations.get(str(int(value)), None) - if filtered_data is None: - raise KeyError - else: - return filtered_data - except AttributeError as e: - # calibrations does not have ".get()" - # Generally means that calibrations is None or not a dict - msg = f"Provided calibration data is not a dict: {calibrations}" - raise CalibrationException(msg) - except KeyError as e: - msg = ( - f"Could not locate calibration data at {value}" - + f"\nAttempted lookup using key '{str(value).lower()}'" - + f"{f'and {float(value)}' if isinstance(value, int) else ''}" - + f"\nUsing calibration data: {calibrations}" - ) - raise CalibrationException(msg) diff --git a/scos_actions/calibration/differential_calibration.py b/scos_actions/calibration/differential_calibration.py new file mode 100644 index 00000000..de877f8a --- /dev/null +++ b/scos_actions/calibration/differential_calibration.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass + +from scos_actions.calibration.interfaces.calibration import Calibration + + +@dataclass +class DifferentialCalibration(Calibration): + """ + Dataclass implementation for "differential calibration" handling. + + A differential calibration provides loss values which represent excess loss + between the ``SensorCalibration.calibration_reference`` reference point and + another reference point. A typical usage would be for calibrating out measured + cable losses which exist between the antenna and the Y-factor calibration terminal. + At present, this is measured manually using a calibration probe consisting of a + calibrated noise source and a programmable attenuator. + + The ``DifferentialCalibration.calibration_data`` entries should be dictionaries + containing the key ``"loss"`` and a corresponding value in decibels (dB). A positive + value of ``"loss"`` indicates a LOSS going FROM ``DifferentialCalibration.calibration_reference`` + TO ``SensorCalibration.calibration_reference``. + """ + + def update(self): + """ + SCOS Sensor should not update differential calibration files. + + Instead, these should be generated through an external calibration + process. This class should only be used to read JSON files, and never + to update their entries. Therefore, no ``update`` method is implemented. + + If, at some point in the future, functionality is added to automate these + calibrations, this function may be implemented. + """ + raise NotImplementedError diff --git a/scos_actions/tests/resources/__init__.py b/scos_actions/calibration/interfaces/__init__.py similarity index 100% rename from scos_actions/tests/resources/__init__.py rename to scos_actions/calibration/interfaces/__init__.py diff --git a/scos_actions/calibration/interfaces/calibration.py b/scos_actions/calibration/interfaces/calibration.py new file mode 100644 index 00000000..f3ea522c --- /dev/null +++ b/scos_actions/calibration/interfaces/calibration.py @@ -0,0 +1,141 @@ +import dataclasses +import json +import logging +from abc import abstractmethod +from pathlib import Path +from typing import List, get_origin + +from scos_actions.calibration.utils import ( + CalibrationParametersMissingException, + filter_by_parameter, +) + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class Calibration: + """ + Base class to handle calibrated gains, noise figures, compression points, and losses. + The calibration_parameters defined the settings used to perform calibrations and the + order in which calibrations may be accessed in the calibration_data dictionary. + For example, if calibration_parameters where [frequency, sample_rate] then the + calibration for a particular frequency and sample rate would be accessed in + the calibration_data dictionary by the string value of the frequency and + sample rate, like calibration_data["3555000000.0"]["14000000.0"]. The + calibration_reference indicates the reference point for the calibration, e.d., + antenna terminal, or noise source output. The file_path determines where + updates (if allowed) will be saved. + """ + + calibration_parameters: List[str] + calibration_data: dict + calibration_reference: str + file_path: Path + + def __post_init__(self): + self._validate_fields() + # Convert key names in data to strings + # This means that formatting will always match between + # native types provided in Python and data loaded from JSON + self.calibration_data = json.loads(json.dumps(self.calibration_data)) + + def _validate_fields(self) -> None: + """Loosely check that the input types are as expected.""" + for f_name, f_def in self.__dataclass_fields__.items(): + # Note that nested types are not checked: i.e., "List[str]" + # will surely be a list, but may not be filled with strings. + f_type = get_origin(f_def.type) or f_def.type + actual_value = getattr(self, f_name) + if not isinstance(actual_value, f_type): + c_name = self.__class__.__name__ + actual_type = type(actual_value) + raise TypeError( + f"{c_name} field {f_name} must be {f_type}, not {actual_type}" + ) + + def get_calibration_dict(self, params: dict) -> dict: + """ + Get calibration data entry at the specified parameter values. + + :param params: Parameters used for calibration. This must include + entries for all of the ``Calibration.calibration_parameters`` + Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` + :return: The calibration data corresponding to the input parameter values. + """ + # Check that input includes all required calibration parameters + if not set(params.keys()) >= set(self.calibration_parameters): + raise CalibrationParametersMissingException( + params, self.calibration_parameters + ) + cal_data = self.calibration_data + for p_name in self.calibration_parameters: + p_value = params[p_name] + logger.debug(f"Looking up calibration data at {p_name}={p_value}") + cal_data = filter_by_parameter(cal_data, p_value) + + logger.debug(f"Got calibration data: {cal_data}") + + return cal_data + + @abstractmethod + def update(self): + """Update the calibration data""" + raise NotImplementedError + + @classmethod + def from_json(cls, fname: Path): + """ + Load a calibration from a JSON file. + + The JSON file must contain top-level fields + with names identical to the dataclass fields for + the class being constructed. + + :param fname: The ``Path`` to the JSON calibration file. + :raises Exception: If the provided file does not include + the required keys. + :return: The ``Calibration`` object generated from the file. + """ + with open(fname) as file: + calibration = json.load(file) + cal_file_keys = set(calibration.keys()) + + # Check that only the required fields are in the dict + required_keys = {f.name for f in dataclasses.fields(cls)} + required_keys -= {"file_path"} # not required in JSON + if cal_file_keys == required_keys: + pass + elif cal_file_keys >= required_keys: + extra_keys = cal_file_keys - required_keys + logger.warning( + f"Loaded calibration file contains fields which will be ignored: {extra_keys}" + ) + for k in extra_keys: + calibration.pop(k, None) + else: + raise Exception( + "Loaded calibration dictionary is missing required fields.\n" + + f"Existing fields: {cal_file_keys}\n" + + f"Required fields: {required_keys}\n" + + f"Missing fields: {required_keys - cal_file_keys}" + ) + + # Create and return the Calibration object + return cls(file_path=fname, **calibration) + + def to_json(self) -> None: + """ + Save the calibration to a JSON file. + + The JSON file will be located at ``self.file_path`` and will + contain a copy of ``self.__dict__``, except for the ``file_path`` + key/value pair. This includes all dataclass fields, with their + parameter names as JSON key names. + """ + dict_to_json = self.__dict__.copy() + # Remove keys which should not save to JSON + dict_to_json.pop("file_path", None) + with open(self.file_path, "w") as outfile: + outfile.write(json.dumps(dict_to_json)) + logger.debug("Finished updating calibration file.") diff --git a/scos_actions/calibration/sensor_calibration.py b/scos_actions/calibration/sensor_calibration.py new file mode 100644 index 00000000..1ce985ee --- /dev/null +++ b/scos_actions/calibration/sensor_calibration.py @@ -0,0 +1,138 @@ +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Dict, List, Union + +from environs import Env + +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.utils import CalibrationEntryMissingException +from scos_actions.utils import get_datetime_str_now, parse_datetime_iso_format_str + +logger = logging.getLogger(__name__) + + +@dataclass +class SensorCalibration(Calibration): + """ + Extends the ``Calibration`` class to represent calibration + data that may be updated. Within SCOS Sensor,``SensorCalibration`` + instances are used to handle calibration files generated prior + to deployment through a lab-based calibration as well as the result + of calibrations that are performed by the sensor in the field. This + class provides an implementation for the update method to allow calibration + data to be updated with new values. + """ + + last_calibration_datetime: str + clock_rate_lookup_by_sample_rate: List[Dict[str, float]] + sensor_uid: str + + def get_clock_rate(self, sample_rate: Union[float, int]) -> Union[float, int]: + """Find the clock rate (Hz) using the given sample_rate (samples per second)""" + for mapping in self.clock_rate_lookup_by_sample_rate: + if mapping["sample_rate"] == sample_rate: + return mapping["clock_frequency"] + return sample_rate + + def update( + self, + params: dict, + calibration_datetime_str: str, + gain_dB: float, + noise_figure_dB: float, + temp_degC: float, + ) -> None: + """ + Update the calibration data by overwriting or adding an entry. + + This updates the instance variables of the ``SensorCalibration`` + object and additionally writes these changes to file specified + by the instance's file_path property. + + :param params: Parameters used for calibration. This must include + entries for all of the ``Calibration.calibration_parameters`` + Example: ``{"sample_rate": 14000000.0, "attenuation": 10.0}`` + :param calibration_datetime_str: Calibration datetime string, + as returned by ``scos_actions.utils.get_datetime_str_now()`` + :param gain_dB: Gain value from calibration, in dB. + :param noise_figure_dB: Noise figure value for calibration, in dB. + :param temp_degC: Temperature at calibration time, in degrees Celsius. + """ + logger.debug(f"Updating calibration file for params {params}") + try: + # Get existing calibration data entry which will be updated + data_entry = self.get_calibration_dict(params) + except CalibrationEntryMissingException: + # Existing entry does not exist for these parameters. Make one. + data_entry = self.calibration_data + for p_name in self.calibration_parameters: + p_val = str(params[p_name]).lower() + try: + data_entry = data_entry[p_val] + except KeyError: + logger.debug( + f"Creating calibration data field for {p_name}={p_val}" + ) + data_entry[p_val] = {} + data_entry = data_entry[p_val] + except Exception as e: + logger.exception("Failed to update calibration data.") + raise e + + # Update last calibration datetime + self.last_calibration_datetime = calibration_datetime_str + + # Update calibration data entry (updates entry in self.calibration_data) + data_entry.update( + { + "datetime": calibration_datetime_str, + "gain": gain_dB, + "noise_figure": noise_figure_dB, + "temperature": temp_degC, + } + ) + + # Write updated calibration data to file + self.to_json() + + def expired(self) -> bool: + env = Env() + time_limit = env.int("CALIBRATION_EXPIRATION_LIMIT", default=None) + logger.debug("Checking if calibration has expired.") + now_string = get_datetime_str_now() + now = parse_datetime_iso_format_str(now_string) + if time_limit is None: + return False + elif self.calibration_data is None: + return True + elif len(self.calibration_data) == 0: + return True + elif date_expired(self.last_calibration_datetime, now, time_limit): + return True + else: + cal_data = self.calibration_data + return has_expired_cal_data(cal_data, now, time_limit) + + +def has_expired_cal_data(cal_data: dict, now: datetime, time_limit: int) -> bool: + expired = False + if "datetime" in cal_data: + expired = expired or date_expired(cal_data["datetime"], now, time_limit) + + for key, value in cal_data.items(): + if isinstance(value, dict): + expired = expired or has_expired_cal_data(value, now, time_limit) + return expired + + +def date_expired(cal_date: str, now: datetime, time_limit: int): + cal_datetime = parse_datetime_iso_format_str(cal_date) + elapsed = now - cal_datetime + logger.debug(f"{cal_datetime} is {elapsed} seconds old") + if elapsed.total_seconds() > time_limit: + logger.debug( + f"Calibration at {cal_date} has expired at {elapsed.total_seconds()} seconds old." + ) + return True + return False diff --git a/scos_actions/calibration/tests/test_calibration.py b/scos_actions/calibration/tests/test_calibration.py index 9e503acd..20c160b8 100644 --- a/scos_actions/calibration/tests/test_calibration.py +++ b/scos_actions/calibration/tests/test_calibration.py @@ -1,322 +1,125 @@ -"""Test aspects of ScaleFactors.""" +"""Test the Calibration base dataclass.""" -import datetime +import dataclasses import json -import random -from copy import deepcopy -from math import isclose from pathlib import Path +from typing import List import pytest -from scos_actions.calibration.calibration import ( - Calibration, - filter_by_parameter, - load_from_json, -) -from scos_actions.signal_processing.calibration import CalibrationException -from scos_actions.tests.resources.utils import easy_gain -from scos_actions.utils import get_datetime_str_now, parse_datetime_iso_format_str +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import SensorCalibration +from scos_actions.calibration.tests.utils import recursive_check_keys -class TestCalibrationFile: - # Ensure we load the test file - setup_complete = False - - def rand_index(self, l): - """Get a random index for a list""" - return random.randint(0, len(l) - 1) - - def check_duplicate(self, sr, f, g): - """Check if a set of points was already tested""" - for pt in self.pytest_points: - duplicate_f = f == pt["frequency"] - duplicate_g = g == pt["setting_value"] - duplicate_sr = sr == pt["sample_rate"] - if duplicate_f and duplicate_g and duplicate_sr: - return True - - def run_pytest_point(self, sr, f, g, reason, sr_m=False, f_m=False, g_m=False): - """Test the calculated value against the algorithm - Parameters: - sr, f, g -> Set values for the mock USRP - reason: Test case string for failure reference - sr_m, f_m, g_m -> Set values to use when calculating the expected value - May differ in from actual set points in edge cases - such as tuning in divisions or uncalibrated sample rate""" - # Check that the setup was completed - assert self.setup_complete, "Setup was not completed" - - # If this point was tested before, skip it (triggering a new one) - if self.check_duplicate(sr, f, g): - return False - - # If the point doesn't have modified inputs, use the algorithm ones - if not f_m: - f_m = f - if not g_m: - g_m = g - if not sr_m: - sr_m = sr - - # Calculate what the scale factor should be - calc_gain_sigan = easy_gain(sr_m, f_m, g_m) - - # Get the scale factor from the algorithm - interp_cal_data = self.sample_cal.get_calibration_dict([sr, f, g]) - interp_gain_siggan = interp_cal_data["gain"] - - # Save the point so we don't duplicate - self.pytest_points.append( - { - "sample_rate": int(sr), - "frequency": f, - "setting_value": g, - "gain": calc_gain_sigan, - "test": reason, - } - ) - - # Check if the point was calculated correctly - tolerance = 1e-5 - msg = "Scale factor not correctly calculated!\r\n" - msg = f"{msg} Expected value: {calc_gain_sigan}\r\n" - msg = f"{msg} Calculated value: {interp_gain_siggan}\r\n" - msg = f"{msg} Tolerance: {tolerance}\r\n" - msg = f"{msg} Test: {reason}\r\n" - msg = f"{msg} Sample Rate: {sr / 1e6}({sr_m / 1e6})\r\n" - msg = f"{msg} Frequency: {f / 1e6}({f_m / 1e6})\r\n" - msg = f"{msg} Gain: {g}({g_m})\r\n" - msg = ( - "{} Formula: -1 * (Gain - Frequency[GHz] - Sample Rate[MHz])\r\n".format( - msg - ) - ) - if not isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance): - interp_cal_data = self.sample_cal.get_calibration_dict([sr, f, g]) - - assert isclose(calc_gain_sigan, interp_gain_siggan, abs_tol=tolerance), msg - return True - +class TestBaseCalibration: @pytest.fixture(autouse=True) - def setup_calibration_file(self, tmpdir): - """Create the dummy calibration file in the pytest temp directory""" - - # Only setup once - if self.setup_complete: - return - - # Create and save the temp directory and file - self.tmpdir = tmpdir.strpath - self.calibration_file = "{}".format(tmpdir.join("dummy_cal_file.json")) - - # Setup variables - self.dummy_noise_figure = 10 - self.dummy_compression = -20 - self.test_repeat_times = 3 - - # Sweep variables - self.sample_rates = [10e6, 15.36e6, 40e6] - self.gain_min = 40 - self.gain_max = 60 - self.gain_step = 10 - gains = list(range(self.gain_min, self.gain_max, self.gain_step)) + [ - self.gain_max - ] - self.frequency_min = 1000000000 - self.frequency_max = 3400000000 - self.frequency_step = 200000000 - frequencies = list( - range(self.frequency_min, self.frequency_max, self.frequency_step) - ) + [self.frequency_max] - frequencies = sorted(frequencies) - - # Start with blank cal data dicts + def setup_calibration_file(self, tmp_path: Path): + """Create a dummy calibration file in the pytest temp directory.""" + # Create some dummy calibration data + self.cal_params = ["frequency", "gain"] + self.frequencies = [3555e6, 3565e6, 3575e6] + self.gains = [10.0, 20.0, 30.0] cal_data = {} - - # Add the simple stuff to new cal format - cal_data["last_calibration_datetime"] = get_datetime_str_now() - cal_data["sensor_uid"] = "SAMPLE_CALIBRATION" - - # Add SR/CF lookup table - cal_data["clock_rate_lookup_by_sample_rate"] = [] - for sr in self.sample_rates: - cr = sr - while cr <= 40e6: - cr *= 2 - cr /= 2 - cal_data["clock_rate_lookup_by_sample_rate"].append( - {"sample_rate": int(sr), "clock_frequency": int(cr)} - ) - - # Create the JSON architecture for the calibration data - cal_data["calibration_data"] = {} - cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"] - for k in range(len(self.sample_rates)): - cal_data_f = {} - for i in range(len(frequencies)): - cal_data_g = {} - for j in range(len(gains)): - # Create the scale factor that ensures easy interpolation - gain_sigan = easy_gain( - self.sample_rates[k], frequencies[i], gains[j] - ) - - # Create the data point - cal_data_point = { - "gain": gain_sigan, - "noise_figure": self.dummy_noise_figure, - "1dB_compression_point": self.dummy_compression, - } - - # Add the generated dicts to the parent lists - cal_data_g[gains[j]] = deepcopy(cal_data_point) - cal_data_f[frequencies[i]] = deepcopy(cal_data_g) - - cal_data["calibration_data"][self.sample_rates[k]] = deepcopy(cal_data_f) - - # Write the new json file - with open(self.calibration_file, "w+") as file: - json.dump(cal_data, file, indent=4) - - # Load the data back in - self.sample_cal = load_from_json(self.calibration_file, False) - - # Create a list of previous points to ensure that we don't repeat - self.pytest_points = [] - - # Create sweep lists for test points - self.srs = self.sample_rates - self.gi_s = list(range(self.gain_min, self.gain_max, self.gain_step)) - self.fi_s = list( - range(self.frequency_min, self.frequency_max, self.frequency_step) + for frequency in self.frequencies: + cal_data[frequency] = {} + for gain in self.gains: + cal_data[frequency][gain] = { + "gain": gain * 1.1, + "noise_figure": gain / 5.0, + "1dB_compression_point": -50 + gain, + } + self.cal_data = cal_data + self.dummy_file_path = tmp_path / "dummy_cal.json" + self.dummy_default_file_path = tmp_path / "dummy_default_cal.json" + + self.sample_cal = Calibration( + calibration_parameters=self.cal_params, + calibration_data=self.cal_data, + calibration_reference="testing", + file_path=self.dummy_file_path, ) - self.g_s = self.gi_s + [self.gain_max] - self.f_s = self.fi_s + [self.frequency_max] - - # Don't repeat test setup - self.setup_complete = True - - def test_filter_by_parameter_out_of_range(self): - calibrations = {200.0: {"some_cal_data"}, 300.0: {"more cal data"}} - with pytest.raises(CalibrationException) as e_info: - cal = filter_by_parameter(calibrations, 400.0) - assert ( - e_info.value.args[0] - == f"Could not locate calibration data at 400.0" - + f"\nAttempted lookup using key '400.0'" - + f"\nUsing calibration data: {calibrations}" - ) - def test_filter_by_parameter_in_range_requires_match(self): - calibrations = { - 200.0: {"Gain": "Gain at 200.0"}, - 300.0: {"Gain": "Gain at 300.0"}, - } - with pytest.raises(CalibrationException) as e_info: - cal = filter_by_parameter(calibrations, 150.0) - assert e_info.value.args[0] == ( - f"Could not locate calibration data at 150.0" - + f"\nAttempted lookup using key '150.0'" - + f"\nUsing calibration data: {calibrations}" - ) - - def test_get_calibration_dict_exact_match_lookup(self): - calibration_datetime = datetime.datetime.now() - calibration_params = ["sample_rate", "frequency"] - calibration_data = { - 100.0: {200.0: {"NF": "NF at 100, 200", "Gain": "Gain at 100, 200"}}, - 200.0: {100.0: {"NF": "NF at 200, 100", "Gain": "Gain at 200, 100"}}, - } - clock_rate_lookup_by_sample_rate = {} - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - Path(""), + self.sample_default_cal = Calibration( + calibration_parameters=self.cal_params, + calibration_data=self.cal_data, + calibration_reference="testing", + file_path=self.dummy_default_file_path, ) - cal_data = cal.get_calibration_dict([100.0, 200.0]) - assert cal_data["NF"] == "NF at 100, 200" - def test_get_calibration_dict_within_range(self): - calibration_datetime = datetime.datetime.now() - calibration_params = calibration_params = ["sample_rate", "frequency"] - calibration_data = { - 100.0: {200: {"NF": "NF at 100, 200"}, 300.0: "Cal data at 100,300"}, - 200.0: {100.0: {"NF": "NF at 200, 100"}}, - } - clock_rate_lookup_by_sample_rate = {} - test_cal_path = Path("test_calibration.json") - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - test_cal_path, + def test_calibration_data_key_name_conversion(self): + """On post-init, all calibration_data key names should be converted to strings.""" + recursive_check_keys(self.sample_cal.calibration_data) + recursive_check_keys(self.sample_default_cal.calibration_data) + + def test_calibration_dataclass_fields(self): + """Check that the dataclass is set up as expected.""" + fields = {f.name: f.type for f in dataclasses.fields(Calibration)} + # Note: does not check field order + assert fields == { + "calibration_parameters": List[str], + "calibration_reference": str, + "calibration_data": dict, + "file_path": Path, + }, "Calibration class fields have changed" + + def test_field_validator(self): + """Check that the input field type validator works.""" + with pytest.raises(TypeError): + _ = Calibration([], {}, "", False, False) + with pytest.raises(TypeError): + _ = Calibration([], {}, "", 100, Path("")) + with pytest.raises(TypeError): + _ = Calibration([], {}, 5, False, Path("")) + with pytest.raises(TypeError): + _ = Calibration([], [10, 20], "", False, Path("")) + with pytest.raises(TypeError): + _ = Calibration({"test": 1}, {}, "", False, Path("")) + + def test_get_calibration_dict(self): + """Check the get_calibration_dict method with all dummy data.""" + for f in self.frequencies: + for g in self.gains: + assert json.loads( + json.dumps(self.cal_data[f][g]) + ) == self.sample_cal.get_calibration_dict({"frequency": f, "gain": g}) + + def test_to_and_from_json(self, tmp_path: Path): + """Test the ``from_json`` factory method.""" + # First save the calibration data to temporary files + self.sample_cal.to_json() + self.sample_default_cal.to_json() + # Then load and compare + assert self.sample_cal == Calibration.from_json(self.dummy_file_path) + assert self.sample_default_cal == Calibration.from_json( + self.dummy_default_file_path ) - with pytest.raises(CalibrationException) as e_info: - cal_data = cal.get_calibration_dict([100.0, 250.0]) - assert e_info.value.args[0] == ( - f"Could not locate calibration data at 250.0" - + f"\nAttempted lookup using key '250.0'" - + f"\nUsing calibration data: {cal.calibration_data}" - ) - def test_sf_bound_points(self): - """Test SF determination at boundary points""" - self.run_pytest_point( - self.srs[0], self.frequency_min, self.gain_min, "Testing boundary points" - ) - self.run_pytest_point( - self.srs[0], self.frequency_max, self.gain_max, "Testing boundary points" + # from_json should ignore extra keys in the loaded file, but not fail + # Test this by trying to load a SensorCalibration as a Calibration + sensor_cal = SensorCalibration( + self.sample_cal.calibration_parameters, + self.sample_cal.calibration_data, + "testing", + tmp_path / "testing.json", + "dt_str", + [], + "uid", ) - - def test_sf_no_interpolation_points(self): - """Test points without interpolation""" - for i in range(4 * self.test_repeat_times): - while True: - g = self.g_s[self.rand_index(self.g_s)] - f = self.f_s[self.rand_index(self.f_s)] - if self.run_pytest_point( - self.srs[0], f, g, "Testing no interpolation points" - ): - break - - def test_update(self): - calibration_datetime = get_datetime_str_now() - calibration_params = ["sample_rate", "frequency"] - calibration_data = {100.0: {200.0: {"noise_figure": 0, "gain": 0}}} - clock_rate_lookup_by_sample_rate = {} - test_cal_path = Path("test_calibration.json") - cal = Calibration( - calibration_datetime, - calibration_params, - calibration_data, - clock_rate_lookup_by_sample_rate, - False, - test_cal_path, - ) - action_params = {"sample_rate": 100.0, "frequency": 200.0} - update_time = get_datetime_str_now() - cal.update(action_params, update_time, 30.0, 5.0, 21) - cal_from_file = load_from_json(test_cal_path, False) - test_cal_path.unlink() - file_utc_time = parse_datetime_iso_format_str(cal.last_calibration_datetime) - cal_time_utc = parse_datetime_iso_format_str(update_time) - assert file_utc_time.year == cal_time_utc.year - assert file_utc_time.month == cal_time_utc.month - assert file_utc_time.day == cal_time_utc.day - assert file_utc_time.hour == cal_time_utc.hour - assert file_utc_time.minute == cal_time_utc.minute - assert cal.calibration_data["100.0"]["200.0"]["gain"] == 30.0 - assert cal.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 - assert cal_from_file.calibration_data["100.0"]["200.0"]["gain"] == 30.0 - assert cal_from_file.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 - - def test_filter_by_paramter_integer(self): - calibrations = {"200.0": {"some_cal_data"}, 300.0: {"more cal data"}} - filtered_data = filter_by_parameter(calibrations, 200) - assert filtered_data is calibrations["200.0"] + sensor_cal.to_json() + loaded_cal = Calibration.from_json(tmp_path / "testing.json") + loaded_cal.file_path = self.sample_cal.file_path # Force these to be the same + assert loaded_cal == self.sample_cal + + # from_json should fail if required fields are missing + # Create an incorrect JSON file + almost_a_cal = {"calibration_parameters": []} + with open(tmp_path / "almost_a_cal.json", "w") as outfile: + outfile.write(json.dumps(almost_a_cal)) + with pytest.raises(Exception): + almost = Calibration.from_json(tmp_path / "almost_a_cal.json") + + def test_update_not_implemented(self): + """Ensure the update abstract method is not implemented in the base class""" + with pytest.raises(NotImplementedError): + self.sample_cal.update() diff --git a/scos_actions/calibration/tests/test_differential_calibration.py b/scos_actions/calibration/tests/test_differential_calibration.py new file mode 100644 index 00000000..5c9c80ca --- /dev/null +++ b/scos_actions/calibration/tests/test_differential_calibration.py @@ -0,0 +1,44 @@ +"""Test the DifferentialCalibration dataclass.""" + +import json +from pathlib import Path + +import pytest + +from scos_actions.calibration.differential_calibration import DifferentialCalibration + + +class TestDifferentialCalibration: + @pytest.fixture(autouse=True) + def setup_differential_calibration_file(self, tmp_path: Path): + dict_to_json = { + "calibration_parameters": ["frequency"], + "calibration_reference": "antenna input", + "calibration_data": {3555e6: 11.5}, + } + self.valid_file_path = tmp_path / "sample_diff_cal.json" + self.invalid_file_path = tmp_path / "sample_diff_cal_invalid.json" + + self.sample_diff_cal = DifferentialCalibration( + file_path=self.valid_file_path, **dict_to_json + ) + + with open(self.valid_file_path, "w") as f: + f.write(json.dumps(dict_to_json)) + + dict_to_json.pop("calibration_reference", None) + + with open(self.invalid_file_path, "w") as f: + f.write(json.dumps(dict_to_json)) + + def test_from_json(self): + """Check from_json functionality with valid and invalid dummy data.""" + diff_cal = DifferentialCalibration.from_json(self.valid_file_path) + assert diff_cal == self.sample_diff_cal + with pytest.raises(Exception): + _ = DifferentialCalibration.from_json(self.invalid_file_path) + + def test_update_not_implemented(self): + """Check that the update method is not implemented.""" + with pytest.raises(NotImplementedError): + self.sample_diff_cal.update() diff --git a/scos_actions/calibration/tests/test_sensor_calibration.py b/scos_actions/calibration/tests/test_sensor_calibration.py new file mode 100644 index 00000000..bd4c610c --- /dev/null +++ b/scos_actions/calibration/tests/test_sensor_calibration.py @@ -0,0 +1,314 @@ +"""Test the SensorCalibration dataclass.""" + +import dataclasses +import datetime +import json +import random +from copy import deepcopy +from pathlib import Path +from typing import Dict, List + +import pytest + +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import ( + SensorCalibration, + has_expired_cal_data, +) +from scos_actions.calibration.tests.utils import recursive_check_keys +from scos_actions.calibration.utils import CalibrationException +from scos_actions.utils import get_datetime_str_now, parse_datetime_iso_format_str + + +class TestSensorCalibrationFile: + # Ensure we load the test file + setup_complete = False + + def rand_index(self, l): + """Get a random index for a list""" + return random.randint(0, len(l) - 1) + + def check_duplicate(self, sr, f, g): + """Check if a set of points was already tested""" + for pt in self.pytest_points: + duplicate_f = f == pt["frequency"] + duplicate_g = g == pt["setting_value"] + duplicate_sr = sr == pt["sample_rate"] + if duplicate_f and duplicate_g and duplicate_sr: + return True + + @pytest.fixture(autouse=True) + def setup_calibration_file(self, tmp_path: Path): + """ + Create the dummy calibration file in the pytest temp directory + + The gain values in each calibration data entry are set up as being + equal to the gain setting minus ``self.dummy_gain_scale_factor`` + """ + + # Only setup once + if self.setup_complete: + return + + # Create and save the temp directory and file + self.calibration_file = tmp_path / "dummy_cal_file.json" + + # Setup variables + self.dummy_gain_scale_factor = 5 # test data gain values are (gain setting - 5) + self.dummy_noise_figure = 10 + self.dummy_compression = -20 + self.test_repeat_times = 3 + + # Sweep variables + self.sample_rates = [10e6, 15.36e6, 40e6] + self.gain_min = 40 + self.gain_max = 60 + self.gain_step = 10 + gains = list(range(self.gain_min, self.gain_max, self.gain_step)) + [ + self.gain_max + ] + self.frequency_min = 1000000000 + self.frequency_max = 3400000000 + self.frequency_step = 200000000 + frequencies = list( + range(self.frequency_min, self.frequency_max, self.frequency_step) + ) + [self.frequency_max] + frequencies = sorted(frequencies) + + # Start with blank cal data dicts + cal_data = {} + + # Add the simple stuff to new cal format + cal_data["last_calibration_datetime"] = get_datetime_str_now() + cal_data["sensor_uid"] = "SAMPLE_CALIBRATION" + cal_data["calibration_reference"] = "TESTING" + + # Add SR/CF lookup table + cal_data["clock_rate_lookup_by_sample_rate"] = [] + for sr in self.sample_rates: + cr = sr + while cr <= 40e6: + cr *= 2 + cr /= 2 + cal_data["clock_rate_lookup_by_sample_rate"].append( + {"sample_rate": int(sr), "clock_frequency": int(cr)} + ) + + # Create the JSON architecture for the calibration data + cal_data["calibration_data"] = {} + cal_data["calibration_parameters"] = ["sample_rate", "frequency", "gain"] + for k in range(len(self.sample_rates)): + cal_data_f = {} + for i in range(len(frequencies)): + cal_data_g = {} + for j in range(len(gains)): + # Create the data point + cal_data_point = { + "gain": gains[j] - self.dummy_gain_scale_factor, + "noise_figure": self.dummy_noise_figure, + "1dB_compression_point": self.dummy_compression, + } + + # Add the generated dicts to the parent lists + cal_data_g[gains[j]] = deepcopy(cal_data_point) + cal_data_f[frequencies[i]] = deepcopy(cal_data_g) + + cal_data["calibration_data"][self.sample_rates[k]] = deepcopy(cal_data_f) + + # Write the new json file + with open(self.calibration_file, "w+") as file: + json.dump(cal_data, file, indent=4) + + # Load the data back in + self.sample_cal = SensorCalibration.from_json(self.calibration_file) + + # Create a list of previous points to ensure that we don't repeat + self.pytest_points = [] + + # Create sweep lists for test points + self.srs = self.sample_rates + self.gi_s = list(range(self.gain_min, self.gain_max, self.gain_step)) + self.fi_s = list( + range(self.frequency_min, self.frequency_max, self.frequency_step) + ) + self.g_s = self.gi_s + [self.gain_max] + self.f_s = self.fi_s + [self.frequency_max] + + # Don't repeat test setup + self.setup_complete = True + + def test_calibration_data_key_name_conversion(self): + """On post-init, all calibration_data key names should be converted to strings.""" + recursive_check_keys(self.sample_cal.calibration_data) + + def test_sensor_calibration_dataclass_fields(self): + """Check that the dataclass fields are as expected.""" + fields = { + f.name: f.type + for f in dataclasses.fields(SensorCalibration) + if f not in dataclasses.fields(Calibration) + } + # Note: does not check field order + assert fields == { + "last_calibration_datetime": str, + "clock_rate_lookup_by_sample_rate": List[Dict[str, float]], + "sensor_uid": str, + } + + def test_field_validator(self): + """Check that the input field type validator works.""" + # only check fields not inherited from Calibration base class + with pytest.raises(TypeError): + _ = SensorCalibration([], {}, False, Path(""), "dt_str", [], 10) + with pytest.raises(TypeError): + _ = SensorCalibration([], {}, False, Path(""), "dt_str", {}, "uid") + with pytest.raises(TypeError): + _ = SensorCalibration( + [], {}, False, Path(""), datetime.datetime.now(), [], "uid" + ) + + def test_get_clock_rate(self): + """Test the get_clock_rate method""" + # Test getting a clock rate by sample rate + assert self.sample_cal.get_clock_rate(10e6) == 40e6 + # If there isn't an entry, the sample rate should be returned + assert self.sample_cal.get_clock_rate(-999) == -999 + + def test_get_calibration_dict_exact_match_lookup(self): + calibration_datetime = get_datetime_str_now() + calibration_params = ["sample_rate", "frequency"] + calibration_data = { + 100.0: {200.0: {"NF": "NF at 100, 200", "Gain": "Gain at 100, 200"}}, + 200.0: {100.0: {"NF": "NF at 200, 100", "Gain": "Gain at 200, 100"}}, + } + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=Path(""), + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + cal_data = cal.get_calibration_dict({"sample_rate": 100.0, "frequency": 200.0}) + assert cal_data["NF"] == "NF at 100, 200" + + def test_get_calibration_dict_within_range(self): + calibration_datetime = get_datetime_str_now() + calibration_params = calibration_params = ["sample_rate", "frequency"] + calibration_data = { + 100.0: {200: {"NF": "NF at 100, 200"}, 300.0: "Cal data at 100,300"}, + 200.0: {100.0: {"NF": "NF at 200, 100"}}, + } + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=Path("test_calibration.json"), + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + + lookup_fail_value = 250.0 + with pytest.raises(CalibrationException) as e_info: + _ = cal.get_calibration_dict( + {"sample_rate": 100.0, "frequency": lookup_fail_value} + ) + assert e_info.value.args[0] == ( + f"Could not locate calibration data at {lookup_fail_value}" + + "\nAttempted lookup using keys: " + + f"\n\tstr({lookup_fail_value}).lower() = {str(lookup_fail_value).lower()}" + + f"\n\tstr(int({lookup_fail_value})) = {int(lookup_fail_value)}" + + f"\nUsing calibration data: {cal.calibration_data['100.0']}" + ) + + def test_update(self): + calibration_datetime = "2024-03-17T19:16:55.172Z" + calibration_params = ["sample_rate", "frequency"] + calibration_data = {100.0: {200.0: {"noise_figure": 0, "gain": 0}}} + test_cal_path = Path("test_calibration.json") + cal = SensorCalibration( + calibration_parameters=calibration_params, + calibration_data=calibration_data, + calibration_reference="testing", + file_path=test_cal_path, + last_calibration_datetime=calibration_datetime, + clock_rate_lookup_by_sample_rate=[], + sensor_uid="TESTING", + ) + action_params = {"sample_rate": 100.0, "frequency": 200.0} + update_time = get_datetime_str_now() + cal.update(action_params, update_time, 30.0, 5.0, 21) + cal_from_file = SensorCalibration.from_json(test_cal_path) + test_cal_path.unlink() + file_utc_time = parse_datetime_iso_format_str(cal.last_calibration_datetime) + cal_time_utc = parse_datetime_iso_format_str(update_time) + assert file_utc_time.year == cal_time_utc.year + assert file_utc_time.month == cal_time_utc.month + assert file_utc_time.day == cal_time_utc.day + assert file_utc_time.hour == cal_time_utc.hour + assert file_utc_time.minute == cal_time_utc.minute + assert cal.calibration_data["100.0"]["200.0"]["gain"] == 30.0 + assert cal.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 + assert cal_from_file.calibration_data["100.0"]["200.0"]["gain"] == 30.0 + assert cal_from_file.calibration_data["100.0"]["200.0"]["noise_figure"] == 5.0 + + def test_has_expired_cal_data_not_expired(self): + cal_date = "2024-03-14T15:48:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date}, + }, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) + assert expired == False + + def test_has_expired_cal_data_expired(self): + cal_date = "2024-03-14T15:48:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date}, + }, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 30 + ) + assert expired == True + + def test_has_expired_cal_data_multipledates_expired(self): + cal_date_1 = "2024-03-14T15:48:38.039Z" + cal_date_2 = "2024-03-14T15:40:38.039Z" + now_date = "2024-03-14T15:49:38.039Z" + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date_1}, + }, + "true": {"datetime": cal_date_2}, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) + assert expired == True + cal_data = { + "3550": { + "20.0": { + "false": {"datetime": cal_date_2}, + }, + "true": {"datetime": cal_date_1}, + } + } + expired = has_expired_cal_data( + cal_data, parse_datetime_iso_format_str(now_date), 100 + ) + assert expired == True diff --git a/scos_actions/calibration/tests/test_utils.py b/scos_actions/calibration/tests/test_utils.py new file mode 100644 index 00000000..8b6eefcf --- /dev/null +++ b/scos_actions/calibration/tests/test_utils.py @@ -0,0 +1,52 @@ +import pytest + +from scos_actions.calibration.utils import CalibrationException, filter_by_parameter + + +class TestCalibrationUtils: + def test_filter_by_parameter_out_of_range(self): + calibrations = {200.0: {"some_cal_data"}, 300.0: {"more cal data"}} + + # Also checks error output when missing value is an integer + test_value = 400 + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, test_value) + assert ( + e_info.value.args[0] + == f"Could not locate calibration data at {test_value}" + + "\nAttempted lookup using keys: " + + f"\n\tstr({test_value}).lower() = {str(test_value).lower()}" + + f"\n\tstr(float({test_value})) = {float(test_value)}" + + f"\nUsing calibration data: {calibrations}" + ) + + def test_filter_by_parameter_in_range_requires_match(self): + calibrations = { + 200.0: {"Gain": "Gain at 200.0"}, + 300.0: {"Gain": "Gain at 300.0"}, + } + + # Check looking up a missing value with a float + test_value = 150.0 + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, test_value) + assert e_info.value.args[0] == ( + f"Could not locate calibration data at {test_value}" + + "\nAttempted lookup using keys: " + + f"\n\tstr({test_value}).lower() = {str(test_value).lower()}" + + f"\n\tstr(int({test_value})) = {int(test_value)}" + + f"\nUsing calibration data: {calibrations}" + ) + + def test_filter_by_paramter_integer(self): + calibrations = {"200.0": {"some_cal_data"}, 300.0: {"more cal data"}} + filtered_data = filter_by_parameter(calibrations, 200) + assert filtered_data is calibrations["200.0"] + + def test_filter_by_parameter_type_error(self): + calibrations = [300.0, 400.0] + with pytest.raises(CalibrationException) as e_info: + _ = filter_by_parameter(calibrations, 300.0) + assert e_info.value.args[0] == ( + f"Provided calibration data is not a dict: {calibrations}" + ) diff --git a/scos_actions/calibration/tests/utils.py b/scos_actions/calibration/tests/utils.py new file mode 100644 index 00000000..a2ad6e51 --- /dev/null +++ b/scos_actions/calibration/tests/utils.py @@ -0,0 +1,10 @@ +"""Utility functions used for scos_sensor.calibration unit tests.""" + + +def recursive_check_keys(d: dict): + """Recursively checks a dict to check that all keys are strings""" + for k, v in d.items(): + if isinstance(v, dict): + recursive_check_keys(v) + else: + assert isinstance(k, str) diff --git a/scos_actions/calibration/utils.py b/scos_actions/calibration/utils.py new file mode 100644 index 00000000..8e6a21dd --- /dev/null +++ b/scos_actions/calibration/utils.py @@ -0,0 +1,79 @@ +from typing import Union + + +class CalibrationException(Exception): + """Basic exception handling for calibration functions.""" + + def __init__(self, msg): + super().__init__(msg) + + +class CalibrationEntryMissingException(CalibrationException): + """Raised when filter_by_parameter cannot locate calibration data.""" + + def __init__(self, msg): + super().__init__(msg) + + +class CalibrationParametersMissingException(CalibrationException): + """Raised when a dictionary does not contain all calibration parameters as keys.""" + + def __init__(self, provided_dict: dict, required_keys: list): + msg = ( + "Missing required parameters to lookup calibration data.\n" + + f"Required parameters are {required_keys}\n" + + f"Provided parameters are {list(provided_dict.keys())}" + ) + super().__init__(msg) + + +def filter_by_parameter(calibrations: dict, value: Union[float, int, bool]) -> dict: + """ + Select a certain element by the value of a top-level key in a dictionary. + + This method should be recursively called to select calibration + data matching a set of calibration parameters. The ordering of + nested dictionaries should match the ordering of the required + calibration parameters in the calibration file. + + If ``value`` is a float or bool, ``str(value).lower()`` is used + as the dictionary key. If ``value`` is an int, and the previous + approach does not work, ``str(float(value))`` is attempted. This + allows for value ``1`` to match a key ``"1.0"``, or a value of + ``1.0`` to match a key ``"1"``. + + :param calibrations: Calibration data dictionary. + :param value: The parameter value for filtering. This value should + exist as a top-level key in ``calibrations``. + :raises CalibrationException: If ``value`` cannot be matched to a + top-level key in ``calibrations``, or if ``calibrations`` is not + a dict. + :return: The value of ``calibrations[value]``, which should be a dict. + """ + try: + filtered_data = calibrations.get(str(value).lower(), None) + attempts = f"\n\tstr({value}).lower() = {str(value).lower()}" + if filtered_data is None and isinstance(value, int): + # Try equivalent float for ints, i.e., match "1.0" to 1 + filtered_data = calibrations.get(str(float(value)), None) + attempts += f"\n\tstr(float({value})) = {str(float(value))}" + if filtered_data is None and isinstance(value, float) and value.is_integer(): + # Check for, e.g., key '25' if value is '25.0' + filtered_data = calibrations.get(str(int(value)), None) + attempts += f"\n\tstr(int({value})) = {str(int(value))}" + if filtered_data is None: + raise KeyError + else: + return filtered_data + except AttributeError: + # calibrations does not have ".get()" + # Generally means that calibrations is None or not a dict + msg = f"Provided calibration data is not a dict: {calibrations}" + raise CalibrationException(msg) + except KeyError: + msg = ( + f"Could not locate calibration data at {value}" + + f"\nAttempted lookup using keys: {attempts}" + + f"\nUsing calibration data: {calibrations}" + ) + raise CalibrationEntryMissingException(msg) diff --git a/scos_actions/hardware/mocks/mock_sensor.py b/scos_actions/hardware/mocks/mock_sensor.py new file mode 100644 index 00000000..529919de --- /dev/null +++ b/scos_actions/hardware/mocks/mock_sensor.py @@ -0,0 +1,59 @@ +import logging + +from scos_actions.hardware.mocks.mock_gps import MockGPS +from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer +from scos_actions.hardware.sensor import Sensor +from scos_actions.utils import get_datetime_str_now + +_mock_sensor_cal_data = { + "datetime": get_datetime_str_now(), + "gain": 0, + "enbw": None, + "noise_figure": None, + "1db_compression_point": None, + "temperature": 26.85, +} + +_mock_differential_cal_data = {"loss": 10.0} + +_mock_capabilities = {"sensor": {}} + +_mock_location = {"x": -999, "y": -999, "z": -999, "description": "Testing"} + +logger = logging.getLogger(__name__) + + +class MockSensor(Sensor): + def __init__( + self, + signal_analyzer=MockSignalAnalyzer(), + gps=MockGPS(), + preselector=None, + switches={}, + location=_mock_location, + capabilities=_mock_capabilities, + sensor_cal=None, + differential_cal=None, + ): + if (sensor_cal is not None) or (differential_cal is not None): + logger.warning( + "Calibration object provided to mock sensor will not be used to query calibration data." + ) + super().__init__( + signal_analyzer=signal_analyzer, + gps=gps, + preselector=preselector, + switches=switches, + location=location, + capabilities=capabilities, + sensor_cal=sensor_cal, + differential_cal=differential_cal, + ) + + @property + def sensor_calibration_data(self): + return _mock_sensor_cal_data + + @property + def differential_calibration_data(self): + return _mock_differential_cal_data diff --git a/scos_actions/hardware/mocks/mock_sigan.py b/scos_actions/hardware/mocks/mock_sigan.py index f102d03d..018c1e30 100644 --- a/scos_actions/hardware/mocks/mock_sigan.py +++ b/scos_actions/hardware/mocks/mock_sigan.py @@ -8,7 +8,6 @@ from scos_actions import __package__ as SCOS_ACTIONS_NAME from scos_actions import __version__ as SCOS_ACTIONS_VERSION -from scos_actions.calibration.calibration import Calibration from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface from scos_actions.utils import get_datetime_str_now @@ -30,31 +29,11 @@ class MockSignalAnalyzer(SignalAnalyzerInterface): def __init__( self, - sensor_cal: Optional[Calibration] = None, - sigan_cal: Optional[Calibration] = None, switches: Optional[dict] = None, randomize_values: bool = False, ): - super().__init__(sensor_cal, sigan_cal, switches) - # Define the default calibration dicts - self.DEFAULT_SIGAN_CALIBRATION = { - "datetime": get_datetime_str_now(), - "gain": 0, # Defaults to gain setting - "enbw": None, # Defaults to sample rate - "noise_figure": 0, - "1db_compression_point": 100, - "temperature": 26.85, - } - - self.DEFAULT_SENSOR_CALIBRATION = { - "datetime": get_datetime_str_now(), - "gain": 0, # Defaults to sigan gain - "enbw": None, # Defaults to sigan enbw - "noise_figure": None, # Defaults to sigan noise figure - "1db_compression_point": None, # Defaults to sigan compression + preselector gain - "temperature": 26.85, - } - self.auto_dc_offset = False + super().__init__(switches) + self._model = "Mock Signal Analyzer" self._frequency = 700e6 self._sample_rate = 10e6 self.clock_rate = 40e6 @@ -62,8 +41,6 @@ def __init__( self._attenuation = 0 self._preamp_enable = False self._reference_level = -30 - self._overload = False - self._capture_time = None self._is_available = True self._plugin_version = SCOS_ACTIONS_VERSION self._plugin_name = SCOS_ACTIONS_NAME @@ -76,8 +53,6 @@ def __init__( self.times_failed_recv = 0 self.randomize_values = randomize_values - self.sensor_calibration_data = self.DEFAULT_SENSOR_CALIBRATION - self.sigan_calibration_data = self.DEFAULT_SIGAN_CALIBRATION @property def is_available(self): @@ -87,19 +62,6 @@ def is_available(self): def plugin_version(self): return self._plugin_version - @property - def plugin_name(self) -> str: - """Returns the current package name of scos-actions.""" - return self._plugin_name - - @property - def firmware_version(self): - return self._firmware_version - - @property - def api_version(self): - return self._api_version - @property def sample_rate(self): return self._sample_rate @@ -155,81 +117,47 @@ def connect(self): pass def acquire_time_domain_samples( - self, num_samples, num_samples_skip=0, retries=5, cal_adjust=True - ): + self, num_samples: int, num_samples_skip: int = 0 + ) -> dict: logger.warning("Using mock signal analyzer!") - self.sigan_overload = False - self._capture_time = None - self._num_samples_skip = num_samples_skip + overload = False + capture_time = None # Try to acquire the samples - max_retries = retries data = [] - while True: - if self.times_failed_recv < self.times_to_fail_recv: - self.times_failed_recv += 1 - data = np.ones(0, dtype=np.complex64) - else: - self._capture_time = get_datetime_str_now() - if self.randomize_values: - i = np.random.normal(0.5, 0.5, num_samples) - q = np.random.normal(0.5, 0.5, num_samples) - rand_iq = np.empty(num_samples, dtype=np.complex64) - rand_iq.real = i - rand_iq.imag = q - data = rand_iq - else: - data = np.ones(num_samples, dtype=np.complex64) - - data_len = len(data) - if not len(data) == num_samples: - if retries > 0: - msg = "Signal analyzer error: requested {} samples, but got {}." - logger.warning(msg.format(num_samples + num_samples_skip, data_len)) - logger.warning(f"Retrying {retries} more times.") - retries = retries - 1 - else: - err = "Failed to acquire correct number of samples " - err += f"{max_retries} times in a row." - raise RuntimeError(err) + if self.times_failed_recv < self.times_to_fail_recv: + self.times_failed_recv += 1 + data = np.ones(0, dtype=np.complex64) + else: + capture_time = get_datetime_str_now() + if self.randomize_values: + i = np.random.normal(0.5, 0.5, num_samples) + q = np.random.normal(0.5, 0.5, num_samples) + rand_iq = np.empty(num_samples, dtype=np.complex64) + rand_iq.real = i + rand_iq.imag = q + data = rand_iq else: - logger.debug(f"Successfully acquired {num_samples} samples.") - return { - "data": data, - "overload": self._overload, - "frequency": self._frequency, - "gain": self._gain, - "attenuation": self._attenuation, - "preamp_enable": self._preamp_enable, - "reference_level": self._reference_level, - "sample_rate": self._sample_rate, - "capture_time": self._capture_time, - } + data = np.ones(num_samples, dtype=np.complex64) + + if (data_len := len(data)) != num_samples: + err = "Failed to acquire correct number of samples: " + err += f"got {data_len} instead of {num_samples}" + raise RuntimeError(err) + else: + logger.debug(f"Successfully acquired {num_samples} samples.") + return { + "data": data, + "overload": overload, + "frequency": self._frequency, + "gain": self._gain, + "attenuation": self._attenuation, + "preamp_enable": self._preamp_enable, + "reference_level": self._reference_level, + "sample_rate": self._sample_rate, + "capture_time": capture_time, + } def set_times_to_fail_recv(self, n): self.times_to_fail_recv = n self.times_failed_recv = 0 - - @property - def last_calibration_time(self): - return get_datetime_str_now() - - def update_calibration(self, params): - pass - - def recompute_sensor_calibration_data(self, cal_args: list) -> None: - if self.sensor_calibration is not None: - self.sensor_calibration_data.update( - self.sensor_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sensor calibration does not exist.") - - def recompute_sigan_calibration_data(self, cal_args: list) -> None: - """Set the sigan calibration data based on the current tuning""" - if self.sigan_calibration is not None: - self.sigan_calibration_data.update( - self.sigan_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sigan calibration does not exist.") diff --git a/scos_actions/hardware/sensor.py b/scos_actions/hardware/sensor.py index b9e14806..47198b93 100644 --- a/scos_actions/hardware/sensor.py +++ b/scos_actions/hardware/sensor.py @@ -2,13 +2,19 @@ import hashlib import json import logging -from typing import Dict, Optional +from typing import Any, Dict, List, Optional from its_preselector.preselector import Preselector from its_preselector.web_relay import WebRelay -from .gps_iface import GPSInterface -from .sigan_iface import SignalAnalyzerInterface +from scos_actions.calibration.differential_calibration import DifferentialCalibration +from scos_actions.calibration.interfaces.calibration import Calibration +from scos_actions.calibration.sensor_calibration import SensorCalibration +from scos_actions.hardware.gps_iface import GPSInterface +from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface +from scos_actions.utils import convert_string_to_millisecond_iso_format + +logger = logging.getLogger(__name__) class Sensor: @@ -33,6 +39,8 @@ def __init__( preselector: Optional[Preselector] = None, switches: Optional[Dict[str, WebRelay]] = {}, location: Optional[dict] = None, + sensor_cal: Optional[SensorCalibration] = None, + differential_cal: Optional[DifferentialCalibration] = None, ): self.signal_analyzer = signal_analyzer self.gps = gps @@ -40,15 +48,19 @@ def __init__( self.switches = switches self.location = location self.capabilities = capabilities + self._sensor_calibration_data = {} + self._sensor_calibration = sensor_cal + self._differential_calibration_data = {} + self._differential_calibration = differential_cal # There is no setter for start_time property self._start_time = datetime.datetime.utcnow() @property - def signal_analyzer(self) -> SignalAnalyzerInterface: + def signal_analyzer(self) -> Optional[SignalAnalyzerInterface]: return self._signal_analyzer @signal_analyzer.setter - def signal_analyzer(self, sigan: SignalAnalyzerInterface): + def signal_analyzer(self, sigan: Optional[SignalAnalyzerInterface]): self._signal_analyzer = sigan @property @@ -66,7 +78,7 @@ def gps(self, gps: GPSInterface): self._gps = gps @property - def preselector(self) -> Preselector: + def preselector(self) -> Optional[Preselector]: """ RF front end that may include calibration sources, filters, and/or amplifiers. """ @@ -92,7 +104,7 @@ def switches(self, switches: Dict[str, WebRelay]): self._switches = switches @property - def location(self) -> dict: + def location(self) -> Optional[dict]: """ The GeoJSON dictionary of the sensor's location. """ @@ -106,7 +118,7 @@ def location(self, loc: dict): self._location = loc @property - def capabilities(self) -> dict: + def capabilities(self) -> Optional[dict]: """ A dictionary of the sensor's capabilities. The dictionary should include a 'sensor' key that maps to the ntia-sensor @@ -173,5 +185,194 @@ def has_configurable_preselector(self) -> bool: return False @property - def start_time(self): + def start_time(self) -> datetime.datetime: return self._start_time + + @property + def sensor_calibration(self) -> Optional[SensorCalibration]: + return self._sensor_calibration + + @sensor_calibration.setter + def sensor_calibration(self, cal: Optional[SensorCalibration]): + self._sensor_calibration = cal + + @property + def differential_calibration(self) -> Optional[DifferentialCalibration]: + return self._differential_calibration + + @differential_calibration.setter + def differential_calibration(self, cal: Optional[DifferentialCalibration]): + self._differential_calibration = cal + + @property + def last_calibration_time(self) -> str: + """A datetime string for the most recent sensor calibration.""" + return convert_string_to_millisecond_iso_format( + self.sensor_calibration.last_calibration_datetime + ) + + @property + def sensor_calibration_data(self) -> Dict[str, Any]: + """Sensor calibration data for the current sensor settings.""" + return self._sensor_calibration_data + + @property + def differential_calibration_data(self) -> Dict[str, float]: + """Differential calibration data for the current sensor settings.""" + return self._differential_calibration_data + + def recompute_calibration_data(self, params: dict) -> None: + """ + Set the differential_calibration_data and sensor_calibration_data + based on the specified ``params``. + """ + recomputed = False + if self.differential_calibration is not None: + self._differential_calibration_data.update( + self.differential_calibration.get_calibration_dict(params) + ) + recomputed = True + else: + logger.debug("No differential calibration available to recompute") + + if self.sensor_calibration is not None: + self._sensor_calibration_data.update( + self.sensor_calibration.get_calibration_dict(params) + ) + recomputed = True + else: + logger.debug("No sensor calibration available to recompute") + + if not recomputed: + logger.warning("Failed to recompute calibration data") + + def acquire_time_domain_samples( + self, + num_samples: int, + num_samples_skip: int = 0, + retries: int = 5, + cal_adjust: bool = True, + cal_params: Optional[dict] = None, + ) -> dict: + """ + Acquire time-domain IQ samples from the signal analyzer. + + Signal analyzer settings, preselector state, etc. should already be + set before calling this function. + + Gain adjustment can be applied to acquired samples using ``cal_adjust``. + If ``True``, the samples acquired from the signal analyzer will be + scaled based on the calibrated ``gain`` and ``loss`` values in + the ``SensorCalibration`` and ``DifferentialCalibration.`` + If no ``DifferentialCalibration`` exists, "calibration terminal" + will be the value of the "reference" key in the + returned dict. If a ``DifferentialCalibration`` exists, the gain and + noise figure will be adjusted with the loss specified in the + ``DifferentialCalibration`` and the "reference" will be set to the + calibration_reference of the ``DifferentialCalibration``. + + :param num_samples: Number of samples to acquire + :param num_samples_skip: Number of samples to skip + :param retries: Maximum number of retries on failure + :param cal_adjust: If True, use available calibration data to scale the samples. + :param cal_params: A dictionary with keys for all of the calibration parameters. + May contain additional keys. Example: ``{"sample_rate": 14000000.0, "gain": 10.0}`` + Must be specified if ``cal_adjust`` is ``True``. Otherwise, ignored. + :return: dictionary containing data, sample_rate, frequency, capture_time, etc + :raises Exception: If the sample acquisition fails, or the sensor has + no signal analyzer. + """ + logger.debug("***********************************\n") + logger.debug("Sensor.acquire_time_domain_samples starting") + logger.debug(f"Number of retries = {retries}") + logger.debug("*************************************\n") + + max_retries = retries + # Acquire samples from signal analyzer + if self.signal_analyzer is not None: + while True: + try: + measurement_result = ( + self.signal_analyzer.acquire_time_domain_samples( + num_samples, num_samples_skip + ) + ) + break + except BaseException as e: + retries -= 1 + logger.info("Error while acquiring samples from signal analyzer.") + if retries == 0: + logger.exception( + "Failed to acquire samples from signal analyzer. " + + f"Tried {max_retries} times." + ) + raise e + else: + msg = "Failed to acquire samples: sensor has no signal analyzer" + logger.error(msg) + raise Exception(msg) + + # Apply gain adjustment based on calibration + if cal_adjust: + if cal_params is None: + raise ValueError( + "Data scaling cannot occur without specified calibration parameters." + ) + if self.sensor_calibration is not None: + logger.debug("Scaling samples. Fetching calibration data.") + self.recompute_calibration_data(cal_params) + if self.differential_calibration is not None: + logger.debug( + f"USING DIFF. CAL: {self.differential_calibration.calibration_data}" + ) + if self.sensor_calibration is not None: + logger.debug( + f"USING SENSOR CAL: {self.sensor_calibration.calibration_data}" + ) + calibrated_gain__db = self.sensor_calibration_data["gain"] + calibrated_nf__db = self.sensor_calibration_data["noise_figure"] + logger.debug(f"Using sensor gain: {calibrated_gain__db} dB") + measurement_result["reference"] = ( + self.sensor_calibration.calibration_reference + ) + if self.differential_calibration is not None: + # Also apply differential calibration correction + differential_loss = self.differential_calibration_data["loss"] + logger.debug(f"Using differential loss: {differential_loss} dB") + calibrated_gain__db -= differential_loss + calibrated_nf__db += differential_loss + measurement_result["reference"] = ( + self.differential_calibration.calibration_reference + ) + + else: + # No differential calibration exists + logger.debug("No differential calibration was applied") + + linear_gain = 10.0 ** (calibrated_gain__db / 20.0) + logger.debug(f"Applying total gain of {calibrated_gain__db}") + measurement_result["data"] /= linear_gain + + # Metadata: record the gain and noise figure based on the actual + # scaling which was used. + measurement_result["applied_calibration"] = { + "gain": calibrated_gain__db, + "noise_figure": calibrated_nf__db, + } + if "compression_point" in self.sensor_calibration_data: + measurement_result["applied_calibration"]["compression_point"] = ( + self.sensor_calibration_data["compression_point"] + ) + applied_cal = measurement_result["applied_calibration"] + logger.debug(f"Setting applied_calibration to: {applied_cal}") + else: + # No sensor calibration exists + msg = "Unable to scale samples without sensor calibration data" + logger.error(msg) + raise Exception(msg) + else: + # Set the data reference in the measurement_result + measurement_result["reference"] = "signal analyzer input" + measurement_result["applied_calibration"] = None + + return measurement_result diff --git a/scos_actions/hardware/sigan_iface.py b/scos_actions/hardware/sigan_iface.py index 0b2f1d05..5f0df68f 100644 --- a/scos_actions/hardware/sigan_iface.py +++ b/scos_actions/hardware/sigan_iface.py @@ -5,45 +5,21 @@ from its_preselector.web_relay import WebRelay -from scos_actions.calibration.calibration import Calibration from scos_actions.hardware.utils import power_cycle_sigan -from scos_actions.utils import convert_string_to_millisecond_iso_format logger = logging.getLogger(__name__) -# All setting names for all supported sigans -SIGAN_SETTINGS_KEYS = [ - "sample_rate", - "frequency", - "gain", - "attenuation", - "reference_level", - "preamp_enable", -] - - class SignalAnalyzerInterface(ABC): def __init__( self, - sensor_cal: Optional[Calibration] = None, - sigan_cal: Optional[Calibration] = None, switches: Optional[Dict[str, WebRelay]] = None, ): - self.sensor_calibration_data = {} - self.sigan_calibration_data = {} - self._sensor_calibration = sensor_cal - self._sigan_calibration = sigan_cal self._model = "Unknown" + self._api_version = "Unknown" + self._firmware_version = "Unknown" self.switches = switches - @property - def last_calibration_time(self) -> str: - """Returns the last calibration time from calibration data.""" - return convert_string_to_millisecond_iso_format( - self.sensor_calibration.last_calibration_datetime - ) - @property @abstractmethod def is_available(self) -> bool: @@ -65,28 +41,25 @@ def plugin_name(self) -> str: @property def firmware_version(self) -> str: """Returns the version of the signal analyzer firmware.""" - return "Unknown" + return self._firmware_version @property def api_version(self) -> str: """Returns the version of the underlying signal analyzer API.""" - return "Unknown" + return self._api_version @abstractmethod def acquire_time_domain_samples( self, num_samples: int, num_samples_skip: int = 0, - retries: int = 5, - cal_adjust: bool = True, ) -> dict: """ - Acquire time domain IQ samples + Acquire time domain IQ samples, scaled to Volts at + the signal analyzer input. :param num_samples: Number of samples to acquire :param num_samples_skip: Number of samples to skip - :param retries: Maximum number of retries on failure - :param cal_adjust: If True, scale IQ samples based on calibration data. :return: dictionary containing data, sample_rate, frequency, capture_time, etc """ pass @@ -104,9 +77,7 @@ def healthy(self, num_samples: int = 56000) -> bool: if not self.is_available: return False try: - measurement_result = self.acquire_time_domain_samples( - num_samples, cal_adjust=False - ) + measurement_result = self.acquire_time_domain_samples(num_samples) data = measurement_result["data"] except Exception as e: logger.exception("Unable to acquire samples from device.") @@ -142,25 +113,6 @@ def power_cycle_and_connect(self, sleep_time: float = 2.0) -> None: ) return - def recompute_sensor_calibration_data(self, cal_args: list) -> None: - self.sensor_calibration_data = {} - if self.sensor_calibration is not None: - self.sensor_calibration_data.update( - self.sensor_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sensor calibration does not exist.") - - def recompute_sigan_calibration_data(self, cal_args: list) -> None: - self.sigan_calibration_data = {} - """Set the sigan calibration data based on the current tuning""" - if self.sigan_calibration is not None: - self.sigan_calibration_data.update( - self.sigan_calibration.get_calibration_dict(cal_args) - ) - else: - logger.warning("Sigan calibration does not exist.") - def get_status(self) -> dict: return {"model": self._model, "healthy": self.healthy()} @@ -171,19 +123,3 @@ def model(self) -> str: @model.setter def model(self, value: str): self._model = value - - @property - def sensor_calibration(self) -> Calibration: - return self._sensor_calibration - - @sensor_calibration.setter - def sensor_calibration(self, cal: Calibration): - self._sensor_calibration = cal - - @property - def sigan_calibration(self) -> Calibration: - return self._sigan_calibration - - @sigan_calibration.setter - def sigan_calibration(self, cal: Calibration): - self._sigan_calibration = cal diff --git a/scos_actions/hardware/tests/test_sensor.py b/scos_actions/hardware/tests/test_sensor.py index 597f88ce..1c3eac95 100644 --- a/scos_actions/hardware/tests/test_sensor.py +++ b/scos_actions/hardware/tests/test_sensor.py @@ -1,74 +1,122 @@ +import datetime + +import pytest from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay from its_preselector.web_relay_preselector import WebRelayPreselector from scos_actions.hardware.mocks.mock_gps import MockGPS +from scos_actions.hardware.mocks.mock_sensor import ( + MockSensor, + _mock_capabilities, + _mock_differential_cal_data, + _mock_location, + _mock_sensor_cal_data, +) from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -from scos_actions.hardware.sensor import Sensor -def test_sensor(): - sigan = MockSignalAnalyzer() - sensor = Sensor(signal_analyzer=sigan, capabilities={}, gps=MockGPS(sigan)) - assert sensor is not None - assert sensor.signal_analyzer is not None - assert sensor.gps is not None +@pytest.fixture +def mock_sensor(): + sensor = MockSensor() + return sensor + + +def test_mock_sensor_defaults(mock_sensor): + assert isinstance(mock_sensor.signal_analyzer, MockSignalAnalyzer) + assert isinstance(mock_sensor.gps, MockGPS) + assert mock_sensor.preselector is None + assert mock_sensor.switches == {} + assert mock_sensor.location == _mock_location + assert mock_sensor.capabilities == _mock_capabilities + assert mock_sensor.sensor_calibration is None + assert mock_sensor.differential_calibration is None + assert mock_sensor.has_configurable_preselector is False + assert mock_sensor.has_configurable_preselector is False + assert mock_sensor.sensor_calibration_data == _mock_sensor_cal_data + assert mock_sensor.differential_calibration_data == _mock_differential_cal_data + assert isinstance(mock_sensor.start_time, datetime.datetime) + + +def test_set_get_sigan(mock_sensor): + mock_sigan = MockSignalAnalyzer() + mock_sensor.signal_analyzer = mock_sigan + assert mock_sensor.signal_analyzer == mock_sigan + + +def test_set_get_gps(mock_sensor): + mock_gps = MockGPS() + mock_sensor.gps = mock_gps + assert mock_sensor.gps == mock_gps + + +def test_set_get_preselector(mock_sensor): + mock_preselector = WebRelayPreselector( + {}, {"name": "mock_preselector", "base_url": "url"} + ) + mock_sensor.preselector = mock_preselector + assert mock_sensor.preselector == mock_preselector + + +def test_set_get_switches(mock_sensor): + mock_switches = { + "mock": ControlByWebWebRelay({"name": "mock_switch", "base_url": "url"}) + } + mock_sensor.switches = mock_switches + assert mock_sensor.switches == mock_switches + + +def test_set_get_location(mock_sensor): + mock_location = {"x": 0, "y": 0, "z": 0, "description": "Test"} + mock_sensor.location = mock_location + assert mock_sensor.location == mock_location -def test_set_get_preselector(): - preselector = WebRelayPreselector({}, {"name": "preselector", "base_url": "url"}) - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.preselector = preselector - assert sensor.preselector == preselector +def test_set_get_capabilities(mock_sensor): + mock_capabilities = {"fake": "capabilities"} + mock_sensor.capabilities = mock_capabilities + assert mock_sensor.capabilities == mock_capabilities -def test_set_get_gps(): - sigan = MockSignalAnalyzer() - gps = MockGPS() - sensor = Sensor(signal_analyzer=sigan, capabilities={}) - sensor.gps = gps - assert sensor.gps == gps +def test_set_get_sensor_calibration(mock_sensor): + assert mock_sensor.sensor_calibration is None -def test_set_get_switches(): - switches = {"spu": ControlByWebWebRelay({"name": "spu", "base_url": "url"})} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.switches = switches - assert sensor.switches == switches +def test_set_get_differential_calibration(mock_sensor): + assert mock_sensor.differential_calibration is None -def test_has_configurable_preselector_in_capabilities(): +def test_has_configurable_preselector_in_capabilities(mock_sensor): capabilities = { "sensor": { "preselector": {"rf_paths": [{"name": "antenna"}, {"name": "noise_diode"}]} } } - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.has_configurable_preselector == True + mock_sensor.capabilities = capabilities + assert mock_sensor.has_configurable_preselector == True -def test_has_configurable_preselector_in_preselector(): - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={}) - sensor.preselector = WebRelayPreselector( +def test_has_configurable_preselector_in_preselector(mock_sensor): + mock_sensor.preselector = WebRelayPreselector( {}, {"name": "preselector", "base_url": "url"} ) - sensor.preselector.rf_paths = [{"name": "antenna"}, {"name": "noise_diode"}] - assert sensor.has_configurable_preselector == True + mock_sensor.preselector.rf_paths = [{"name": "antenna"}, {"name": "noise_diode"}] + assert mock_sensor.has_configurable_preselector == True -def test_has_configurable_preselector_not_configurable(): +def test_has_configurable_preselector_not_configurable(mock_sensor): capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.has_configurable_preselector == False + mock_sensor.capabilities = capabilities + assert mock_sensor.has_configurable_preselector == False -def test_hash_set_when_not_present(): +def test_hash_set_when_not_present(mock_sensor): capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert "sensor_sha512" in sensor.capabilities["sensor"] - assert sensor.capabilities["sensor"]["sensor_sha512"] is not None + mock_sensor.capabilities = capabilities + assert "sensor_sha512" in mock_sensor.capabilities["sensor"] + assert mock_sensor.capabilities["sensor"]["sensor_sha512"] is not None -def test_hash_not_overwritten(): +def test_hash_not_overwritten(mock_sensor): capabilities = {"sensor": {"sensor_sha512": "some hash"}} - sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities) - assert sensor.capabilities["sensor"]["sensor_sha512"] == "some hash" + mock_sensor.capabilities = capabilities + assert mock_sensor.capabilities["sensor"]["sensor_sha512"] == "some hash" diff --git a/scos_actions/hardware/tests/test_sigan.py b/scos_actions/hardware/tests/test_sigan.py index c82ffeec..4fce6562 100644 --- a/scos_actions/hardware/tests/test_sigan.py +++ b/scos_actions/hardware/tests/test_sigan.py @@ -1,8 +1,19 @@ +import pytest + from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer -def test_sigan_default_cal(): +def test_mock_sigan(): sigan = MockSignalAnalyzer() - sigan.recompute_sensor_calibration_data([]) - sensor_cal = sigan.sensor_calibration_data - assert sensor_cal["gain"] == 0 + # Test default values are available as properties + assert sigan.model == sigan._model + assert sigan.frequency == sigan._frequency + assert sigan.sample_rate == sigan._sample_rate + assert sigan.gain == sigan._gain + assert sigan.attenuation == sigan._attenuation + assert sigan.preamp_enable == sigan._preamp_enable + assert sigan.reference_level == sigan._reference_level + assert sigan.is_available == sigan._is_available + assert sigan.plugin_version == sigan._plugin_version + assert sigan.firmware_version == sigan._firmware_version + assert sigan.api_version == sigan._api_version diff --git a/scos_actions/hardware/utils.py b/scos_actions/hardware/utils.py index 49642da7..8a286e6d 100644 --- a/scos_actions/hardware/utils.py +++ b/scos_actions/hardware/utils.py @@ -1,6 +1,6 @@ import logging import subprocess -from typing import Dict +from typing import Dict, Tuple, Union import psutil from its_preselector.web_relay import WebRelay @@ -66,7 +66,7 @@ def get_current_cpu_temperature(fahrenheit: bool = False) -> float: raise e -def get_disk_smart_data(disk: str) -> dict: +def get_disk_smart_data(disk: str) -> Union[dict, str]: """ Get selected SMART data for the chosen disk. @@ -81,7 +81,8 @@ def get_disk_smart_data(disk: str) -> dict: https://nvmexpress.org/wp-content/uploads/NVM-Express-1_4-2019.06.10-Ratified.pdf :param disk: The desired disk, e.g., ``/dev/nvme0n1``. - :return: A dictionary containing the retrieved data from the SMART report. + :return: A dictionary containing the retrieved data from the SMART report, or + the string "Unavailable" if ``smartctl`` fails to run. """ try: report = subprocess.check_output(["smartctl", "-a", disk]).decode("utf-8") diff --git a/scos_actions/metadata/sigmf_builder.py b/scos_actions/metadata/sigmf_builder.py index e9f6b2fd..1e6106df 100644 --- a/scos_actions/metadata/sigmf_builder.py +++ b/scos_actions/metadata/sigmf_builder.py @@ -27,7 +27,7 @@ }, { "name": "ntia-diagnostics", - "version": "2.0.0", + "version": "2.2.0", "optional": True, }, { @@ -266,7 +266,7 @@ def set_collection(self, collection: str) -> None: """ self.sigmf_md.set_global_field("core:collection", collection) - ### ntia-algorithm v2.0.0 ### + ### ntia-algorithm v2.0.1 ### def set_data_products(self, data_products: List[Graph]) -> None: """ @@ -311,7 +311,7 @@ def set_classification(self, classification: str) -> None: """ self.sigmf_md.set_global_field("ntia-core:classification", classification) - ### ntia-diagnostics v1.0.0 ### + ### ntia-diagnostics v2.2.0 ### def set_diagnostics(self, diagnostics: Diagnostics) -> None: """ diff --git a/scos_actions/metadata/structs/capture.py b/scos_actions/metadata/structs/capture.py index 16f12280..abcff4fb 100644 --- a/scos_actions/metadata/structs/capture.py +++ b/scos_actions/metadata/structs/capture.py @@ -14,6 +14,8 @@ "duration": "ntia-sensor:duration", "overload": "ntia-sensor:overload", "sensor_calibration": "ntia-sensor:sensor_calibration", + # sigan_calibration is unused by SCOS Sensor but still defined + # in the ntia-sensor extension as of v2.0.0 "sigan_calibration": "ntia-sensor:sigan_calibration", "sigan_settings": "ntia-sensor:sigan_settings", } diff --git a/scos_actions/metadata/structs/ntia_diagnostics.py b/scos_actions/metadata/structs/ntia_diagnostics.py index def0cbd5..a0ef7c8d 100644 --- a/scos_actions/metadata/structs/ntia_diagnostics.py +++ b/scos_actions/metadata/structs/ntia_diagnostics.py @@ -129,16 +129,20 @@ class Computer(msgspec.Struct, **SIGMF_OBJECT_KWARGS): :param cpu_mean_clock: Mean sampled clock speed, in MHz. :param cpu_uptime: Number of days since the computer started. :param action_cpu_usage: CPU utilization during action execution, as a percentage. + :param action_runtime: Total action execution time, in seconds. :param system_load_5m: Number of processes in a runnable state over the previous 5 minutes as a percentage of the number of CPUs. :param memory_usage: Average percent of memory used during action execution. :param cpu_overheating: Whether the CPU is overheating. :param cpu_temp: CPU temperature, in degrees Celsius. - :param scos_start: The time at which the SCOS API container started. Must be + :param software_start: The time at which the sensor software started. Must be an ISO 8601 formatted string. - :param scos_uptime: Number of days since the SCOS API container started. + :param software_uptime: Number of days since the sensor software started. :param ssd_smart_data: Information provided by the drive Self-Monitoring, Analysis, and Reporting Technology. + :param ntp_active: True if NTP service is active on the computer. + :param ntp_sync: True if the system clock is synchronized with NTP. + :param disk_usage: Total computer disk usage, as a percentage. """ cpu_min_clock: Optional[float] = None @@ -154,6 +158,9 @@ class Computer(msgspec.Struct, **SIGMF_OBJECT_KWARGS): software_start: Optional[str] = None software_uptime: Optional[float] = None ssd_smart_data: Optional[SsdSmartData] = None + ntp_active: Optional[bool] = None + ntp_sync: Optional[bool] = None + disk_usage: Optional[float] = None class ScosPlugin(msgspec.Struct, **SIGMF_OBJECT_KWARGS): diff --git a/scos_actions/settings.py b/scos_actions/settings.py index bc7e9a93..158de256 100644 --- a/scos_actions/settings.py +++ b/scos_actions/settings.py @@ -1,5 +1,4 @@ import logging -from os import path from pathlib import Path from environs import Env diff --git a/scos_actions/signal_processing/calibration.py b/scos_actions/signal_processing/calibration.py index 785685d3..e1a4c847 100644 --- a/scos_actions/signal_processing/calibration.py +++ b/scos_actions/signal_processing/calibration.py @@ -3,29 +3,23 @@ import numpy as np from its_preselector.preselector import Preselector +from numpy.typing import NDArray from scipy.constants import Boltzmann +from scos_actions.calibration.utils import CalibrationException from scos_actions.signal_processing.unit_conversion import ( convert_celsius_to_fahrenheit, convert_celsius_to_kelvins, convert_dB_to_linear, convert_linear_to_dB, - convert_watts_to_dBm, ) logger = logging.getLogger(__name__) -class CalibrationException(Exception): - """Basic exception handling for calibration functions.""" - - def __init__(self, msg): - super().__init__(msg) - - def y_factor( - pwr_noise_on_watts: np.ndarray, - pwr_noise_off_watts: np.ndarray, + pwr_noise_on_watts: NDArray, + pwr_noise_off_watts: NDArray, enr_linear: float, enbw_hz: float, temp_kelvins: float = 300.0, @@ -49,16 +43,16 @@ def y_factor( :return: A tuple (noise_figure, gain) containing the calculated noise figure and gain, both in dB, from the Y-factor method. """ - mean_on_dBm = convert_watts_to_dBm(np.mean(pwr_noise_on_watts)) - mean_off_dBm = convert_watts_to_dBm(np.mean(pwr_noise_off_watts)) + mean_on_dBW = convert_linear_to_dB(np.mean(pwr_noise_on_watts)) + mean_off_dBW = convert_linear_to_dB(np.mean(pwr_noise_off_watts)) if logger.isEnabledFor(logging.DEBUG): logger.debug(f"ENR: {convert_linear_to_dB(enr_linear)} dB") logger.debug(f"ENBW: {enbw_hz} Hz") - logger.debug(f"Mean power on: {mean_on_dBm:.2f} dBm") - logger.debug(f"Mean power off: {mean_off_dBm:.2f} dBm") - y = convert_dB_to_linear(mean_on_dBm - mean_off_dBm) + logger.debug(f"Mean power on: {mean_on_dBW+30:.2f} dBm") + logger.debug(f"Mean power off: {mean_off_dBW+30:.2f} dBm") + y = convert_dB_to_linear(mean_on_dBW - mean_off_dBW) noise_factor = enr_linear / (y - 1.0) - gain_dB = mean_on_dBm - convert_watts_to_dBm( + gain_dB = mean_on_dBW - convert_linear_to_dB( Boltzmann * temp_kelvins * enbw_hz * (enr_linear + noise_factor) ) noise_figure_dB = convert_linear_to_dB(noise_factor) diff --git a/scos_actions/tests/resources/utils.py b/scos_actions/tests/resources/utils.py deleted file mode 100644 index ff26be2c..00000000 --- a/scos_actions/tests/resources/utils.py +++ /dev/null @@ -1,15 +0,0 @@ -def easy_gain(sample_rate, frequency, gain): - """Create an easily interpolated calibration gain value for testing. - - :type sample_rate: float - :param sample_rate: Sample rate in samples per second - - :type frequency: float - :param frequency: Frequency in hertz - - :type gain: int - :param gain: Signal analyzer gain setting in dB - - :rtype: float - """ - return gain + (sample_rate / 1e6) + (frequency / 1e9) diff --git a/scos_actions/utils.py b/scos_actions/utils.py index e70a4324..182474ce 100644 --- a/scos_actions/utils.py +++ b/scos_actions/utils.py @@ -1,5 +1,6 @@ import json import logging +import shutil from datetime import datetime from pathlib import Path @@ -124,3 +125,11 @@ def get_days_up(start_time): days = elapsed.days fractional_day = elapsed.seconds / (60 * 60 * 24) return round(days + fractional_day, 4) + + +def get_disk_usage() -> float: + """Return the total disk usage as a percentage.""" + usage = shutil.disk_usage("/") + percent_used = round(100 * usage.used / usage.total) + logger.debug(f"{percent_used} disk used") + return round(percent_used, 2)