Commit 7ca107a

…A-178_test_actions_loading
jhazentia committed Mar 27, 2024
2 parents d06098f + 2763dab commit 7ca107a
Showing 38 changed files with 1,578 additions and 968 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -18,7 +18,7 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/asottile/pyupgrade
rev: v3.15.1
rev: v3.15.2
hooks:
- id: pyupgrade
args: ["--py38-plus"]
@@ -30,7 +30,7 @@
types: [file, python]
args: ["--profile", "black", "--filter-files", "--gitignore"]
- repo: https://github.com/psf/black
rev: 24.2.0
rev: 24.3.0
hooks:
- id: black
types: [file, python]
71 changes: 48 additions & 23 deletions scos_actions/actions/acquire_sea_data_product.py
@@ -74,7 +74,11 @@
create_statistical_detector,
)
from scos_actions.signals import measurement_action_completed, trigger_api_restart
from scos_actions.utils import convert_datetime_to_millisecond_iso_format, get_days_up
from scos_actions.utils import (
convert_datetime_to_millisecond_iso_format,
get_days_up,
get_disk_usage,
)

env = Env()
logger = logging.getLogger(__name__)
@@ -110,7 +114,6 @@
FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE)
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
IMPEDANCE_OHMS = 50.0
DATA_REFERENCE_POINT = "noise source output"
NUM_ACTORS = 3 # Number of ray actors to initialize

# Create power detectors
@@ -451,6 +454,7 @@ class NasctnSeaDataProduct(Action):
def __init__(self, parameters: dict):
super().__init__(parameters)
# Assume preselector is present
self.total_channel_data_length = None
rf_path_name = utils.get_parameter(RF_PATH, self.parameters)
self.rf_path = {self.PRESELECTOR_PATH_KEY: rf_path_name}

@@ -508,6 +512,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
action_start_tic = perf_counter()
# Ray should have already been initialized within scos-sensor,
# but check and initialize just in case.

if not ray.is_initialized():
logger.info("Initializing ray.")
logger.info("Set RAY_INIT=true to avoid initializing within " + __name__)
@@ -527,8 +532,6 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
self.iteration_params,
)
self.create_global_sensor_metadata(self.sensor)
self.create_global_data_product_metadata()

# Initialize remote supervisor actors for IQ processing
tic = perf_counter()
# This uses iteration_params[0] because
@@ -540,10 +543,15 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed = [], []
dp_procs, cpu_speed, reference_points = [], [], []
capture_tic = perf_counter()

for i, parameters in enumerate(self.iteration_params):
measurement_result = self.capture_iq(parameters)
if i == 0:
self.create_global_data_product_metadata(
measurement_result["reference"]
)
# Start data product processing but do not block next IQ capture
tic = perf_counter()

@@ -554,16 +562,22 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
# Create capture segment with channel-specific metadata before sigan is reconfigured
tic = perf_counter()
self.create_capture_segment(i, measurement_result)
toc = perf_counter()
logger.debug(f"Created capture metadata in {toc-tic:.2f} s")
# Query CPU speed for later averaging in diagnostics metadata
cpu_speed.append(get_current_cpu_clock_speed())
# Append list of data reference points; later we require these to be identical
reference_points.append(measurement_result["reference"])
capture_toc = perf_counter()
logger.debug(
f"Collected all IQ data and started all processing in {capture_toc-capture_tic:.2f} s"
)

# Create data product metadata: requires all data reference points
# to be identical.
assert (
len(set(reference_points)) == 1
), "Channel data were scaled to different reference points. Cannot build metadata."

# Collect processed data product results
all_data, max_max_ch_pwrs, med_mean_ch_pwrs, mean_ch_pwrs, median_ch_pwrs = (
[],
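The new consistency check above can be reproduced in isolation; a minimal sketch of the pattern (function name and example values are illustrative, the assertion text mirrors the diff):

from typing import List

def common_reference(reference_points: List[str]) -> str:
    """Return the single shared data reference point, or fail loudly if channels disagree."""
    assert (
        len(set(reference_points)) == 1
    ), "Channel data were scaled to different reference points. Cannot build metadata."
    return reference_points[0]

# All channels scaled to the same point -> global metadata can be built once.
print(common_reference(["calibration terminal"] * 3))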
@@ -639,15 +653,11 @@ def capture_iq(self, params: dict) -> dict:
self.cal_adjust = utils.get_parameter(CAL_ADJUST, params)
logger.debug(f"cal_adjust={self.cal_adjust}")
# Collect IQ data
measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples(
num_samples, nskip, cal_adjust=self.cal_adjust
measurement_result = self.sensor.acquire_time_domain_samples(
num_samples, nskip, cal_params=params, cal_adjust=self.cal_adjust
)
# Store some metadata with the IQ
measurement_result.update(params)
if self.cal_adjust:
measurement_result["sensor_cal"] = (
self.sensor.signal_analyzer.sensor_calibration_data
)
toc = perf_counter()
logger.debug(
f"IQ Capture ({duration_ms} ms @ {(params[FREQUENCY]/1e6):.1f} MHz) completed in {toc-tic:.2f} s."
@@ -788,6 +798,11 @@ def capture_diagnostics(
cpu_diag["ssd_smart_data"] = ntia_diagnostics.SsdSmartData(**smart_data)
except:
logger.warning("Failed to get SSD SMART data")
try: # Disk usage
disk_usage = get_disk_usage()
cpu_diag["disk_usage"] = disk_usage
except:
logger.warning("Failed to get disk usage")

# Get software versions
software_diag = {
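The get_disk_usage helper imported earlier is not shown in this diff; a plausible stand-alone sketch of such a utility, assuming it reports percent of the root filesystem in use via the standard library (the actual scos_actions implementation may differ), followed by the try/except collection pattern used in the hunk above:

import logging
import shutil

logger = logging.getLogger(__name__)

def get_disk_usage(path: str = "/") -> float:
    """Percent of disk space in use at `path`, rounded to one decimal place."""
    usage = shutil.disk_usage(path)  # named tuple of total, used, free (bytes)
    return round(100.0 * usage.used / usage.total, 1)

cpu_diag = {}
try:  # Disk usage
    cpu_diag["disk_usage"] = get_disk_usage()
except Exception:
    logger.warning("Failed to get disk usage")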
@@ -989,7 +1004,7 @@ def test_required_components(self):
trigger_api_restart.send(sender=self.__class__)
return None

def create_global_data_product_metadata(self) -> None:
def create_global_data_product_metadata(self, data_products_reference: str) -> None:
p = self.parameters
num_iq_samples = int(p[SAMPLE_RATE] * p[DURATION_MS] * 1e-3)
iir_obj = ntia_algorithm.DigitalFilter(
@@ -1034,7 +1049,7 @@ def create_global_data_product_metadata(self) -> None:
x_step=[p[SAMPLE_RATE] / FFT_SIZE],
y_units="dBm/Hz",
processing=[dft_obj.id],
reference=DATA_REFERENCE_POINT,
reference=data_products_reference,
description=(
"Results of statistical detectors (max, mean, median, 25th_percentile, 75th_percentile, "
+ "90th_percentile, 95th_percentile, 99th_percentile, 99.9th_percentile, 99.99th_percentile) "
@@ -1054,7 +1069,7 @@ def create_global_data_product_metadata(self) -> None:
x_stop=[pvt_x_axis__s[-1]],
x_step=[pvt_x_axis__s[1] - pvt_x_axis__s[0]],
y_units="dBm",
reference=DATA_REFERENCE_POINT,
reference=data_products_reference,
description=(
"Max- and mean-detected channel power vs. time, with "
+ f"an integration time of {p[TD_BIN_SIZE_MS]} ms. "
@@ -1081,7 +1096,7 @@ def create_global_data_product_metadata(self) -> None:
x_stop=[pfp_x_axis__s[-1]],
x_step=[pfp_x_axis__s[1] - pfp_x_axis__s[0]],
y_units="dBm",
reference=DATA_REFERENCE_POINT,
reference=data_products_reference,
description=(
"Channelized periodic frame power statistics reported over"
+ f" a {p[PFP_FRAME_PERIOD_MS]} ms frame period, with frame resolution"
@@ -1104,6 +1119,7 @@ def create_global_data_product_metadata(self) -> None:
y_start=[apd_y_axis__dBm[0]],
y_stop=[apd_y_axis__dBm[-1]],
y_step=[apd_y_axis__dBm[1] - apd_y_axis__dBm[0]],
reference=data_products_reference,
description=(
f"Estimate of the APD, using a {p[APD_BIN_SIZE_DB]} dB "
+ "bin size for amplitude values. The data payload includes"
@@ -1122,6 +1138,7 @@ def create_global_data_product_metadata(self) -> None:
+ pfp_length * len(PFP_M3_DETECTOR) * 2
+ apd_graph.length
)
logger.debug(f"Total channel length:{self.total_channel_data_length}")

def create_capture_segment(
self,
@@ -1144,12 +1161,20 @@ def create_capture_segment(
)
if self.cal_adjust:
capture_segment.sensor_calibration = ntia_sensor.Calibration(
datetime=measurement_result["sensor_cal"]["datetime"],
gain=round(measurement_result["sensor_cal"]["gain"], 3),
noise_figure=round(measurement_result["sensor_cal"]["noise_figure"], 3),
temperature=round(measurement_result["sensor_cal"]["temperature"], 1),
reference=DATA_REFERENCE_POINT,
datetime=self.sensor.sensor_calibration_data["datetime"],
gain=round(measurement_result["applied_calibration"]["gain"], 3),
noise_figure=round(
measurement_result["applied_calibration"]["noise_figure"], 3
),
temperature=round(
self.sensor.sensor_calibration_data["temperature"], 1
),
reference=measurement_result["reference"],
)
if "compression_point" in measurement_result["applied_calibration"]:
capture_segment.sensor_calibration.compression_point = measurement_result[
"applied_calibration"
]["compression_point"]
self.sigmf_builder.add_capture(capture_segment)

def get_sigmf_builder(
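In the capture-segment hunk above, the per-capture calibration metadata is now built from the applied_calibration dictionary plus the sensor's current calibration data, with compression_point attached only when it was reported. A condensed, stand-alone sketch of that logic using a plain dict in place of ntia_sensor.Calibration (illustrative only):

def build_capture_calibration(applied_cal: dict, cal_data: dict, reference: str) -> dict:
    """Mirror the rounding and optional-field handling from the diff above."""
    cal = {
        "datetime": cal_data["datetime"],
        "gain": round(applied_cal["gain"], 3),
        "noise_figure": round(applied_cal["noise_figure"], 3),
        "temperature": round(cal_data["temperature"], 1),
        "reference": reference,
    }
    if "compression_point" in applied_cal:  # optional; only some calibrations report it
        cal["compression_point"] = applied_cal["compression_point"]
    return cal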
17 changes: 6 additions & 11 deletions scos_actions/actions/acquire_single_freq_fft.py
@@ -91,7 +91,6 @@
from numpy import float32, ndarray

from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.metadata.structs import ntia_algorithm
from scos_actions.signal_processing.fft import (
get_fft,
@@ -153,10 +152,6 @@ def __init__(self, parameters: dict):
self.classification = get_parameter(CLASSIFICATION, self.parameters)
self.cal_adjust = get_parameter(CAL_ADJUST, self.parameters)
assert isinstance(self.cal_adjust, bool)
if self.cal_adjust:
self.data_reference = "calibration terminal"
else:
self.data_reference = "signal analyzer input"
# FFT setup
self.fft_detector = create_statistical_detector(
"M4sDetector", ["min", "max", "mean", "median", "sample"]
@@ -169,7 +164,7 @@ def __init__(self, parameters: dict):
def execute(self, schedule_entry: dict, task_id: int) -> dict:
# Acquire IQ data and generate M4S result
measurement_result = self.acquire_data(
self.num_samples, self.nskip, self.cal_adjust
self.num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters
)
# Actual sample rate may differ from configured value
sample_rate_Hz = measurement_result["sample_rate"]
@@ -184,13 +179,13 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict:
# Build capture metadata
sigan_settings = self.get_sigan_settings(measurement_result)
logger.debug(f"sigan settings:{sigan_settings}")
measurement_result["duration_ms"] = round(
(self.num_samples / sample_rate_Hz) * 1000
)
measurement_result["capture_segment"] = self.create_capture_segment(
sample_start=0,
start_time=measurement_result["capture_time"],
center_frequency_Hz=self.frequency_Hz,
duration_ms=round((self.num_samples / sample_rate_Hz) * 1000),
overload=measurement_result["overload"],
sigan_settings=sigan_settings,
measurement_result=measurement_result,
)

return measurement_result
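Here, and in the two time-domain actions below, per-capture details (capture time, frequency, duration, overload) now travel inside measurement_result, so create_capture_segment needs only the sample start, the sigan settings, and that dictionary. A rough sketch of the design choice; the helper body is hypothetical, and only the call shape and key names come from this diff:

def create_capture_segment(sample_start: int, sigan_settings: dict, measurement_result: dict) -> dict:
    """Hypothetical helper body: pull shared fields from the result dict instead of long argument lists."""
    return {
        "sample_start": sample_start,
        "datetime": measurement_result["capture_time"],
        "duration": measurement_result["duration_ms"],
        "overload": measurement_result["overload"],
        "sigan_settings": sigan_settings,
    }

Bundling these fields into measurement_result keeps the three acquisition actions consistent and avoids duplicating the same argument plumbing in each one.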
@@ -267,7 +262,7 @@ def create_metadata(self, measurement_result: dict, recording: int = None) -> No
x_stop=[frequencies[-1]],
x_step=[frequencies[1] - frequencies[0]],
y_units="dBm",
reference=self.data_reference,
reference=measurement_result["reference"],
description=(
"Results of min, max, mean, and median statistical detectors, "
+ f"along with a random sampling, from a set of {self.nffts} "
10 changes: 4 additions & 6 deletions scos_actions/actions/acquire_single_freq_tdomain_iq.py
@@ -37,7 +37,6 @@

from scos_actions import utils
from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.utils import get_parameter

logger = logging.getLogger(__name__)
@@ -84,7 +83,9 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict:
# Use the sigan's actual reported instead of requested sample rate
sample_rate = self.sensor.signal_analyzer.sample_rate
num_samples = int(sample_rate * self.duration_ms * 1e-3)
measurement_result = self.acquire_data(num_samples, self.nskip, self.cal_adjust)
measurement_result = self.acquire_data(
num_samples, self.nskip, self.cal_adjust, cal_params=self.parameters
)
end_time = utils.get_datetime_str_now()
measurement_result.update(self.parameters)
measurement_result["end_time"] = end_time
@@ -94,11 +95,8 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict:
logger.debug(f"sigan settings:{sigan_settings}")
measurement_result["capture_segment"] = self.create_capture_segment(
sample_start=0,
start_time=measurement_result["capture_time"],
center_frequency_Hz=self.frequency_Hz,
duration_ms=self.duration_ms,
overload=measurement_result["overload"],
sigan_settings=sigan_settings,
measurement_result=measurement_result,
)
return measurement_result

30 changes: 5 additions & 25 deletions scos_actions/actions/acquire_stepped_freq_tdomain_iq.py
@@ -100,39 +100,19 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
cal_adjust = get_parameter(CAL_ADJUST, measurement_params)
sample_rate = self.sensor.signal_analyzer.sample_rate
num_samples = int(sample_rate * duration_ms * 1e-3)
measurement_result = super().acquire_data(num_samples, nskip, cal_adjust)
measurement_result = super().acquire_data(
num_samples, nskip, cal_adjust, cal_params=measurement_params
)
measurement_result.update(measurement_params)
end_time = utils.get_datetime_str_now()
measurement_result["end_time"] = end_time
measurement_result["task_id"] = task_id
measurement_result["name"] = self.name
measurement_result["classification"] = self.classification
sigan_settings = self.get_sigan_settings(measurement_result)
capture_segment = CaptureSegment(
sample_start=0,
global_index=saved_samples,
frequency=measurement_params[FREQUENCY],
datetime=measurement_result["capture_time"],
duration=duration_ms,
overload=measurement_result["overload"],
sigan_settings=sigan_settings,
capture_segment = self.create_capture_segment(
0, sigan_settings, measurement_result
)
sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data
sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data
if sigan_cal is not None:
if "1db_compression_point" in sigan_cal:
sigan_cal["compression_point"] = sigan_cal.pop(
"1db_compression_point"
)
capture_segment.sigan_calibration = ntia_sensor.Calibration(**sigan_cal)
if sensor_cal is not None:
if "1db_compression_point" in sensor_cal:
sensor_cal["compression_point"] = sensor_cal.pop(
"1db_compression_point"
)
capture_segment.sensor_calibration = ntia_sensor.Calibration(
**sensor_cal
)
measurement_result["capture_segment"] = capture_segment

self.create_metadata(measurement_result, recording_id)