diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index f85962a9..5addda5e 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -417,12 +417,7 @@ def __init__(self, params: dict, iir_sos: np.ndarray): self.apd_worker = AmplitudeProbabilityDistribution.remote( params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM] ) - self.workers = [ - self.fft_worker, - self.pvt_worker, - self.pfp_worker, - self.apd_worker, - ] + del params def run(self, iqdata: np.ndarray) -> list: @@ -436,9 +431,11 @@ def run(self, iqdata: np.ndarray) -> list: # Filter IQ and place it in the object store iqdata = ray.put(sosfilt(self.iir_sos, iqdata)) # Compute PSD, PVT, PFP, and APD concurrently. - # Do not wait until they finish. Yield references to their results. - yield [worker.run.remote(iqdata) for worker in self.workers] - del iqdata + fft_reference = self.fft_worker.run.remote(iqdata) + pvt_reference = self.pvt_worker.run.remote(iqdata) + pfp_reference = self.pfp_worker.run.remote(iqdata) + apd_reference = self.apd_worker.run.remote(iqdata) + return fft_reference, pvt_reference, pfp_reference, apd_reference class NasctnSeaDataProduct(Action): @@ -541,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - dp_procs, cpu_speed, reference_points = [], [], [] + psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], [] capture_tic = perf_counter() for i, parameters in enumerate(self.iteration_params): @@ -552,10 +549,12 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() + fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) + psd_refs.append(fft_reference) + pvt_refs.append(pvt_reference) + pfp_refs.append(pfp_reference) + apd_refs.append(apd_reference) - dp_procs.append( - iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) - ) del measurement_result["data"] toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") @@ -585,49 +584,48 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() - logger.debug(f"Have {len(dp_procs)} results") - for channel_data_process in dp_procs: - # Retrieve object references for channel data - channel_data_refs = ray.get(channel_data_process) - logger.debug(f"channel_data_refs is {type(channel_data_refs)}: {channel_data_refs}") + channel_count = len(psd_refs) + logger.debug(f"Have {channel_count} channel results") + for index in range(len(psd_refs)): + logger.debug(f"Working on channel {index}") channel_data = [] - for i, data_ref in enumerate(channel_data_refs): - # Now block until the data is ready - logger.debug(f"{i} Requesting object {data_ref}") - data_products = ray.get(data_ref) - logger.debug(f"data products is {type(data_products)}") - for dp_num, data_product_reference in enumerate(data_products): - logger.debug(f"Getting dp {dp_num}, {data_product_reference}") - data_product = ray.get(data_product_reference) - logger.debug(f"{dp_num} data product is: {type(data_product)}" ) - if dp_num == 1: - # Power-vs-Time results, a tuple of arrays - logger.debug("splitting tuple") - data, summaries = data_product # Split the tuple - logger.debug(f"data is {type(data)}: {data}") - logger.debug(f"summaries is {type(summaries)}: {summaries}") - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - elif dp_num == 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - logger.debug("appending data product") - channel_data.append(data_product) - else: - # For 2D arrays (PSD, PVT, PFP) - logger.debug(f"dp {dp_num} extending channel data") - channel_data.extend(data_product) + # Now block until the data is ready + apd_refs.append(apd_reference) + + psd_data = ray.get(psd_refs[index]) + logger.debug(f"PSD: {psd_data}") + channel_data.extend(psd_data) + pvt_data = ray.get(pvt_refs[index]) + logger.debug(f"PVT DATA: {pvt_data}") + # Power-vs-Time results, a tuple of arrays + logger.debug("splitting tuple") + data, summaries = pvt_data # Split the tuple + logger.debug(f"data is {type(data)}: {data}") + logger.debug(f"summaries is {type(summaries)}: {summaries}") + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + del summaries + + pfp_data = ray.get(pfp_refs[index]) + logger.debug(f"PFP: {pfp_data}") + channel_data.extend(pfp_data) + + # APD result: append instead of extend, + # since the result is a single 1D array + apd_data = ray.get(apd_refs[index]) + logger.debug(f"APD: {apd_data}") + channel_data.append(apd_data) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) + for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del dp_procs, iq_processors, channel_data, channel_data_refs + del iq_processors, channel_data logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes