From 54567d370d9455adf727e851e4dfc3cdb41c58c3 Mon Sep 17 00:00:00 2001 From: Anthony Romaniello Date: Tue, 16 Jan 2024 12:19:42 -0500 Subject: [PATCH] add and update action class type hints and imports --- scos_actions/actions/acquire_sea_data_product.py | 10 +++++----- scos_actions/actions/acquire_single_freq_fft.py | 3 +-- .../actions/acquire_single_freq_tdomain_iq.py | 6 +++--- .../actions/acquire_stepped_freq_tdomain_iq.py | 10 +++++----- scos_actions/actions/calibrate_y_factor.py | 11 ++++++----- scos_actions/actions/logger.py | 4 +++- scos_actions/actions/monitor_sigan.py | 8 +++++++- scos_actions/actions/sync_gps.py | 5 +++-- 8 files changed, 33 insertions(+), 24 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 9b4660bb..2c906623 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -34,9 +34,6 @@ from environs import Env from its_preselector import __version__ as PRESELECTOR_API_VERSION from scipy.signal import sos2tf, sosfilt - -from scos_actions import __version__ as SCOS_ACTIONS_VERSION -from scos_actions import utils from scos_actions.actions.interfaces.action import Action from scos_actions.hardware.sensor import Sensor from scos_actions.hardware.utils import ( @@ -77,6 +74,9 @@ from scos_actions.status import start_time from scos_actions.utils import convert_datetime_to_millisecond_iso_format, get_days_up +from scos_actions import __version__ as SCOS_ACTIONS_VERSION +from scos_actions import utils + env = Env() logger = logging.getLogger(__name__) @@ -450,7 +450,7 @@ class NasctnSeaDataProduct(Action): :param sigan: Instance of SignalAnalyzerInterface. """ - def __init__(self, parameters): + def __init__(self, parameters: dict): super().__init__(parameters) # Assume preselector is present rf_path_name = utils.get_parameter(RF_PATH, self.parameters) @@ -504,7 +504,7 @@ def __init__(self, parameters): # Get iterable parameter list self.iteration_params = utils.get_iterable_parameters(self.parameters) - def __call__(self, sensor, schedule_entry, task_id): + def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): """This is the entrypoint function called by the scheduler.""" self._sensor = sensor action_start_tic = perf_counter() diff --git a/scos_actions/actions/acquire_single_freq_fft.py b/scos_actions/actions/acquire_single_freq_fft.py index 7f4385e0..aa6f16ba 100644 --- a/scos_actions/actions/acquire_single_freq_fft.py +++ b/scos_actions/actions/acquire_single_freq_fft.py @@ -89,7 +89,6 @@ import logging from numpy import float32, ndarray - from scos_actions.actions.interfaces.measurement_action import MeasurementAction from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.metadata.structs import ntia_algorithm @@ -143,7 +142,7 @@ class SingleFrequencyFftAcquisition(MeasurementAction): :param sigan: Instance of SignalAnalyzerInterface. """ - def __init__(self, parameters): + def __init__(self, parameters: dict): super().__init__(parameters) # Pull parameters from action config self.fft_size = get_parameter(FFT_SIZE, self.parameters) diff --git a/scos_actions/actions/acquire_single_freq_tdomain_iq.py b/scos_actions/actions/acquire_single_freq_tdomain_iq.py index e41700e5..0eb55655 100644 --- a/scos_actions/actions/acquire_single_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_single_freq_tdomain_iq.py @@ -34,12 +34,12 @@ import logging from numpy import complex64 - -from scos_actions import utils from scos_actions.actions.interfaces.measurement_action import MeasurementAction from scos_actions.hardware.mocks.mock_gps import MockGPS from scos_actions.utils import get_parameter +from scos_actions import utils + logger = logging.getLogger(__name__) # Define parameter keys @@ -71,7 +71,7 @@ class SingleFrequencyTimeDomainIqAcquisition(MeasurementAction): :param sigan: instance of SignalAnalyzerInterface. """ - def __init__(self, parameters): + def __init__(self, parameters: dict): super().__init__(parameters=parameters) # Pull parameters from action config self.nskip = get_parameter(NUM_SKIP, self.parameters) diff --git a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py index 721372eb..35cf741c 100644 --- a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py +++ b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py @@ -38,8 +38,6 @@ import logging import numpy as np - -from scos_actions import utils from scos_actions.actions.acquire_single_freq_tdomain_iq import ( CAL_ADJUST, DURATION_MS, @@ -47,12 +45,14 @@ NUM_SKIP, SingleFrequencyTimeDomainIqAcquisition, ) -from scos_actions.hardware.mocks.mock_gps import MockGPS +from scos_actions.hardware.sensor import Sensor from scos_actions.metadata.structs import ntia_sensor from scos_actions.metadata.structs.capture import CaptureSegment from scos_actions.signals import measurement_action_completed from scos_actions.utils import get_parameter +from scos_actions import utils + logger = logging.getLogger(__name__) @@ -77,7 +77,7 @@ class SteppedFrequencyTimeDomainIqAcquisition(SingleFrequencyTimeDomainIqAcquisi :param sigan: instance of SignalAnalyzerInterface """ - def __init__(self, parameters): + def __init__(self, parameters: dict): super().__init__(parameters=parameters) num_center_frequencies = len(parameters[FREQUENCY]) @@ -87,7 +87,7 @@ def __init__(self, parameters): self.sensor = None self.num_center_frequencies = num_center_frequencies - def __call__(self, sensor, schedule_entry: dict, task_id: int): + def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): """This is the entrypoint function called by the scheduler.""" self._sensor = sensor self.test_required_components() diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py index 839afc43..2be35eac 100644 --- a/scos_actions/actions/calibrate_y_factor.py +++ b/scos_actions/actions/calibrate_y_factor.py @@ -75,9 +75,8 @@ import numpy as np from scipy.constants import Boltzmann from scipy.signal import sosfilt - -from scos_actions import utils from scos_actions.actions.interfaces.action import Action +from scos_actions.hardware.sensor import Sensor from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS from scos_actions.signal_processing.calibration import ( get_linear_enr, @@ -93,6 +92,8 @@ from scos_actions.signals import trigger_api_restart from scos_actions.utils import ParameterException, get_parameter +from scos_actions import utils + logger = logging.getLogger(__name__) # Define parameter keys @@ -134,7 +135,7 @@ class YFactorCalibration(Action): :param sigan: instance of SignalAnalyzerInterface. """ - def __init__(self, parameters): + def __init__(self, parameters: dict): logger.debug("Initializing calibration action") super().__init__(parameters) self.iteration_params = utils.get_iterable_parameters(parameters) @@ -192,7 +193,7 @@ def __init__(self, parameters): "Only one set of IIR filter parameters may be specified (including sample rate)." ) - def __call__(self, sensor, schedule_entry: dict, task_id: int): + def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): """This is the entrypoint function called by the scheduler.""" self.sensor = sensor self.test_required_components() @@ -206,7 +207,7 @@ def __call__(self, sensor, schedule_entry: dict, task_id: int): detail += os.linesep + self.calibrate(p) return detail - def calibrate(self, params): + def calibrate(self, params: dict): # Configure signal analyzer self.configure_sigan(params) diff --git a/scos_actions/actions/logger.py b/scos_actions/actions/logger.py index 52d47d91..ae2c6e60 100644 --- a/scos_actions/actions/logger.py +++ b/scos_actions/actions/logger.py @@ -1,8 +1,10 @@ """A simple example action that logs a message.""" import logging +from typing import Optional from scos_actions.actions.interfaces.action import Action +from scos_actions.hardware.sensor import Sensor logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ def __init__(self, loglvl=LOGLVL_INFO): super().__init__(parameters={"name": "logger"}) self.loglvl = loglvl - def __call__(self, sensor, schedule_entry, task_id): + def __call__(self, sensor: Optional[Sensor], schedule_entry: dict, task_id: int): msg = "running test {name}/{tid}" schedule_entry_name = schedule_entry["name"] logger.log( diff --git a/scos_actions/actions/monitor_sigan.py b/scos_actions/actions/monitor_sigan.py index f81ed716..dbc838cd 100644 --- a/scos_actions/actions/monitor_sigan.py +++ b/scos_actions/actions/monitor_sigan.py @@ -1,6 +1,7 @@ """Monitor the signal analyzer.""" import logging +from typing import Optional from scos_actions.actions.interfaces.action import Action from scos_actions.hardware.sensor import Sensor @@ -15,7 +16,12 @@ class MonitorSignalAnalyzer(Action): def __init__(self, parameters={"name": "monitor_sigan"}, gps=None): super().__init__(parameters=parameters) - def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): + def __call__( + self, + sensor: Sensor, + schedule_entry: Optional[dict] = None, + task_id: Optional[int] = None, + ): logger.debug("Performing signal analyzer health check") self._sensor = sensor healthy = self.sensor.signal_analyzer.healthy() diff --git a/scos_actions/actions/sync_gps.py b/scos_actions/actions/sync_gps.py index 872d0a28..b0f88543 100644 --- a/scos_actions/actions/sync_gps.py +++ b/scos_actions/actions/sync_gps.py @@ -4,6 +4,7 @@ import subprocess from scos_actions.actions.interfaces.action import Action +from scos_actions.hardware.sensor import Sensor from scos_actions.signals import location_action_completed logger = logging.getLogger(__name__) @@ -12,10 +13,10 @@ class SyncGps(Action): """Query the GPS and synchronize time and location.""" - def __init__(self, parameters): + def __init__(self, parameters: dict): super().__init__(parameters=parameters) - def __call__(self, sensor, schedule_entry: dict, task_id: int): + def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug("Syncing to GPS") self.sensor = sensor dt = self.sensor.gps.get_gps_time()