diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 5addda5e..8d1391ee 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -538,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], [] + data_products_refs, cpu_speed, reference_points = [], [], [] capture_tic = perf_counter() for i, parameters in enumerate(self.iteration_params): @@ -549,11 +549,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() - fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) - psd_refs.append(fft_reference) - pvt_refs.append(pvt_reference) - pfp_refs.append(pfp_reference) - apd_refs.append(apd_reference) + data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])) del measurement_result["data"] toc = perf_counter() @@ -584,18 +580,18 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() - channel_count = len(psd_refs) + channel_count = len(data_products_refs) logger.debug(f"Have {channel_count} channel results") - for index in range(len(psd_refs)): + for index in range(len(data_products_refs)): logger.debug(f"Working on channel {index}") channel_data = [] # Now block until the data is ready - apd_refs.append(apd_reference) - - psd_data = ray.get(psd_refs[index]) + dp_refs_tuple = ray.get(data_products_refs[i]) + psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple + psd_data = ray.get(psd_ref) logger.debug(f"PSD: {psd_data}") channel_data.extend(psd_data) - pvt_data = ray.get(pvt_refs[index]) + pvt_data = ray.get(pvt_ref) logger.debug(f"PVT DATA: {pvt_data}") # Power-vs-Time results, a tuple of arrays logger.debug("splitting tuple") @@ -608,13 +604,13 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): median_ch_pwrs.append(DATA_TYPE(summaries[3])) del summaries - pfp_data = ray.get(pfp_refs[index]) + pfp_data = ray.get(pfp_ref) logger.debug(f"PFP: {pfp_data}") channel_data.extend(pfp_data) # APD result: append instead of extend, # since the result is a single 1D array - apd_data = ray.get(apd_refs[index]) + apd_data = ray.get(apd_ref) logger.debug(f"APD: {apd_data}") channel_data.append(apd_data)