Skip to content

Commit

Permalink
Require signal analyzer and capabilities in Sensor constructor. Add d…
Browse files Browse the repository at this point in the history
…ocstrings and test cases. Modify has_configurable_preselector to return True if the sensor definition includes a preselector with multiple rf_paths or if the preselector has multiple rf_paths.
  • Loading branch information
dboulware committed Jan 26, 2024
1 parent 0581b95 commit 74b3760
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 45 deletions.
4 changes: 3 additions & 1 deletion sample_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)

action = SingleFrequencyFftAcquisition(parameters)
sensor = Sensor(signal_analyzer=MockSignalAnalyzer(randomize_values=True))
sensor = Sensor(
signal_analyzer=MockSignalAnalyzer(randomize_values=True), capabilities={}
)
action(sensor, schedule_entry_json, 1)
print("metadata:")
print(json.dumps(_metadata, indent=4))
Expand Down
4 changes: 2 additions & 2 deletions scos_actions/actions/tests/test_acquire_single_freq_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
action = actions["test_single_frequency_m4s_action"]
assert action.description
sensor = Sensor(signal_analyzer=MockSignalAnalyzer())
sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
action(
sensor=sensor,
schedule_entry=SINGLE_FREQUENCY_FFT_ACQUISITION,
Expand Down Expand Up @@ -89,6 +89,6 @@ def callback(sender, **kwargs):
def test_num_samples_skip():
action = actions["test_single_frequency_m4s_action"]
assert action.description
sensor = Sensor(signal_analyzer=MockSignalAnalyzer())
sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
action(sensor, SINGLE_FREQUENCY_FFT_ACQUISITION, 1)
assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"]
6 changes: 3 additions & 3 deletions scos_actions/actions/tests/test_monitor_sigan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def callback(sender, **kwargs):
action = actions["test_monitor_sigan"]
mock_sigan = MockSignalAnalyzer()
mock_sigan._is_available = False
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == True # signal sent
mock_sigan._is_available = True
Expand All @@ -40,7 +40,7 @@ def callback(sender, **kwargs):
action = actions["test_monitor_sigan"]
mock_sigan = MockSignalAnalyzer()
mock_sigan.times_to_fail_recv = 6
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == True # signal sent

Expand All @@ -57,6 +57,6 @@ def callback(sender, **kwargs):
mock_sigan = MockSignalAnalyzer()
mock_sigan._is_available = True
mock_sigan.set_times_to_fail_recv(0)
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == False # signal not sent
6 changes: 3 additions & 3 deletions scos_actions/actions/tests/test_single_freq_tdomain_iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
action = actions["test_single_frequency_iq_action"]
assert action.description
sensor = Sensor(signal_analyzer=MockSignalAnalyzer())
sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
assert _data.any()
assert _metadata
Expand Down Expand Up @@ -62,7 +62,7 @@ def test_required_components():
action = actions["test_single_frequency_m4s_action"]
mock_sigan = MockSignalAnalyzer()
mock_sigan._is_available = False
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
with pytest.raises(RuntimeError):
action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
mock_sigan._is_available = True
Expand All @@ -72,6 +72,6 @@ def test_num_samples_skip():
action = actions["test_single_frequency_iq_action"]
assert action.description
mock_sigan = MockSignalAnalyzer()
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"]
4 changes: 2 additions & 2 deletions scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def callback(sender, **kwargs):
action = actions["test_multi_frequency_iq_action"]
assert action.description
mock_sigan = MockSignalAnalyzer()
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
for i in range(_count):
assert _datas[i].any()
Expand All @@ -49,7 +49,7 @@ def test_num_samples_skip():
action = actions["test_multi_frequency_iq_action"]
assert action.description
mock_sigan = MockSignalAnalyzer()
sensor = Sensor(signal_analyzer=mock_sigan)
sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
if isinstance(action.parameters["nskip"], list):
assert (
Expand Down
5 changes: 4 additions & 1 deletion scos_actions/actions/tests/test_sync_gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from scos_actions.discover import test_actions
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import location_action_completed

Expand All @@ -29,7 +30,9 @@ def callback(sender, **kwargs):

location_action_completed.connect(callback)
action = test_actions["test_sync_gps"]
sensor = Sensor(gps=MockGPS())
sensor = Sensor(
signal_analyzer=MockSignalAnalyzer(), capabilities={}, gps=MockGPS()
)
if sys.platform == "linux":
action(sensor, SYNC_GPS, 1)
assert _latitude
Expand Down
6 changes: 3 additions & 3 deletions scos_actions/hardware/mocks/mock_sigan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import Optional

import numpy as np

from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions.calibration.calibration import Calibration
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface
from scos_actions.utils import get_datetime_str_now

from scos_actions import __version__ as SCOS_ACTIONS_VERSION

logger = logging.getLogger(__name__)

tune_result_params = ["actual_dsp_freq", "actual_rf_freq"]
Expand All @@ -33,7 +33,7 @@ def __init__(
switches: Optional[dict] = None,
randomize_values: bool = False,
):
super().__init__(sensor_cal, sigan_cal)
super().__init__(sensor_cal, sigan_cal, switches)
# Define the default calibration dicts
self.DEFAULT_SIGAN_CALIBRATION = {
"datetime": get_datetime_str_now(),
Expand Down
119 changes: 95 additions & 24 deletions scos_actions/hardware/sensor.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
import datetime
import hashlib
import json
from typing import Dict
import logging
from typing import Dict, Optional

from its_preselector.preselector import Preselector
from its_preselector.web_relay import WebRelay

from .gps_iface import GPSInterface
from .mocks.mock_sigan import MockSignalAnalyzer
from .sigan_iface import SignalAnalyzerInterface


class Sensor:
"""
Software representation of the physical RF sensor. The Sensor may include a GPSInterface,
Preselector, a dictionary of WebRelays, a location specified in GeoJSON, and a dictionary
of the sensor capabilities. The capabilities should include a 'sensor' key that maps to
the metadata definition of the Sensor(
https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md#01-the-sensor-object),
and an 'action' key that maps to a list of ntia-scos action objects
(https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
The Sensor instance is passed into Actions __call__ methods to perform an action.
"""

logger = logging.getLogger(__name__)

def __init__(
self,
signal_analyzer: SignalAnalyzerInterface = None,
gps: GPSInterface = None,
preselector: Preselector = None,
switches: Dict[str, WebRelay] = {},
location: dict = None,
capabilities: dict = None,
signal_analyzer: SignalAnalyzerInterface,
capabilities: dict,
gps: Optional[GPSInterface] = None,
preselector: Optional[Preselector] = None,
switches: Optional[Dict[str, WebRelay]] = {},
location: Optional[dict] = None,
):
self.signal_analyzer = signal_analyzer
self.gps = gps
Expand All @@ -40,22 +53,38 @@ def signal_analyzer(self, sigan: SignalAnalyzerInterface):

@property
def gps(self) -> GPSInterface:
"""
The sensor's Global Positioning System.
"""
return self._gps

@gps.setter
def gps(self, gps: GPSInterface):
"""
Set the sensor's Global Positioning System.
"""
self._gps = gps

@property
def preselector(self) -> Preselector:
"""
RF front end that may include calibration sources, filters, and/or amplifiers.
"""
return self._preselector

@preselector.setter
def preselector(self, preselector: Preselector):
"""
Set the RF front end that may include calibration sources, filters, and/or amplifiers.
"""
self._preselector = preselector

@property
def switches(self) -> Dict[str, WebRelay]:
"""
Dictionary of WebRelays, indexed by name. WebRelays may enable/disable other
components within the sensor and/or provide a variety of sensors.
"""
return self._switches

@switches.setter
Expand All @@ -64,42 +93,84 @@ def switches(self, switches: Dict[str, WebRelay]):

@property
def location(self) -> dict:
"""
The GeoJSON dictionary of the sensor's location.
"""
return self._location

@location.setter
def location(self, loc: dict):
"""
Set the GeoJSON location of the sensor.
"""
self._location = loc

@property
def capabilities(self) -> dict:
"""
A dictionary of the sensor's capabilities. The dictionary should
include a 'sensor' key that maps to the ntia-sensor
(https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md)
object and an actions key that maps to a list of ntia-scos action objects
(https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
"""
return self._capabilities

@capabilities.setter
def capabilities(self, capabilities: dict):
"""
Set the dictionary of the sensor's capabilities. The dictionary should
include a 'sensor' key that links to the ntia-sensor
(https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md)
object and an actions key that links to a list of ntia-scos action objects
(https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
"""
if capabilities is not None:
if "sensor_sha512" not in capabilities["sensor"]:
if (
"sensor" in capabilities
and "sensor_sha512" not in capabilities["sensor"]
):
sensor_def = json.dumps(capabilities["sensor"], sort_keys=True)
SENSOR_DEFINITION_HASH = hashlib.sha512(
sensor_definition_hash = hashlib.sha512(
sensor_def.encode("UTF-8")
).hexdigest()
capabilities["sensor"]["sensor_sha512"] = SENSOR_DEFINITION_HASH
self._capabilities = capabilities
else:
self._capabilities = None
capabilities["sensor"]["sensor_sha512"] = sensor_definition_hash
self._capabilities = capabilities

@property
def has_configurable_preselector(self) -> bool:
if self._capabilities is None:
return False
"""
Checks if the preselector has multiple rf paths.
Returns: True if either the Preselector object or the sensor definition contain multiple rf_paths, False
otherwise.
"""
if (
self.preselector is not None
and self.preselector.rf_paths is not None
and len(self.preselector.rf_paths) > 0
):
self.logger.debug(
"Preselector is configurable: found multiple rf_paths in preselector object."
)
return True
elif (
self.capabilities
and len(
self.capabilities.get("sensor", {})
.get("preselector", {})
.get("rf_paths", [])
)
> 1
):
self.logger.debug(
"Preselector is configurable: found multiple rf_paths in sensor definition."
)
return True
else:
sensor_definition = self._capabilities["sensor"]
if (
"preselector" in sensor_definition
and "rf_paths" in sensor_definition["preselector"]
):
return True
else:
return False
self.logger.debug(
"Preselector is not configurable: Neither sensor definition or preselector object contained multiple rf_paths."
)
return False

@property
def start_time(self):
Expand Down
Loading

0 comments on commit 74b3760

Please sign in to comment.