diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 90d2f9e4..0b4ae5b3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ default_language_version: - python: python3.8 + python: python3.10 repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.6.0 diff --git a/README.md b/README.md index c468763a..37b521c1 100644 --- a/README.md +++ b/README.md @@ -414,4 +414,5 @@ See [LICENSE](LICENSE.md). ## Contact -For technical questions about SCOS Actions, contact Justin Haze, [jhaze@ntia.gov](mailto:jhaze@ntia.gov) +For technical questions about SCOS Actions, contact the +[ITS Spectrum Monitoring Team](mailto:spectrummonitoring@ntia.gov). diff --git a/pyproject.toml b/pyproject.toml index a34afe30..61ee7e34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,14 +43,14 @@ classifiers = [ dependencies = [ "environs>=9.5.0", - "django>=3.2.18,<4.0", + "django>=4.2,<5.0", "its_preselector @ git+https://github.com/NTIA/Preselector@3.1.0", "msgspec>=0.16.0,<1.0.0", "numexpr>=2.8.3", "numpy>=1.22.0", "psutil>=5.9.4", "python-dateutil>=2.0", - "ray>=2.6.3,<2.8.0", + "ray>=2.10.0", "ruamel.yaml>=0.15", "scipy>=1.8.0", "sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive", @@ -65,7 +65,7 @@ test = [ dev = [ "hatchling>=1.14.1,<2.0", "pre-commit>=3.3.1,<4.0", - "ray[default]>=2.4.0", + "ray[default]>=2.10.0", "scos-actions[test]", ] diff --git a/scos_actions/__init__.py b/scos_actions/__init__.py index b7a5e079..5b461163 100644 --- a/scos_actions/__init__.py +++ b/scos_actions/__init__.py @@ -1 +1 @@ -__version__ = "10.0.2" +__version__ = "11.0.0" diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py index 2b8e0af9..15fb5715 100644 --- a/scos_actions/actions/acquire_sea_data_product.py +++ b/scos_actions/actions/acquire_sea_data_product.py @@ -113,7 +113,7 @@ FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE) FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy") IMPEDANCE_OHMS = 50.0 -NUM_ACTORS = 3 # Number of ray actors to initialize +NUM_ACTORS = env.int("RAY_WORKERS", default=3) # Number of ray actors to initialize # Create power detectors TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"]) @@ -417,14 +417,10 @@ def __init__(self, params: dict, iir_sos: np.ndarray): self.apd_worker = AmplitudeProbabilityDistribution.remote( params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM] ) - self.workers = [ - self.fft_worker, - self.pvt_worker, - self.pfp_worker, - self.apd_worker, - ] + del params + @ray.method(num_returns=4) def run(self, iqdata: np.ndarray) -> list: """ Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results. @@ -436,9 +432,11 @@ def run(self, iqdata: np.ndarray) -> list: # Filter IQ and place it in the object store iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata)) # Compute PSD, PVT, PFP, and APD concurrently. - # Do not wait until they finish. Yield references to their results. - yield [worker.run.remote(iqdata) for worker in self.workers] - del iqdata + fft_reference = self.fft_worker.run.remote(iqdata) + pvt_reference = self.pvt_worker.run.remote(iqdata) + pfp_reference = self.pfp_worker.run.remote(iqdata) + apd_reference = self.apd_worker.run.remote(iqdata) + return fft_reference, pvt_reference, pfp_reference, apd_reference class NasctnSeaDataProduct(Action): @@ -541,7 +539,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s") # Collect all IQ data and spawn data product computation processes - dp_procs, cpu_speed, reference_points = [], [], [] + data_products_refs, cpu_speed, reference_points = [], [], [] capture_tic = perf_counter() for i, parameters in enumerate(self.iteration_params): @@ -552,10 +550,10 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): ) # Start data product processing but do not block next IQ capture tic = perf_counter() - - dp_procs.append( + data_products_refs.append( iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]) ) + del measurement_result["data"] toc = perf_counter() logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s") @@ -585,35 +583,43 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int): [], ) result_tic = perf_counter() - for channel_data_process in dp_procs: - # Retrieve object references for channel data - channel_data_refs = ray.get(channel_data_process) + channel_count = len(data_products_refs) + logger.debug(f"Have {channel_count} channel results") + for index in range(len(data_products_refs)): + logger.debug(f"Working on channel {index}") channel_data = [] - for i, data_ref in enumerate(channel_data_refs): - # Now block until the data is ready - data = ray.get(data_ref) - if i == 1: - # Power-vs-Time results, a tuple of arrays - data, summaries = data # Split the tuple - max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) - med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) - mean_ch_pwrs.append(DATA_TYPE(summaries[2])) - median_ch_pwrs.append(DATA_TYPE(summaries[3])) - del summaries - if i == 3: # Separate condition is intentional - # APD result: append instead of extend, - # since the result is a single 1D array - channel_data.append(data) - else: - # For 2D arrays (PSD, PVT, PFP) - channel_data.extend(data) + # Now block until the data is ready + dp_refs_tuple = ray.get(data_products_refs[index]) + psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple + psd_data = ray.get(psd_ref) + channel_data.extend(psd_data) + + pvt_data = ray.get(pvt_ref) + # Power-vs-Time results, a tuple of arrays + data, summaries = pvt_data # Split the tuple + max_max_ch_pwrs.append(DATA_TYPE(summaries[0])) + med_mean_ch_pwrs.append(DATA_TYPE(summaries[1])) + mean_ch_pwrs.append(DATA_TYPE(summaries[2])) + median_ch_pwrs.append(DATA_TYPE(summaries[3])) + channel_data.extend(data) + del summaries + + pfp_data = ray.get(pfp_ref) + channel_data.extend(pfp_data) + + # APD result: append instead of extend, + # since the result is a single 1D array + apd_data = ray.get(apd_ref) + channel_data.append(apd_data) + toc = perf_counter() logger.debug(f"Waited {toc-tic} s for channel data") all_data.extend(NasctnSeaDataProduct.transform_data(channel_data)) + for ray_actor in iq_processors: ray.kill(ray_actor) result_toc = perf_counter() - del dp_procs, iq_processors, channel_data, channel_data_refs + del iq_processors, channel_data logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s") # Build metadata and convert data to compressed bytes diff --git a/scos_actions/hardware/utils.py b/scos_actions/hardware/utils.py index 8a286e6d..f01250f8 100644 --- a/scos_actions/hardware/utils.py +++ b/scos_actions/hardware/utils.py @@ -40,9 +40,8 @@ def get_current_cpu_clock_speed() -> float: :return: """ try: - out = subprocess.run("lscpu | grep 'MHz'", shell=True, capture_output=True) - spd = str(out.stdout).split("\\n")[0].split()[2] - return float(spd) + cpu_freq = psutil.cpu_freq() + return cpu_freq.current except Exception as e: logger.error("Unable to retrieve current CPU speed") raise e