Update Django, Ray, Ubuntu #123

Merged 27 commits on Nov 7, 2024.

Commits (27):
cd2b137  ray 2.10.0 (dboulware, Apr 11, 2024)
857c842  debugging (dboulware, Apr 11, 2024)
4b4dfc5  fix logging error. (dboulware, Apr 11, 2024)
be84b57  get remote objects. (dboulware, Apr 11, 2024)
1052fc2  debugging (dboulware, Apr 11, 2024)
c8550a8  debugging (dboulware, Apr 11, 2024)
2943914  debugging. (dboulware, Apr 11, 2024)
f9b5b32  debugging. (dboulware, Apr 11, 2024)
3d96494  debugging (dboulware, Apr 11, 2024)
df1a6d1  fix summaries. (dboulware, Apr 11, 2024)
81e7d8a  fix if else (dboulware, Apr 11, 2024)
a1e95ed  debugging (dboulware, Apr 11, 2024)
0332a70  return file references from IQ processor. (dboulware, Apr 12, 2024)
c0223d0  handle tuple returned from IQ processor. (dboulware, Apr 12, 2024)
e702bbd  don't log data product results. (dboulware, Apr 12, 2024)
6a6ab71  correct index in accessing data products. (dboulware, Apr 12, 2024)
409ec60  Optionally use env var for number of ray workers. Reduce debug logging. (dboulware, Apr 12, 2024)
057f6ab  add num_returns to IQProcessor. (dboulware, Apr 15, 2024)
f6d4ef1  update ray version (jhazentia, Aug 16, 2024)
be74889  update django version (jhazentia, Aug 16, 2024)
5a01af5  lscpu different in ubuntu 22.04, replace with psutil.cpu_freq (jhazentia, Sep 27, 2024)
921dcdf  debugging data product (jhazentia, Sep 30, 2024)
82d5129  Merge branch 'ray_2.10.0' of https://github.com/NTIA/scos-actions int… (jhazentia, Sep 30, 2024)
db30f04  update contact info (jhazentia, Sep 30, 2024)
872a01f  undo unneeded changes (jhazentia, Sep 30, 2024)
ab72370  update version number (jhazentia, Oct 7, 2024)
617d04f  update pre-commit python, autoformatting, fix missing pvt data (jhazentia, Oct 31, 2024)
README.md (3 changes: 2 additions & 1 deletion)

@@ -414,4 +414,5 @@ See [LICENSE](LICENSE.md).
 
 ## Contact
 
-For technical questions about SCOS Actions, contact Justin Haze, [[email protected]](mailto:[email protected])
+For technical questions about SCOS Actions, contact the
+[ITS Spectrum Monitoring Team](mailto:[email protected]).
pyproject.toml (6 changes: 3 additions & 3 deletions)

@@ -43,14 +43,14 @@ classifiers = [
 
 dependencies = [
   "environs>=9.5.0",
-  "django>=3.2.18,<4.0",
+  "django>=4.2,<5.0",
   "its_preselector @ git+https://github.com/NTIA/[email protected]",
   "msgspec>=0.16.0,<1.0.0",
   "numexpr>=2.8.3",
   "numpy>=1.22.0",
   "psutil>=5.9.4",
   "python-dateutil>=2.0",
-  "ray>=2.6.3,<2.8.0",
+  "ray>=2.10.0",
   "ruamel.yaml>=0.15",
   "scipy>=1.8.0",
   "sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive",
@@ -65,7 +65,7 @@ test = [
 dev = [
   "hatchling>=1.14.1,<2.0",
   "pre-commit>=3.3.1,<4.0",
-  "ray[default]>=2.4.0",
+  "ray[default]>=2.10.0",
   "scos-actions[test]",
 ]
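A quick way for a reviewer to confirm a local environment resolved the new ranges is a check along these lines (a hypothetical snippet, not part of this PR; `importlib.metadata` ships with Python 3.8+):

```python
# Hypothetical post-install sanity check for the bumped dependency ranges.
from importlib.metadata import version

print("django:", version("django"))  # expected to satisfy >=4.2,<5.0
print("ray:", version("ray"))        # expected to satisfy >=2.10.0
```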
scos_actions/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -1 +1 @@
-__version__ = "10.0.2"
+__version__ = "11.0.0"
scos_actions/actions/acquire_sea_data_product.py (75 changes: 39 additions & 36 deletions)

@@ -113,7 +113,7 @@
 FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE)
 FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
 IMPEDANCE_OHMS = 50.0
-NUM_ACTORS = 3  # Number of ray actors to initialize
+NUM_ACTORS = env.int("RAY_WORKERS", default=3)  # Number of ray actors to initialize
 
 # Create power detectors
 TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"])
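The `NUM_ACTORS` change lets deployments size the Ray actor pool through an environment variable. A minimal sketch of the same pattern, assuming the `env` object above is an `environs.Env` instance:

```python
# Sketch of the RAY_WORKERS override pattern, assuming the environs package.
from environs import Env

env = Env()
# Falls back to 3 when RAY_WORKERS is unset; raises on a non-integer value.
NUM_ACTORS = env.int("RAY_WORKERS", default=3)
print(f"Spawning {NUM_ACTORS} Ray actors")
```

Setting `RAY_WORKERS=8` in the container environment then scales the pool without a code change.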
@@ -417,14 +417,10 @@ def __init__(self, params: dict, iir_sos: np.ndarray):
         self.apd_worker = AmplitudeProbabilityDistribution.remote(
             params[APD_BIN_SIZE_DB], params[APD_MIN_BIN_DBM], params[APD_MAX_BIN_DBM]
         )
-        self.workers = [
-            self.fft_worker,
-            self.pvt_worker,
-            self.pfp_worker,
-            self.apd_worker,
-        ]
 
         del params
 
+    @ray.method(num_returns=4)
     def run(self, iqdata: np.ndarray) -> list:
         """
         Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results.
@@ -436,9 +432,11 @@ def run(self, iqdata: np.ndarray) -> list:
         # Filter IQ and place it in the object store
         iqdata = ray.put(sosfilt(sos=self.iir_sos, x=iqdata))
         # Compute PSD, PVT, PFP, and APD concurrently.
-        # Do not wait until they finish. Yield references to their results.
-        yield [worker.run.remote(iqdata) for worker in self.workers]
-        del iqdata
+        fft_reference = self.fft_worker.run.remote(iqdata)
+        pvt_reference = self.pvt_worker.run.remote(iqdata)
+        pfp_reference = self.pfp_worker.run.remote(iqdata)
+        apd_reference = self.apd_worker.run.remote(iqdata)
+        return fft_reference, pvt_reference, pfp_reference, apd_reference
 
 
 class NasctnSeaDataProduct(Action):
class NasctnSeaDataProduct(Action):
Expand Down Expand Up @@ -541,7 +539,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed, reference_points = [], [], []
data_products_refs, cpu_speed, reference_points = [], [], []
capture_tic = perf_counter()

for i, parameters in enumerate(self.iteration_params):
Expand All @@ -552,10 +550,8 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
)
# Start data product processing but do not block next IQ capture
tic = perf_counter()
data_products_refs.append(iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"]))

dp_procs.append(
iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
)
del measurement_result["data"]
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
@@ -585,35 +581,42 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
                 [],
             )
             result_tic = perf_counter()
-        for channel_data_process in dp_procs:
-            # Retrieve object references for channel data
-            channel_data_refs = ray.get(channel_data_process)
+        channel_count = len(data_products_refs)
+        logger.debug(f"Have {channel_count} channel results")
+        for index in range(len(data_products_refs)):
+            logger.debug(f"Working on channel {index}")
             channel_data = []
-            for i, data_ref in enumerate(channel_data_refs):
-                # Now block until the data is ready
-                data = ray.get(data_ref)
-                if i == 1:
-                    # Power-vs-Time results, a tuple of arrays
-                    data, summaries = data  # Split the tuple
-                    max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
-                    med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
-                    mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
-                    median_ch_pwrs.append(DATA_TYPE(summaries[3]))
-                    del summaries
-                if i == 3:  # Separate condition is intentional
-                    # APD result: append instead of extend,
-                    # since the result is a single 1D array
-                    channel_data.append(data)
-                else:
-                    # For 2D arrays (PSD, PVT, PFP)
-                    channel_data.extend(data)
+            # Now block until the data is ready
+            dp_refs_tuple = ray.get(data_products_refs[index])
+            psd_ref, pvt_ref, pfp_ref, apd_ref = dp_refs_tuple
+            psd_data = ray.get(psd_ref)
+            channel_data.extend(psd_data)
+
+            pvt_data = ray.get(pvt_ref)
+            # Power-vs-Time results, a tuple of arrays
+            data, summaries = pvt_data  # Split the tuple
+            max_max_ch_pwrs.append(DATA_TYPE(summaries[0]))
+            med_mean_ch_pwrs.append(DATA_TYPE(summaries[1]))
+            mean_ch_pwrs.append(DATA_TYPE(summaries[2]))
+            median_ch_pwrs.append(DATA_TYPE(summaries[3]))
+            del summaries
+
+            pfp_data = ray.get(pfp_ref)
+            channel_data.extend(pfp_data)
+
+            # APD result: append instead of extend,
+            # since the result is a single 1D array
+            apd_data = ray.get(apd_ref)
+            channel_data.append(apd_data)
+
             toc = perf_counter()
             logger.debug(f"Waited {toc-tic} s for channel data")
             all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
 
         for ray_actor in iq_processors:
             ray.kill(ray_actor)
         result_toc = perf_counter()
-        del dp_procs, iq_processors, channel_data, channel_data_refs
+        del iq_processors, channel_data
         logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")
 
         # Build metadata and convert data to compressed bytes
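Because `IQProcessor.run` returns `ObjectRef`s produced by its sub-workers, the caller unwraps two levels: one `ray.get` for the tuple of inner refs, then one per result, exactly as the new loop does. A toy sketch of that two-level retrieval (illustrative names only, not from this repo):

```python
# Toy sketch of two-level retrieval when a remote call returns ObjectRefs.
import ray

ray.init(num_cpus=2)

@ray.remote
def slow_double(x: int) -> int:
    return 2 * x

@ray.remote
def launch(x: int):
    # Returns a ref rather than a value, so the caller gets a ref-to-a-ref.
    return slow_double.remote(x)

outer_ref = launch.remote(21)
inner_ref = ray.get(outer_ref)  # first get: yields the inner ObjectRef
print(ray.get(inner_ref))       # second get: yields the value, 42
```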
scos_actions/hardware/utils.py (5 changes: 2 additions & 3 deletions)

@@ -40,9 +40,8 @@ def get_current_cpu_clock_speed() -> float:
     :return:
     """
     try:
-        out = subprocess.run("lscpu | grep 'MHz'", shell=True, capture_output=True)
-        spd = str(out.stdout).split("\\n")[0].split()[2]
-        return float(spd)
+        cpu_freq = psutil.cpu_freq()
+        return cpu_freq.current
     except Exception as e:
         logger.error("Unable to retrieve current CPU speed")
         raise e
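`psutil.cpu_freq()` replaces the `lscpu` pipeline, whose output layout changed in Ubuntu 22.04. A small sketch of what the new call returns (hedged: field availability varies by platform):

```python
# psutil.cpu_freq() returns an scpufreq(current, min, max) namedtuple in MHz,
# or None on platforms where frequency information is unavailable.
import psutil

freq = psutil.cpu_freq()
if freq is not None:
    print(f"current: {freq.current:.0f} MHz (min {freq.min}, max {freq.max})")
```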