Skip to content

Commit

Permalink
add and update action class type hints and imports
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanielloNTIA committed Jan 16, 2024
1 parent d529c64 commit 54567d3
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 24 deletions.
10 changes: 5 additions & 5 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
from environs import Env
from its_preselector import __version__ as PRESELECTOR_API_VERSION
from scipy.signal import sos2tf, sosfilt

from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions import utils
from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor
from scos_actions.hardware.utils import (
Expand Down Expand Up @@ -77,6 +74,9 @@
from scos_actions.status import start_time
from scos_actions.utils import convert_datetime_to_millisecond_iso_format, get_days_up

from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions import utils

env = Env()
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -450,7 +450,7 @@ class NasctnSeaDataProduct(Action):
:param sigan: Instance of SignalAnalyzerInterface.
"""

def __init__(self, parameters):
def __init__(self, parameters: dict):
super().__init__(parameters)
# Assume preselector is present
rf_path_name = utils.get_parameter(RF_PATH, self.parameters)
Expand Down Expand Up @@ -504,7 +504,7 @@ def __init__(self, parameters):
# Get iterable parameter list
self.iteration_params = utils.get_iterable_parameters(self.parameters)

def __call__(self, sensor, schedule_entry, task_id):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
self._sensor = sensor
action_start_tic = perf_counter()
Expand Down
3 changes: 1 addition & 2 deletions scos_actions/actions/acquire_single_freq_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import logging

from numpy import float32, ndarray

from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.metadata.structs import ntia_algorithm
Expand Down Expand Up @@ -143,7 +142,7 @@ class SingleFrequencyFftAcquisition(MeasurementAction):
:param sigan: Instance of SignalAnalyzerInterface.
"""

def __init__(self, parameters):
def __init__(self, parameters: dict):
super().__init__(parameters)
# Pull parameters from action config
self.fft_size = get_parameter(FFT_SIZE, self.parameters)
Expand Down
6 changes: 3 additions & 3 deletions scos_actions/actions/acquire_single_freq_tdomain_iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import logging

from numpy import complex64

from scos_actions import utils
from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.utils import get_parameter

from scos_actions import utils

logger = logging.getLogger(__name__)

# Define parameter keys
Expand Down Expand Up @@ -71,7 +71,7 @@ class SingleFrequencyTimeDomainIqAcquisition(MeasurementAction):
:param sigan: instance of SignalAnalyzerInterface.
"""

def __init__(self, parameters):
def __init__(self, parameters: dict):
super().__init__(parameters=parameters)
# Pull parameters from action config
self.nskip = get_parameter(NUM_SKIP, self.parameters)
Expand Down
10 changes: 5 additions & 5 deletions scos_actions/actions/acquire_stepped_freq_tdomain_iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@
import logging

import numpy as np

from scos_actions import utils
from scos_actions.actions.acquire_single_freq_tdomain_iq import (
CAL_ADJUST,
DURATION_MS,
FREQUENCY,
NUM_SKIP,
SingleFrequencyTimeDomainIqAcquisition,
)
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.hardware.sensor import Sensor
from scos_actions.metadata.structs import ntia_sensor
from scos_actions.metadata.structs.capture import CaptureSegment
from scos_actions.signals import measurement_action_completed
from scos_actions.utils import get_parameter

from scos_actions import utils

logger = logging.getLogger(__name__)


Expand All @@ -77,7 +77,7 @@ class SteppedFrequencyTimeDomainIqAcquisition(SingleFrequencyTimeDomainIqAcquisi
:param sigan: instance of SignalAnalyzerInterface
"""

def __init__(self, parameters):
def __init__(self, parameters: dict):
super().__init__(parameters=parameters)
num_center_frequencies = len(parameters[FREQUENCY])

Expand All @@ -87,7 +87,7 @@ def __init__(self, parameters):
self.sensor = None
self.num_center_frequencies = num_center_frequencies

def __call__(self, sensor, schedule_entry: dict, task_id: int):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
self._sensor = sensor
self.test_required_components()
Expand Down
11 changes: 6 additions & 5 deletions scos_actions/actions/calibrate_y_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@
import numpy as np
from scipy.constants import Boltzmann
from scipy.signal import sosfilt

from scos_actions import utils
from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor
from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS
from scos_actions.signal_processing.calibration import (
get_linear_enr,
Expand All @@ -93,6 +92,8 @@
from scos_actions.signals import trigger_api_restart
from scos_actions.utils import ParameterException, get_parameter

from scos_actions import utils

logger = logging.getLogger(__name__)

# Define parameter keys
Expand Down Expand Up @@ -134,7 +135,7 @@ class YFactorCalibration(Action):
:param sigan: instance of SignalAnalyzerInterface.
"""

def __init__(self, parameters):
def __init__(self, parameters: dict):
logger.debug("Initializing calibration action")
super().__init__(parameters)
self.iteration_params = utils.get_iterable_parameters(parameters)
Expand Down Expand Up @@ -192,7 +193,7 @@ def __init__(self, parameters):
"Only one set of IIR filter parameters may be specified (including sample rate)."
)

def __call__(self, sensor, schedule_entry: dict, task_id: int):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
self.sensor = sensor
self.test_required_components()
Expand All @@ -206,7 +207,7 @@ def __call__(self, sensor, schedule_entry: dict, task_id: int):
detail += os.linesep + self.calibrate(p)
return detail

def calibrate(self, params):
def calibrate(self, params: dict):
# Configure signal analyzer
self.configure_sigan(params)

Expand Down
4 changes: 3 additions & 1 deletion scos_actions/actions/logger.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""A simple example action that logs a message."""

import logging
from typing import Optional

from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor

logger = logging.getLogger(__name__)

Expand All @@ -24,7 +26,7 @@ def __init__(self, loglvl=LOGLVL_INFO):
super().__init__(parameters={"name": "logger"})
self.loglvl = loglvl

def __call__(self, sensor, schedule_entry, task_id):
def __call__(self, sensor: Optional[Sensor], schedule_entry: dict, task_id: int):
msg = "running test {name}/{tid}"
schedule_entry_name = schedule_entry["name"]
logger.log(
Expand Down
8 changes: 7 additions & 1 deletion scos_actions/actions/monitor_sigan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Monitor the signal analyzer."""

import logging
from typing import Optional

from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor
Expand All @@ -15,7 +16,12 @@ class MonitorSignalAnalyzer(Action):
def __init__(self, parameters={"name": "monitor_sigan"}, gps=None):
super().__init__(parameters=parameters)

def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
def __call__(
self,
sensor: Sensor,
schedule_entry: Optional[dict] = None,
task_id: Optional[int] = None,
):
logger.debug("Performing signal analyzer health check")
self._sensor = sensor
healthy = self.sensor.signal_analyzer.healthy()
Expand Down
5 changes: 3 additions & 2 deletions scos_actions/actions/sync_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess

from scos_actions.actions.interfaces.action import Action
from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import location_action_completed

logger = logging.getLogger(__name__)
Expand All @@ -12,10 +13,10 @@
class SyncGps(Action):
"""Query the GPS and synchronize time and location."""

def __init__(self, parameters):
def __init__(self, parameters: dict):
super().__init__(parameters=parameters)

def __call__(self, sensor, schedule_entry: dict, task_id: int):
def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug("Syncing to GPS")
self.sensor = sensor
dt = self.sensor.gps.get_gps_time()
Expand Down

0 comments on commit 54567d3

Please sign in to comment.