From 921dcdfc5056c2b293986be1a3de8914d25b8054 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 30 Sep 2024 08:12:31 -0600 Subject: [PATCH] debugging data product --- .../actions/acquire_sea_data_product.py | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 2b8e0af9..b2116151 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -362,7 +362,7 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray: # compute statistics first by cycle mean_power = power_bins.mean(axis=0) max_power = power_bins.max(axis=0) - del power_bins + #del power_bins # then do the detector pfp = np.array( @@ -438,7 +438,7 @@ def run(self, iqdata: np.ndarray) -> list: # Compute PSD, PVT, PFP, and APD concurrently. # Do not wait until they finish. Yield references to their results. yield [worker.run.remote(iqdata) for worker in self.workers] - del iqdata + #del iqdata class NasctnSeaDataProduct(Action): @@ -556,7 +556,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): dp_procs.append( iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) ) - del measurement_result["data"] + #del measurement_result["data"] toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") # Create capture segment with channel-specific metadata before sigan is reconfigured @@ -589,31 +589,33 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): # Retrieve object references for channel data channel_data_refs = ray.get(channel_data_process) channel_data = [] - for i, data_ref in enumerate(channel_data_refs): - # Now block until the data is ready - data = ray.get(data_ref) - if i == 1: - # Power-vs-Time results, a tuple of arrays - data, summaries = data # Split the tuple - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - if i == 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - channel_data.append(data) - else: - # For 2D arrays (PSD, PVT, PFP) - channel_data.extend(data) + for object_ref in channel_data_refs: # only one + data_arr = ray.get(object_ref) + for i, data_ref in enumerate(data_arr): + # Now block until the data is ready + data = ray.get(data_ref) + if i == 1: + # Power-vs-Time results, a tuple of arrays + data, summaries = data # Split the tuple + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + #del summaries + if i == 3: # Separate condition is intentional + # APD result: append instead of extend, + # since the result is a single 1D array + channel_data.append(data) + else: + # For 2D arrays (PSD, PVT, PFP) + channel_data.extend(data) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del dp_procs, iq_processors, channel_data, channel_data_refs + #del dp_procs, iq_processors, channel_data, channel_data_refs logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes @@ -735,9 +737,9 @@ def capture_diagnostics( switch_diag["door_closed"] = not bool(all_switch_status["door_state"]) # Read preselector sensors - ps_diag = sensor.preselector.get_status() - del ps_diag["name"] - del ps_diag["healthy"] + #ps_diag = sensor.preselector.get_status() + #del ps_diag["name"] + #del ps_diag["healthy"] # Read computer performance metrics cpu_diag = { # Start with CPU min/max/mean speeds @@ -813,7 +815,7 @@ def capture_diagnostics( logger.debug(f"Got all diagnostics in {toc-tic} s") diagnostics = { "datetime": utils.get_datetime_str_now(), - "preselector": ntia_diagnostics.Preselector(**ps_diag), + #"preselector": ntia_diagnostics.Preselector(**ps_diag), "spu": ntia_diagnostics.SPU(**switch_diag), "computer": ntia_diagnostics.Computer(**cpu_diag), "software": ntia_diagnostics.Software(**software_diag),