Commit
return file references from IQ processor.
dboulware committed Apr 12, 2024
1 parent a1e95ed commit 0332a70
Showing 1 changed file with 46 additions and 48 deletions.
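
The core change: instead of collecting the four per-product workers in a `self.workers` list and yielding their `run.remote()` results from a generator, the IQ processor now returns a named Ray object reference for each data product (PSD, PVT, PFP, APD), and the caller keeps per-product lists of references that it resolves with `ray.get()` only after capture is finished. Below is a minimal sketch of that dispatch-then-gather pattern; the `process` task is a stand-in, not the repository's worker code.

```python
# Sketch only: a placeholder Ray task stands in for the PSD/PVT/PFP/APD workers.
import numpy as np
import ray

ray.init()

@ray.remote
def process(iqdata: np.ndarray) -> float:
    # Stand-in for one data-product computation.
    return float(np.mean(np.abs(iqdata) ** 2))

refs = []
for _ in range(4):
    iq = np.random.randn(1024) + 1j * np.random.randn(1024)
    refs.append(process.remote(iq))  # returns an ObjectRef immediately

results = ray.get(refs)  # block once, only when all results are needed
ray.shutdown()
```

Dispatching first and gathering later keeps the next IQ capture from blocking on signal processing, which is what the per-product reference lists in this diff arrange.
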
94 changes: 46 additions & 48 deletions scos_actions/actions/acquire_sea_data_product.py
@@ -417,12 +417,7 @@ def __init__(self, params: dict, iir_sos: np.ndarray):
         self.apd_worker = AmplitudeProbabilityDistribution.remote(
             params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM]
         )
-        self.workers = [
-            self.fft_worker,
-            self.pvt_worker,
-            self.pfp_worker,
-            self.apd_worker,
-        ]
 
         del params
 
     def run(self, iqdata: np.ndarray) -> list:
@@ -436,9 +431,11 @@ def run(self, iqdata: np.ndarray) -> list:
         # Filter IQ and place it in the object store
         iqdata = ray.put(sosfilt(self.iir_sos, iqdata))
         # Compute PSD, PVT, PFP, and APD concurrently.
-        # Do not wait until they finish. Yield references to their results.
-        yield [worker.run.remote(iqdata) for worker in self.workers]
-        del iqdata
+        fft_reference = self.fft_worker.run.remote(iqdata)
+        pvt_reference = self.pvt_worker.run.remote(iqdata)
+        pfp_reference = self.pfp_worker.run.remote(iqdata)
+        apd_reference = self.apd_worker.run.remote(iqdata)
+        return fft_reference, pvt_reference, pfp_reference, apd_reference


class NasctnSeaDataProduct(Action):
@@ -541,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed, reference_points = [], [], []
psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], []
capture_tic = perf_counter()

for i, parameters in enumerate(self.iteration_params):
@@ -552,10 +549,12 @@
             )
             # Start data product processing but do not block next IQ capture
             tic = perf_counter()
-            dp_procs.append(
-                iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
-            )
+            fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
+            psd_refs.append(fft_reference)
+            pvt_refs.append(pvt_reference)
+            pfp_refs.append(pfp_reference)
+            apd_refs.append(apd_reference)
 
             del measurement_result["data"]
             toc = perf_counter()
             logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
@@ -585,49 +584,48 @@
             [],
         )
         result_tic = perf_counter()
-        logger.debug(f"Have {len(dp_procs)} results")
-        for channel_data_process in dp_procs:
-            # Retrieve object references for channel data
-            channel_data_refs = ray.get(channel_data_process)
-            logger.debug(f"channel_data_refs is {type(channel_data_refs)}: {channel_data_refs}")
+        channel_count = len(psd_refs)
+        logger.debug(f"Have {channel_count} channel results")
+        for index in range(len(psd_refs)):
+            logger.debug(f"Working on channel {index}")
             channel_data = []
-            for i, data_ref in enumerate(channel_data_refs):
-                # Now block until the data is ready
-                logger.debug(f"{i} Requesting object {data_ref}")
-                data_products = ray.get(data_ref)
-                logger.debug(f"data products is {type(data_products)}")
-                for dp_num, data_product_reference in enumerate(data_products):
-                    logger.debug(f"Getting dp {dp_num}, {data_product_reference}")
-                    data_product = ray.get(data_product_reference)
-                    logger.debug(f"{dp_num} data product is: {type(data_product)}")
-                    if dp_num == 1:
-                        # Power-vs-Time results, a tuple of arrays
-                        logger.debug("splitting tuple")
-                        data, summaries = data_product  # Split the tuple
-                        logger.debug(f"data is {type(data)}: {data}")
-                        logger.debug(f"summaries is {type(summaries)}: {summaries}")
-                        max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
-                        med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
-                        mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
-                        median_ch_pwrs.append(DATA_TYPE(summaries[3]))
-                        del summaries
-                    elif dp_num == 3:  # Separate condition is intentional
-                        # APD result: append instead of extend,
-                        # since the result is a single 1D array
-                        logger.debug("appending data product")
-                        channel_data.append(data_product)
-                    else:
-                        # For 2D arrays (PSD, PVT, PFP)
-                        logger.debug(f"dp {dp_num} extending channel data")
-                        channel_data.extend(data_product)
+            # Now block until the data is ready
+            apd_refs.append(apd_reference)
+
+            psd_data = ray.get(psd_refs[index])
+            logger.debug(f"PSD: {psd_data}")
+            channel_data.extend(psd_data)
+
+            pvt_data = ray.get(pvt_refs[index])
+            logger.debug(f"PVT DATA: {pvt_data}")
+            # Power-vs-Time results, a tuple of arrays
+            logger.debug("splitting tuple")
+            data, summaries = pvt_data  # Split the tuple
+            logger.debug(f"data is {type(data)}: {data}")
+            logger.debug(f"summaries is {type(summaries)}: {summaries}")
+            max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
+            med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
+            mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
+            median_ch_pwrs.append(DATA_TYPE(summaries[3]))
+            del summaries
+
+            pfp_data = ray.get(pfp_refs[index])
+            logger.debug(f"PFP: {pfp_data}")
+            channel_data.extend(pfp_data)
+
+            # APD result: append instead of extend,
+            # since the result is a single 1D array
+            apd_data = ray.get(apd_refs[index])
+            logger.debug(f"APD: {apd_data}")
+            channel_data.append(apd_data)
             toc = perf_counter()
             logger.debug(f"Waited {toc-tic} s for channel data")
             all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
 
         for ray_actor in iq_processors:
             ray.kill(ray_actor)
         result_toc = perf_counter()
-        del dp_procs, iq_processors, channel_data, channel_data_refs
+        del iq_processors, channel_data
         logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")
 
         # Build metadata and convert data to compressed bytes
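A note on the call-site unpacking in this diff (`fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(...)`): an actor method invoked with `.remote()` normally yields a single ObjectRef, so unpacking four references implies the method is declared with `num_returns=4`. That declaration is outside the hunks shown here, so treat it as an assumption. A hypothetical sketch of the mechanism:

```python
# Hypothetical sketch: num_returns=4 lets a caller unpack four ObjectRefs
# directly from one .remote() call on an actor method.
import ray

ray.init()

@ray.remote
class Processor:
    @ray.method(num_returns=4)
    def run(self, x: int):
        # Each element of the returned tuple becomes its own ObjectRef.
        return x + 1, x + 2, x + 3, x + 4

proc = Processor.remote()
psd_ref, pvt_ref, pfp_ref, apd_ref = proc.run.remote(10)
print(ray.get([psd_ref, pvt_ref, pfp_ref, apd_ref]))  # [11, 12, 13, 14]
ray.shutdown()
```

If `num_returns` were not set, the caller would instead `ray.get()` the single reference to obtain the returned tuple before indexing into it.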
