handle tuple returned from IQ processor.
dboulware committed Apr 12, 2024
1 parent 0332a70 commit c0223d0
Showing 1 changed file with 10 additions and 14 deletions.
scos_actions/actions/acquire_sea_data_product.py: 10 additions & 14 deletions
@@ -538,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
         logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

         # Collect all IQ data and spawn data product computation processes
-        psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], []
+        data_products_refs, cpu_speed, reference_points = [], [], []
         capture_tic = perf_counter()

         for i, parameters in enumerate(self.iteration_params):
@@ -549,11 +549,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
             )
             # Start data product processing but do not block next IQ capture
             tic = perf_counter()
-            fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
-            psd_refs.append(fft_reference)
-            pvt_refs.append(pvt_reference)
-            pfp_refs.append(pfp_reference)
-            apd_refs.append(apd_reference)
+            data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]))

             del measurement_result["data"]
             toc = perf_counter()
@@ -584,18 +580,18 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
             [],
         )
         result_tic = perf_counter()
-        channel_count = len(psd_refs)
+        channel_count = len(data_products_refs)
         logger.debug(f"Have {channel_count} channel results")
-        for index in range(len(psd_refs)):
+        for index in range(len(data_products_refs)):
             logger.debug(f"Working on channel {index}")
             channel_data = []
             # Now block until the data is ready
-            apd_refs.append(apd_reference)
-
-            psd_data = ray.get(psd_refs[index])
+            dp_refs_tuple = ray.get(data_products_refs[index])
+            psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple
+            psd_data = ray.get(psd_ref)
             logger.debug(f"PSD: {psd_data}")
             channel_data.extend(psd_data)
-            pvt_data = ray.get(pvt_refs[index])
+            pvt_data = ray.get(pvt_ref)
             logger.debug(f"PVT DATA: {pvt_data}")
             # Power-vs-Time results, a tuple of arrays
             logger.debug("splitting tuple")
@@ -608,13 +604,13 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
             median_ch_pwrs.append(DATA_TYPE(summaries[3]))
             del summaries

-            pfp_data = ray.get(pfp_refs[index])
+            pfp_data = ray.get(pfp_ref)
             logger.debug(f"PFP: {pfp_data}")
             channel_data.extend(pfp_data)

             # APD result: append instead of extend,
             # since the result is a single 1D array
-            apd_data = ray.get(apd_refs[index])
+            apd_data = ray.get(apd_ref)
             logger.debug(f"APD: {apd_data}")
             channel_data.append(apd_data)

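For context on the pattern this commit moves to, below is a minimal, self-contained sketch, not the scos_actions implementation: the IQProcessor actor and _compute worker names, signatures, and data shapes are illustrative assumptions, since the real IQ processor is defined elsewhere in the package. It shows a Ray actor whose run() returns a tuple of ObjectRefs, and the two-stage ray.get() the calling code now performs per channel.

```python
# Sketch only; names and shapes are assumptions, not the scos_actions code.
import ray

ray.init()


@ray.remote
def _compute(label: str, iq: list) -> str:
    # Stand-in for the PSD / PVT / PFP / APD data-product workers.
    return f"{label}({len(iq)} samples)"


@ray.remote
class IQProcessor:
    def run(self, iq: list) -> tuple:
        # Launch the four data-product workers and return their ObjectRefs
        # as a single tuple. The caller's first ray.get() resolves to this
        # tuple; the refs inside it still need their own ray.get().
        return (
            _compute.remote("psd", iq),
            _compute.remote("pvt", iq),
            _compute.remote("pfp", iq),
            _compute.remote("apd", iq),
        )


NUM_ACTORS = 2
iq_processors = [IQProcessor.remote() for _ in range(NUM_ACTORS)]

# Producer side: one ObjectRef per channel, as in the updated capture loop.
data_products_refs = []
for i, iq in enumerate([[0.0] * 1024, [0.0] * 2048, [0.0] * 4096]):
    data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(iq))

# Consumer side: two-stage ray.get, as in the updated results loop.
for index in range(len(data_products_refs)):
    psd_ref, pvt_ref, pfp_ref, apd_ref = ray.get(data_products_refs[index])
    psd, pvt, pfp, apd = ray.get([psd_ref, pvt_ref, pfp_ref, apd_ref])
    print(index, psd, pvt, pfp, apd)
```

The second ray.get() is needed because Ray resolves only the top-level ObjectRef: refs nested inside the returned tuple come back as refs, so each data product must be fetched explicitly (here with a single list-form ray.get that blocks until all four are ready).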
