Skip to content

Commit

Permalink
debugging data product
Browse files Browse the repository at this point in the history
  • Loading branch information
jhazentia committed Sep 30, 2024
1 parent 5a01af5 commit 921dcdf
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:
# compute statistics first by cycle
mean_power = power_bins.mean(axis=0)
max_power = power_bins.max(axis=0)
del power_bins
#del power_bins

# then do the detector
pfp = np.array(
Expand Down Expand Up @@ -438,7 +438,7 @@ def run(self, iqdata: np.ndarray) -> list:
# Compute PSD, PVT, PFP, and APD concurrently.
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
del iqdata
#del iqdata


class NasctnSeaDataProduct(Action):
Expand Down Expand Up @@ -556,7 +556,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
dp_procs.append(
iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
)
del measurement_result["data"]
#del measurement_result["data"]
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
# Create capture segment with channel-specific metadata before sigan is reconfigured
Expand Down Expand Up @@ -589,31 +589,33 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
# Retrieve object references for channel data
channel_data_refs = ray.get(channel_data_process)
channel_data = []
for i, data_ref in enumerate(channel_data_refs):
# Now block until the data is ready
data = ray.get(data_ref)
if i == 1:
# Power-vs-Time results, a tuple of arrays
data, summaries = data # Split the tuple
max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
median_ch_pwrs.append(DATA_TYPE(summaries[3]))
del summaries
if i == 3: # Separate condition is intentional
# APD result: append instead of extend,
# since the result is a single 1D array
channel_data.append(data)
else:
# For 2D arrays (PSD, PVT, PFP)
channel_data.extend(data)
for object_ref in channel_data_refs: # only one
data_arr = ray.get(object_ref)
for i, data_ref in enumerate(data_arr):
# Now block until the data is ready
data = ray.get(data_ref)
if i == 1:
# Power-vs-Time results, a tuple of arrays
data, summaries = data # Split the tuple
max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
median_ch_pwrs.append(DATA_TYPE(summaries[3]))
#del summaries
if i == 3: # Separate condition is intentional
# APD result: append instead of extend,
# since the result is a single 1D array
channel_data.append(data)
else:
# For 2D arrays (PSD, PVT, PFP)
channel_data.extend(data)
toc = perf_counter()
logger.debug(f"Waited {toc-tic} s for channel data")
all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
for ray_actor in iq_processors:
ray.kill(ray_actor)
result_toc = perf_counter()
del dp_procs, iq_processors, channel_data, channel_data_refs
#del dp_procs, iq_processors, channel_data, channel_data_refs
logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")

# Build metadata and convert data to compressed bytes
Expand Down Expand Up @@ -735,9 +737,9 @@ def capture_diagnostics(
switch_diag["door_closed"] = not bool(all_switch_status["door_state"])

# Read preselector sensors
ps_diag = sensor.preselector.get_status()
del ps_diag["name"]
del ps_diag["healthy"]
#ps_diag = sensor.preselector.get_status()
#del ps_diag["name"]
#del ps_diag["healthy"]

# Read computer performance metrics
cpu_diag = { # Start with CPU min/max/mean speeds
Expand Down Expand Up @@ -813,7 +815,7 @@ def capture_diagnostics(
logger.debug(f"Got all diagnostics in {toc-tic} s")
diagnostics = {
"datetime": utils.get_datetime_str_now(),
"preselector": ntia_diagnostics.Preselector(**ps_diag),
#"preselector": ntia_diagnostics.Preselector(**ps_diag),
"spu": ntia_diagnostics.SPU(**switch_diag),
"computer": ntia_diagnostics.Computer(**cpu_diag),
"software": ntia_diagnostics.Software(**software_diag),
Expand Down

0 comments on commit 921dcdf

Please sign in to comment.