From cd2b1378ba19870c1f4bf3b0492d81dfa2421f3e Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 11:09:17 -0600 Subject: [PATCH 01/26] ray 2.10.0 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a34afe30..33699b6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dependencies = [ "numpy>=1.22.0", "psutil>=5.9.4", "python-dateutil>=2.0", - "ray>=2.6.3,<2.8.0", + "ray==2.10.0", "ruamel.yaml>=0.15", "scipy>=1.8.0", "sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive", @@ -65,7 +65,7 @@ test = [ dev = [ "hatchling>=1.14.1,<2.0", "pre-commit>=3.3.1,<4.0", - "ray[default]>=2.4.0", + "ray[default]>=2.10.0", "scos-actions[test]", ] From 857c842c406c44da287d642219fcdcfddb7dbf64 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 11:52:38 -0600 Subject: [PATCH 02/26] debugging --- scos_actions/actions/acquire_sea_data_product.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index d7b54c99..c526e3da 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -591,7 +591,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): channel_data = [] for i, data_ref in enumerate(channel_data_refs): # Now block until the data is ready + logger.debug(f"{i} Requesting object {data_ref}") data = ray.get(data_ref) + logger.debug("ray get returned " + type(data)) if i == 1: # Power-vs-Time results, a tuple of arrays data, summaries = data # Split the tuple From 4b4dfc56a40bd37fa3383b13de80c33e85806ee0 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 11:58:01 -0600 Subject: [PATCH 03/26] fix logging error. --- scos_actions/actions/acquire_sea_data_product.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index c526e3da..ad6a6e55 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -593,7 +593,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): # Now block until the data is ready logger.debug(f"{i} Requesting object {data_ref}") data = ray.get(data_ref) - logger.debug("ray get returned " + type(data)) + logger.debug(f"ray get returned {type(data)}") if i == 1: # Power-vs-Time results, a tuple of arrays data, summaries = data # Split the tuple From be84b572df2e523042885b1375e2c528e6542fc3 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 12:18:24 -0600 Subject: [PATCH 04/26] get remote objects. 
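
At this point IQProcessor.run still yields a list of ObjectRefs (one per
worker), so the consumer's first ray.get() resolves to more references
rather than to array data; each nested reference has to be fetched with
its own ray.get(). A minimal sketch of the nested-reference behavior this
patch accounts for (hypothetical tasks, not part of this codebase):

    import ray

    ray.init()

    @ray.remote
    def inner():
        return 42

    @ray.remote
    def outer():
        # Returning an ObjectRef from a task creates a nested reference:
        # the caller's first ray.get() yields this ObjectRef, not 42.
        return inner.remote()

    ref = ray.get(outer.remote())  # -> ObjectRef
    value = ray.get(ref)           # -> 42

Ray does not recursively resolve references returned inside a task's
result, hence the extra ray.get() calls added below for the PVT tuple
and the 2D-array products.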
--- scos_actions/actions/acquire_sea_data_product.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index ad6a6e55..d36b2f2e 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -597,6 +597,8 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): if i == 1: # Power-vs-Time results, a tuple of arrays data, summaries = data # Split the tuple + data = ray.get(data) + summaries = ray.get(summaries) max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) mean_ch_pwrs.append(DATA_TYPE(summaries[2])) @@ -605,10 +607,12 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): if i == 3: # Separate condition is intentional # APD result: append instead of extend, # since the result is a single 1D array - channel_data.append(data) + channel_data.append(ray.get(data)) else: # For 2D arrays (PSD, PVT, PFP) - channel_data.extend(data) + for d in data: + channel_data.append(ray.get(d)) + toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) From 1052fc2b31df2e229645083da32ba2739d54ba58 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 12:33:03 -0600 Subject: [PATCH 05/26] debugging --- scos_actions/actions/acquire_sea_data_product.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index d36b2f2e..bb34fa0d 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -611,6 +611,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): else: # For 2D arrays (PSD, PVT, PFP) for d in data: + logger.debug(f"Adding {type(d)}: {d}") channel_data.append(ray.get(d)) toc = perf_counter() From c8550a8204386f58a97532e745c464159854895d Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 12:46:51 -0600 Subject: [PATCH 06/26] debugging --- scos_actions/actions/acquire_sea_data_product.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index bb34fa0d..368b96af 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -597,7 +597,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): if i == 1: # Power-vs-Time results, a tuple of arrays data, summaries = data # Split the tuple + logger.debug(f"data is {type(data)}: {data}") data = ray.get(data) + logger.debug(f"summaries is {type(summaries)}: {summaries}") summaries = ray.get(summaries) max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) @@ -610,9 +612,10 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): channel_data.append(ray.get(data)) else: # For 2D arrays (PSD, PVT, PFP) - for d in data: - logger.debug(f"Adding {type(d)}: {d}") - channel_data.append(ray.get(d)) + logger.debug(f"Adding {type(d)}: {d}") + d = ray.get(d) + logger.debug(f"Remove obj was {type(d)}: {d}") + channel_data.extend(d) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") From 29439141e092d51b8e684de088317b49dbe40032 Mon Sep 17 00:00:00 
2001 From: Doug Boulware Date: Thu, 11 Apr 2024 12:55:48 -0600 Subject: [PATCH 07/26] debugging. --- scos_actions/actions/acquire_sea_data_product.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 368b96af..5ccef98e 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -612,10 +612,10 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): channel_data.append(ray.get(data)) else: # For 2D arrays (PSD, PVT, PFP) - logger.debug(f"Adding {type(d)}: {d}") - d = ray.get(d) - logger.debug(f"Remove obj was {type(d)}: {d}") - channel_data.extend(d) + for d in data: + d = ray.get(d) + logger.debug(f"Remote obj was {type(d)}: {d}") + channel_data.extend(d) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") From f9b5b32148de648532734420f5dc0f5c97927056 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 13:13:24 -0600 Subject: [PATCH 08/26] debugging. --- scos_actions/actions/acquire_sea_data_product.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 5ccef98e..52278d75 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -585,9 +585,11 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() + logger.debug(f"Have {len(dp_procs)} results") for channel_data_process in dp_procs: # Retrieve object references for channel data channel_data_refs = ray.get(channel_data_process) + logger.debug(f"channel_data_refs is {type(channel_data_refs)}: {channel_data_refs}") channel_data = [] for i, data_ref in enumerate(channel_data_refs): # Now block until the data is ready @@ -613,6 +615,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): else: # For 2D arrays (PSD, PVT, PFP) for d in data: + logger.debug(f"d is {type(d)}:{d}") d = ray.get(d) logger.debug(f"Remote obj was {type(d)}: {d}") channel_data.extend(d) From 3d964940ddf9b630c34649993f07d046e7679061 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 14:28:59 -0600 Subject: [PATCH 09/26] debugging --- .../actions/acquire_sea_data_product.py | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 52278d75..0dac0909 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -594,31 +594,32 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): for i, data_ref in enumerate(channel_data_refs): # Now block until the data is ready logger.debug(f"{i} Requesting object {data_ref}") - data = ray.get(data_ref) - logger.debug(f"ray get returned {type(data)}") - if i == 1: - # Power-vs-Time results, a tuple of arrays - data, summaries = data # Split the tuple - logger.debug(f"data is {type(data)}: {data}") - data = ray.get(data) - logger.debug(f"summaries is {type(summaries)}: {summaries}") - summaries = ray.get(summaries) - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - if i 
== 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - channel_data.append(ray.get(data)) - else: - # For 2D arrays (PSD, PVT, PFP) - for d in data: - logger.debug(f"d is {type(d)}:{d}") - d = ray.get(d) - logger.debug(f"Remote obj was {type(d)}: {d}") - channel_data.extend(d) + data_products = ray.get(data_ref) + logger.debug(f"data products is {type(data_products)}") + for dp_num, data_product_reference in enumerate(data_products): + data_product = ray.get(data_product_reference) + logger.debug(f"{dp_num} data product is: {type(data_product)}" ) + if dp_num == 1: + # Power-vs-Time results, a tuple of arrays + logger.debug("splitting tuple") + data, summaries = data_product # Split the tuple + logger.debug(f"data is {type(data)}: {data}") + logger.debug(f"summaries is {type(summaries)}: {summaries}") + summaries = ray.get(summaries) + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + del summaries + if dp_num == 3: # Separate condition is intentional + # APD result: append instead of extend, + # since the result is a single 1D array + logger.debug("appending data product") + channel_data.append(data_product) + else: + # For 2D arrays (PSD, PVT, PFP) + logger.debug(f"dp {dp_num}exending channel data") + channel_data.extend(data_product) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") From df1a6d14b32443fa1c9d24d3fec6661e9c3611f0 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 14:36:07 -0600 Subject: [PATCH 10/26] fix summaries. --- scos_actions/actions/acquire_sea_data_product.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 0dac0909..79cae4df 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -605,7 +605,6 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): data, summaries = data_product # Split the tuple logger.debug(f"data is {type(data)}: {data}") logger.debug(f"summaries is {type(summaries)}: {summaries}") - summaries = ray.get(summaries) max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) mean_ch_pwrs.append(DATA_TYPE(summaries[2])) From 81e7d8a980f3156614d30b6f8c3f2042a5cc57ac Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 14:46:37 -0600 Subject: [PATCH 11/26] fix if else --- scos_actions/actions/acquire_sea_data_product.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 79cae4df..4bc76337 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -597,6 +597,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): data_products = ray.get(data_ref) logger.debug(f"data products is {type(data_products)}") for dp_num, data_product_reference in enumerate(data_products): + logger.debug(f"Getting dp {dp_num}") data_product = ray.get(data_product_reference) logger.debug(f"{dp_num} data product is: {type(data_product)}" ) if dp_num == 1: @@ -610,14 +611,14 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): 
mean_ch_pwrs.append(DATA_TYPE(summaries[2])) median_ch_pwrs.append(DATA_TYPE(summaries[3])) del summaries - if dp_num == 3: # Separate condition is intentional + elif dp_num == 3: # Separate condition is intentional # APD result: append instead of extend, # since the result is a single 1D array logger.debug("appending data product") channel_data.append(data_product) else: # For 2D arrays (PSD, PVT, PFP) - logger.debug(f"dp {dp_num}exending channel data") + logger.debug(f"dp {dp_num} extending channel data") channel_data.extend(data_product) toc = perf_counter() From a1e95ed36e647c6b8a2eb1a3c596d5874261ace2 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Thu, 11 Apr 2024 15:12:34 -0600 Subject: [PATCH 12/26] debugging --- scos_actions/actions/acquire_sea_data_product.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 4bc76337..f85962a9 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -597,7 +597,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): data_products = ray.get(data_ref) logger.debug(f"data products is {type(data_products)}") for dp_num, data_product_reference in enumerate(data_products): - logger.debug(f"Getting dp {dp_num}") + logger.debug(f"Getting dp {dp_num}, {data_product_reference}") data_product = ray.get(data_product_reference) logger.debug(f"{dp_num} data product is: {type(data_product)}" ) if dp_num == 1: From 0332a706bb836fba60f0e668de53ea655d2764bb Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Fri, 12 Apr 2024 07:36:09 -0600 Subject: [PATCH 13/26] return file references from IQ processor. --- .../actions/acquire_sea_data_product.py | 94 +++++++++---------- 1 file changed, 46 insertions(+), 48 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index f85962a9..5addda5e 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -417,12 +417,7 @@ def __init__(self, params: dict, iir_sos: np.ndarray): self.apd_worker = AmplitudeProbabilityDistribution.remote( params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM] ) - self.workers = [ - self.fft_worker, - self.pvt_worker, - self.pfp_worker, - self.apd_worker, - ] + del params def run(self, iqdata: np.ndarray) -> list: @@ -436,9 +431,11 @@ def run(self, iqdata: np.ndarray) -> list: # Filter IQ and place it in the object store iqdata = ray.put(sosfilt(self.iir_sos, iqdata)) # Compute PSD, PVT, PFP, and APD concurrently. - # Do not wait until they finish. Yield references to their results. 
- yield [worker.run.remote(iqdata) for worker in self.workers] - del iqdata + fft_reference = self.fft_worker.run.remote(iqdata) + pvt_reference = self.pvt_worker.run.remote(iqdata) + pfp_reference = self.pfp_worker.run.remote(iqdata) + apd_reference = self.apd_worker.run.remote(iqdata) + return fft_reference, pvt_reference, pfp_reference, apd_reference class NasctnSeaDataProduct(Action): @@ -541,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - dp_procs, cpu_speed, reference_points = [], [], [] + psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], [] capture_tic = perf_counter() for i, parameters in enumerate(self.iteration_params): @@ -552,10 +549,12 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() + fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) + psd_refs.append(fft_reference) + pvt_refs.append(pvt_reference) + pfp_refs.append(pfp_reference) + apd_refs.append(apd_reference) - dp_procs.append( - iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) - ) del measurement_result["data"] toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") @@ -585,49 +584,48 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() - logger.debug(f"Have {len(dp_procs)} results") - for channel_data_process in dp_procs: - # Retrieve object references for channel data - channel_data_refs = ray.get(channel_data_process) - logger.debug(f"channel_data_refs is {type(channel_data_refs)}: {channel_data_refs}") + channel_count = len(psd_refs) + logger.debug(f"Have {channel_count} channel results") + for index in range(len(psd_refs)): + logger.debug(f"Working on channel {index}") channel_data = [] - for i, data_ref in enumerate(channel_data_refs): - # Now block until the data is ready - logger.debug(f"{i} Requesting object {data_ref}") - data_products = ray.get(data_ref) - logger.debug(f"data products is {type(data_products)}") - for dp_num, data_product_reference in enumerate(data_products): - logger.debug(f"Getting dp {dp_num}, {data_product_reference}") - data_product = ray.get(data_product_reference) - logger.debug(f"{dp_num} data product is: {type(data_product)}" ) - if dp_num == 1: - # Power-vs-Time results, a tuple of arrays - logger.debug("splitting tuple") - data, summaries = data_product # Split the tuple - logger.debug(f"data is {type(data)}: {data}") - logger.debug(f"summaries is {type(summaries)}: {summaries}") - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - elif dp_num == 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - logger.debug("appending data product") - channel_data.append(data_product) - else: - # For 2D arrays (PSD, PVT, PFP) - logger.debug(f"dp {dp_num} extending channel data") - channel_data.extend(data_product) + # Now block until the data is ready + apd_refs.append(apd_reference) + + psd_data = ray.get(psd_refs[index]) + logger.debug(f"PSD: 
{psd_data}") + channel_data.extend(psd_data) + pvt_data = ray.get(pvt_refs[index]) + logger.debug(f"PVT DATA: {pvt_data}") + # Power-vs-Time results, a tuple of arrays + logger.debug("splitting tuple") + data, summaries = pvt_data # Split the tuple + logger.debug(f"data is {type(data)}: {data}") + logger.debug(f"summaries is {type(summaries)}: {summaries}") + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + del summaries + + pfp_data = ray.get(pfp_refs[index]) + logger.debug(f"PFP: {pfp_data}") + channel_data.extend(pfp_data) + + # APD result: append instead of extend, + # since the result is a single 1D array + apd_data = ray.get(apd_refs[index]) + logger.debug(f"APD: {apd_data}") + channel_data.append(apd_data) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) + for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del dp_procs, iq_processors, channel_data, channel_data_refs + del iq_processors, channel_data logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes From c0223d0f88951534252fe986d33772914527e985 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Fri, 12 Apr 2024 07:48:02 -0600 Subject: [PATCH 14/26] handle tuple returned from IQ processor. --- .../actions/acquire_sea_data_product.py | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 5addda5e..8d1391ee 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -538,7 +538,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - psd_refs, pvt_refs, pfp_refs, apd_refs, cpu_speed, reference_points = [], [], [], [], [], [] + data_products_refs, cpu_speed, reference_points = [], [], [] capture_tic = perf_counter() for i, parameters in enumerate(self.iteration_params): @@ -549,11 +549,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() - fft_reference, pvt_reference, pfp_reference, apd_reference = iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) - psd_refs.append(fft_reference) - pvt_refs.append(pvt_reference) - pfp_refs.append(pfp_reference) - apd_refs.append(apd_reference) + data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])) del measurement_result["data"] toc = perf_counter() @@ -584,18 +580,18 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() - channel_count = len(psd_refs) + channel_count = len(data_products_refs) logger.debug(f"Have {channel_count} channel results") - for index in range(len(psd_refs)): + for index in range(len(data_products_refs)): logger.debug(f"Working on channel {index}") channel_data = [] # Now block until the data is ready - apd_refs.append(apd_reference) - - psd_data = ray.get(psd_refs[index]) + dp_refs_tuple = ray.get(data_products_refs[i]) + psd_ref, pvt_ref, pfp_ref, apd_ref = 
dp_refs_tuple + psd_data = ray.get(psd_ref) logger.debug(f"PSD: {psd_data}") channel_data.extend(psd_data) - pvt_data = ray.get(pvt_refs[index]) + pvt_data = ray.get(pvt_ref) logger.debug(f"PVT DATA: {pvt_data}") # Power-vs-Time results, a tuple of arrays logger.debug("splitting tuple") @@ -608,13 +604,13 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): median_ch_pwrs.append(DATA_TYPE(summaries[3])) del summaries - pfp_data = ray.get(pfp_refs[index]) + pfp_data = ray.get(pfp_ref) logger.debug(f"PFP: {pfp_data}") channel_data.extend(pfp_data) # APD result: append instead of extend, # since the result is a single 1D array - apd_data = ray.get(apd_refs[index]) + apd_data = ray.get(apd_ref) logger.debug(f"APD: {apd_data}") channel_data.append(apd_data) From e702bbd633d8e291dee7468a6b8e13961b64e02d Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Fri, 12 Apr 2024 08:05:35 -0600 Subject: [PATCH 15/26] don't log data product results. --- scos_actions/actions/acquire_sea_data_product.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 8d1391ee..013842f5 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -588,30 +588,29 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): # Now block until the data is ready dp_refs_tuple = ray.get(data_products_refs[i]) psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple + logger.debug("Getting PSD data.") psd_data = ray.get(psd_ref) - logger.debug(f"PSD: {psd_data}") channel_data.extend(psd_data) + + logger.debug("Getting PVT data.") pvt_data = ray.get(pvt_ref) - logger.debug(f"PVT DATA: {pvt_data}") # Power-vs-Time results, a tuple of arrays - logger.debug("splitting tuple") + logger.debug("Splitting PVT tuple") data, summaries = pvt_data # Split the tuple - logger.debug(f"data is {type(data)}: {data}") - logger.debug(f"summaries is {type(summaries)}: {summaries}") max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) mean_ch_pwrs.append(DATA_TYPE(summaries[2])) median_ch_pwrs.append(DATA_TYPE(summaries[3])) del summaries + logger.debug("Getting PFP data.") pfp_data = ray.get(pfp_ref) - logger.debug(f"PFP: {pfp_data}") channel_data.extend(pfp_data) # APD result: append instead of extend, # since the result is a single 1D array + logger.debug("Getting APD data.") apd_data = ray.get(apd_ref) - logger.debug(f"APD: {apd_data}") channel_data.append(apd_data) toc = perf_counter() From 6a6ab716e3e0510c42441b07655a94cfc8ca70da Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Fri, 12 Apr 2024 11:27:09 -0600 Subject: [PATCH 16/26] correct index in accessing data products. 
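
The retrieval loop introduced in patch 14 iterates with
"for index in range(len(data_products_refs))" but still fetched
data_products_refs[i], where i was left bound by the earlier capture
loop, so every pass re-fetched the references of the last captured
channel. A compact illustration of the stale-loop-variable pitfall
(hypothetical values, not part of this codebase):

    refs = ["ch0", "ch1", "ch2"]
    for i, params in enumerate(refs):
        pass                 # capture loop exits with i == 2
    for index in range(len(refs)):
        item = refs[i]       # bug: always "ch2"; should be refs[index]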
--- scos_actions/actions/acquire_sea_data_product.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 013842f5..abcabe97 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -586,7 +586,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Working on channel {index}") channel_data = [] # Now block until the data is ready - dp_refs_tuple = ray.get(data_products_refs[i]) + dp_refs_tuple = ray.get(data_products_refs[index]) psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple logger.debug("Getting PSD data.") psd_data = ray.get(psd_ref) From 409ec605bab83f971964d9859fdd41ad74868a11 Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Fri, 12 Apr 2024 12:04:46 -0600 Subject: [PATCH 17/26] Optionally use env var for number of ray workers. Reduce debug logging. --- scos_actions/actions/acquire_sea_data_product.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index abcabe97..2c196ab2 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -113,7 +113,7 @@ FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE) FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy") IMPEDANCE_OHMS = 50.0 -NUM_ACTORS = 3 # Number of ray actors to initialize +NUM_ACTORS = env.int("RAY_WORKERS", default=3) # Number of ray actors to initialize # Create power detectors TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"]) @@ -588,14 +588,11 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): # Now block until the data is ready dp_refs_tuple = ray.get(data_products_refs[index]) psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple - logger.debug("Getting PSD data.") psd_data = ray.get(psd_ref) channel_data.extend(psd_data) - logger.debug("Getting PVT data.") pvt_data = ray.get(pvt_ref) # Power-vs-Time results, a tuple of arrays - logger.debug("Splitting PVT tuple") data, summaries = pvt_data # Split the tuple max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) @@ -603,13 +600,11 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): median_ch_pwrs.append(DATA_TYPE(summaries[3])) del summaries - logger.debug("Getting PFP data.") pfp_data = ray.get(pfp_ref) channel_data.extend(pfp_data) # APD result: append instead of extend, # since the result is a single 1D array - logger.debug("Getting APD data.") apd_data = ray.get(apd_ref) channel_data.append(apd_data) From 057f6ab81ea3ac4f1cb074a38110fb9c12b7d8ab Mon Sep 17 00:00:00 2001 From: Doug Boulware Date: Mon, 15 Apr 2024 14:23:39 -0600 Subject: [PATCH 18/26] add num_returns to IQProcessor. 
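
Without a num_returns hint, an actor method that returns a tuple yields
a single ObjectRef to the whole tuple; declaring num_returns=4 makes
run.remote() hand back four separate ObjectRefs, one per data product,
so each result can be fetched and freed independently. A minimal sketch
of the decorator's effect (hypothetical Worker actor, not part of this
codebase):

    import ray

    ray.init()

    @ray.remote
    class Worker:
        @ray.method(num_returns=2)
        def compute(self):
            return "psd", "pvt"  # two results

    w = Worker.remote()
    ref_a, ref_b = w.compute.remote()       # two ObjectRefs, not one
    print(ray.get(ref_a), ray.get(ref_b))   # -> psd pvt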
--- scos_actions/actions/acquire_sea_data_product.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 2c196ab2..e4f90d1b 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -420,6 +420,7 @@ def __init__(self, params: dict, iir_sos: np.ndarray): del params + @ray.method(num_returns=4) def run(self, iqdata: np.ndarray) -> list: """ Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results. From f6d4ef1550090a4342760669a79e2e4a31f2000e Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Fri, 16 Aug 2024 15:19:26 -0600 Subject: [PATCH 19/26] update ray version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a34afe30..bbb65179 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dependencies = [ "numpy>=1.22.0", "psutil>=5.9.4", "python-dateutil>=2.0", - "ray>=2.6.3,<2.8.0", + "ray>=2.8.1,<2.10.0", "ruamel.yaml>=0.15", "scipy>=1.8.0", "sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive", From be748893c27de09e75ab24bfabfeeed5ed2954a0 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Fri, 16 Aug 2024 15:35:26 -0600 Subject: [PATCH 20/26] update django version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index bbb65179..e6ea6760 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ classifiers = [ dependencies = [ "environs>=9.5.0", - "django>=3.2.18,<4.0", + "django>=4.2,<5.0", "its_preselector @ git+https://github.com/NTIA/Preselector@3.1.0", "msgspec>=0.16.0,<1.0.0", "numexpr>=2.8.3", From 5a01af56481739380b2d5bf0fd9827881fec7311 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Fri, 27 Sep 2024 12:42:45 -0600 Subject: [PATCH 21/26] lscpu different in ubuntu 22.04, replace with psutil.cpu_freq --- scos_actions/hardware/utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scos_actions/hardware/utils.py b/scos_actions/hardware/utils.py index 8a286e6d..f01250f8 100644 --- a/scos_actions/hardware/utils.py +++ b/scos_actions/hardware/utils.py @@ -40,9 +40,8 @@ def get_current_cpu_clock_speed() -> float: :return: """ try: - out = subprocess.run("lscpu | grep 'MHz'", shell=True, capture_output=True) - spd = str(out.stdout).split("\\n")[0].split()[2] - return float(spd) + cpu_freq = psutil.cpu_freq() + return cpu_freq.current except Exception as e: logger.error("Unable to retrieve current CPU speed") raise e From 921dcdfc5056c2b293986be1a3de8914d25b8054 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 30 Sep 2024 08:12:31 -0600 Subject: [PATCH 22/26] debugging data product --- .../actions/acquire_sea_data_product.py | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 2b8e0af9..b2116151 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -362,7 +362,7 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray: # compute statistics first by cycle mean_power = power_bins.mean(axis=0) max_power = power_bins.max(axis=0) - del power_bins + #del power_bins # then do the detector pfp = np.array( @@ -438,7 +438,7 @@ def run(self, iqdata: np.ndarray) -> list: # Compute PSD, PVT, PFP, and APD concurrently. 
# Do not wait until they finish. Yield references to their results. yield [worker.run.remote(iqdata) for worker in self.workers] - del iqdata + #del iqdata class NasctnSeaDataProduct(Action): @@ -556,7 +556,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): dp_procs.append( iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) ) - del measurement_result["data"] + #del measurement_result["data"] toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") # Create capture segment with channel-specific metadata before sigan is reconfigured @@ -589,31 +589,33 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): # Retrieve object references for channel data channel_data_refs = ray.get(channel_data_process) channel_data = [] - for i, data_ref in enumerate(channel_data_refs): - # Now block until the data is ready - data = ray.get(data_ref) - if i == 1: - # Power-vs-Time results, a tuple of arrays - data, summaries = data # Split the tuple - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - if i == 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - channel_data.append(data) - else: - # For 2D arrays (PSD, PVT, PFP) - channel_data.extend(data) + for object_ref in channel_data_refs: # only one + data_arr = ray.get(object_ref) + for i, data_ref in enumerate(data_arr): + # Now block until the data is ready + data = ray.get(data_ref) + if i == 1: + # Power-vs-Time results, a tuple of arrays + data, summaries = data # Split the tuple + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + #del summaries + if i == 3: # Separate condition is intentional + # APD result: append instead of extend, + # since the result is a single 1D array + channel_data.append(data) + else: + # For 2D arrays (PSD, PVT, PFP) + channel_data.extend(data) toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del dp_procs, iq_processors, channel_data, channel_data_refs + #del dp_procs, iq_processors, channel_data, channel_data_refs logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes @@ -735,9 +737,9 @@ def capture_diagnostics( switch_diag["door_closed"] = not bool(all_switch_status["door_state"]) # Read preselector sensors - ps_diag = sensor.preselector.get_status() - del ps_diag["name"] - del ps_diag["healthy"] + #ps_diag = sensor.preselector.get_status() + #del ps_diag["name"] + #del ps_diag["healthy"] # Read computer performance metrics cpu_diag = { # Start with CPU min/max/mean speeds @@ -813,7 +815,7 @@ def capture_diagnostics( logger.debug(f"Got all diagnostics in {toc-tic} s") diagnostics = { "datetime": utils.get_datetime_str_now(), - "preselector": ntia_diagnostics.Preselector(**ps_diag), + #"preselector": ntia_diagnostics.Preselector(**ps_diag), "spu": ntia_diagnostics.SPU(**switch_diag), "computer": ntia_diagnostics.Computer(**cpu_diag), "software": ntia_diagnostics.Software(**software_diag), From 
db30f0438ca1c10c6d2e01ce483bc7f1591f674b Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 30 Sep 2024 09:54:03 -0600 Subject: [PATCH 23/26] update contact info --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c468763a..37b521c1 100644 --- a/README.md +++ b/README.md @@ -414,4 +414,5 @@ See [LICENSE](LICENSE.md). ## Contact -For technical questions about SCOS Actions, contact Justin Haze, [jhaze@ntia.gov](mailto:jhaze@ntia.gov) +For technical questions about SCOS Actions, contact the +[ITS Spectrum Monitoring Team](mailto:spectrummonitoring@ntia.gov). From 872a01f021226bf038bcbbdb6b4472b86f0159ff Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 30 Sep 2024 14:43:00 -0600 Subject: [PATCH 24/26] undo unneeded changes --- scos_actions/actions/acquire_sea_data_product.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index f8670d69..ecb25ce3 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -362,7 +362,7 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray: # compute statistics first by cycle mean_power = power_bins.mean(axis=0) max_power = power_bins.max(axis=0) - #del power_bins + del power_bins # then do the detector pfp = np.array( @@ -738,9 +738,9 @@ def capture_diagnostics( switch_diag["door_closed"] = not bool(all_switch_status["door_state"]) # Read preselector sensors - #ps_diag = sensor.preselector.get_status() - #del ps_diag["name"] - #del ps_diag["healthy"] + ps_diag = sensor.preselector.get_status() + del ps_diag["name"] + del ps_diag["healthy"] # Read computer performance metrics cpu_diag = { # Start with CPU min/max/mean speeds @@ -816,7 +816,7 @@ def capture_diagnostics( logger.debug(f"Got all diagnostics in {toc-tic} s") diagnostics = { "datetime": utils.get_datetime_str_now(), - #"preselector": ntia_diagnostics.Preselector(**ps_diag), + "preselector": ntia_diagnostics.Preselector(**ps_diag), "spu": ntia_diagnostics.SPU(**switch_diag), "computer": ntia_diagnostics.Computer(**cpu_diag), "software": ntia_diagnostics.Software(**software_diag), From ab723701f629ca593087247ff9d2b9b76ba65173 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Mon, 7 Oct 2024 09:00:12 -0600 Subject: [PATCH 25/26] update version number --- scos_actions/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scos_actions/__init__.py b/scos_actions/__init__.py index b7a5e079..5b461163 100644 --- a/scos_actions/__init__.py +++ b/scos_actions/__init__.py @@ -1 +1 @@ -__version__ = "10.0.2" +__version__ = "11.0.0" From 617d04fa991c7256fec784a7a9f9b44d4c5946f9 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Thu, 31 Oct 2024 14:44:32 -0600 Subject: [PATCH 26/26] update pre-commit python, autoformatting, fix missing pvt data --- .pre-commit-config.yaml | 2 +- scos_actions/actions/acquire_sea_data_product.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 90d2f9e4..0b4ae5b3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ default_language_version: - python: python3.8 + python: python3.10 repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.6.0 diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index ecb25ce3..15fb5715 100644 --- 
a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -550,7 +550,9 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() - data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])) + data_products_refs.append( + iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) + ) del measurement_result["data"] toc = perf_counter() @@ -599,6 +601,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) mean_ch_pwrs.append(DATA_TYPE(summaries[2])) median_ch_pwrs.append(DATA_TYPE(summaries[3])) + channel_data.extend(data) del summaries pfp_data = ray.get(pfp_ref) @@ -616,7 +619,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del iq_processors, channel_data + del iq_processors, channel_data logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes
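
Taken together, the later patches leave the per-channel retrieval loop
in roughly the following shape (a condensed sketch that follows the
patched code's names, with logging and error handling omitted; not a
verbatim excerpt):

    for index in range(len(data_products_refs)):
        psd_ref, pvt_ref, pfp_ref, apd_ref = ray.get(data_products_refs[index])
        channel_data = []
        channel_data.extend(ray.get(psd_ref))   # PSD: 2D array
        data, summaries = ray.get(pvt_ref)      # PVT: (arrays, channel powers)
        max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
        med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
        mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
        median_ch_pwrs.append(DATA_TYPE(summaries[3]))
        channel_data.extend(data)               # PVT data restored by patch 26
        channel_data.extend(ray.get(pfp_ref))   # PFP: 2D array
        channel_data.append(ray.get(apd_ref))   # APD: single 1D array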