113
113
# --- Module-level processing constants ---

# FFT analysis window and its energy correction factor, derived from the
# configured window type and FFT size.
FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE)
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")

# Reference impedance assumed when converting between voltage and power.
IMPEDANCE_OHMS = 50.0

# Number of ray actors to initialize; overridable via the RAY_WORKERS
# environment variable (defaults to 3).
NUM_ACTORS = env.int("RAY_WORKERS", default=3)

# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"])
@@ -417,14 +417,10 @@ def __init__(self, params: dict, iir_sos: np.ndarray):
417
417
self .apd_worker = AmplitudeProbabilityDistribution .remote (
418
418
params [APD_BIN_SIZE_DB ], params [APD_MIN_BIN_DBM ], params [APD_MAX_BIN_DBM ]
419
419
)
420
- self .workers = [
421
- self .fft_worker ,
422
- self .pvt_worker ,
423
- self .pfp_worker ,
424
- self .apd_worker ,
425
- ]
420
+
426
421
del params
427
422
423
+ @ray .method (num_returns = 4 )
428
424
def run (self , iqdata : np .ndarray ) -> list :
429
425
"""
430
426
Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results.
@@ -436,9 +432,11 @@ def run(self, iqdata: np.ndarray) -> list:
436
432
# Filter IQ and place it in the object store
437
433
iqdata = ray .put (sosfilt (sos = self .iir_sos , x = iqdata ))
438
434
# Compute PSD, PVT, PFP, and APD concurrently.
439
- # Do not wait until they finish. Yield references to their results.
440
- yield [worker .run .remote (iqdata ) for worker in self .workers ]
441
- del iqdata
435
+ fft_reference = self .fft_worker .run .remote (iqdata )
436
+ pvt_reference = self .pvt_worker .run .remote (iqdata )
437
+ pfp_reference = self .pfp_worker .run .remote (iqdata )
438
+ apd_reference = self .apd_worker .run .remote (iqdata )
439
+ return fft_reference , pvt_reference , pfp_reference , apd_reference
442
440
443
441
444
442
class NasctnSeaDataProduct (Action ):
@@ -541,7 +539,7 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
541
539
logger .debug (f"Spawned { NUM_ACTORS } supervisor actors in { toc - tic :.2f} s" )
542
540
543
541
# Collect all IQ data and spawn data product computation processes
544
- dp_procs , cpu_speed , reference_points = [], [], []
542
+ data_products_refs , cpu_speed , reference_points = [], [], []
545
543
capture_tic = perf_counter ()
546
544
547
545
for i , parameters in enumerate (self .iteration_params ):
@@ -552,10 +550,10 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
552
550
)
553
551
# Start data product processing but do not block next IQ capture
554
552
tic = perf_counter ()
555
-
556
- dp_procs .append (
553
+ data_products_refs .append (
557
554
iq_processors [i % NUM_ACTORS ].run .remote (measurement_result ["data" ])
558
555
)
556
+
559
557
del measurement_result ["data" ]
560
558
toc = perf_counter ()
561
559
logger .debug (f"IQ data delivered for processing in { toc - tic :.2f} s" )
@@ -585,35 +583,43 @@ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
585
583
[],
586
584
)
587
585
result_tic = perf_counter ()
588
- for channel_data_process in dp_procs :
589
- # Retrieve object references for channel data
590
- channel_data_refs = ray .get (channel_data_process )
586
+ channel_count = len (data_products_refs )
587
+ logger .debug (f"Have { channel_count } channel results" )
588
+ for index in range (len (data_products_refs )):
589
+ logger .debug (f"Working on channel { index } " )
591
590
channel_data = []
592
- for i , data_ref in enumerate (channel_data_refs ):
593
- # Now block until the data is ready
594
- data = ray .get (data_ref )
595
- if i == 1 :
596
- # Power-vs-Time results, a tuple of arrays
597
- data , summaries = data # Split the tuple
598
- max_max_ch_pwrs .append (DATA_TYPE (summaries [0 ]))
599
- med_mean_ch_pwrs .append (DATA_TYPE (summaries [1 ]))
600
- mean_ch_pwrs .append (DATA_TYPE (summaries [2 ]))
601
- median_ch_pwrs .append (DATA_TYPE (summaries [3 ]))
602
- del summaries
603
- if i == 3 : # Separate condition is intentional
604
- # APD result: append instead of extend,
605
- # since the result is a single 1D array
606
- channel_data .append (data )
607
- else :
608
- # For 2D arrays (PSD, PVT, PFP)
609
- channel_data .extend (data )
591
+ # Now block until the data is ready
592
+ dp_refs_tuple = ray .get (data_products_refs [index ])
593
+ psd_ref , pvt_ref , pfp_ref , apd_ref = dp_refs_tuple
594
+ psd_data = ray .get (psd_ref )
595
+ channel_data .extend (psd_data )
596
+
597
+ pvt_data = ray .get (pvt_ref )
598
+ # Power-vs-Time results, a tuple of arrays
599
+ data , summaries = pvt_data # Split the tuple
600
+ max_max_ch_pwrs .append (DATA_TYPE (summaries [0 ]))
601
+ med_mean_ch_pwrs .append (DATA_TYPE (summaries [1 ]))
602
+ mean_ch_pwrs .append (DATA_TYPE (summaries [2 ]))
603
+ median_ch_pwrs .append (DATA_TYPE (summaries [3 ]))
604
+ channel_data .extend (data )
605
+ del summaries
606
+
607
+ pfp_data = ray .get (pfp_ref )
608
+ channel_data .extend (pfp_data )
609
+
610
+ # APD result: append instead of extend,
611
+ # since the result is a single 1D array
612
+ apd_data = ray .get (apd_ref )
613
+ channel_data .append (apd_data )
614
+
610
615
toc = perf_counter ()
611
616
logger .debug (f"Waited { toc - tic } s for channel data" )
612
617
all_data .extend (NasctnSeaDataProduct .transform_data (channel_data ))
618
+
613
619
for ray_actor in iq_processors :
614
620
ray .kill (ray_actor )
615
621
result_toc = perf_counter ()
616
- del dp_procs , iq_processors , channel_data , channel_data_refs
622
+ del iq_processors , channel_data
617
623
logger .debug (f"Got all processed data in { result_toc - result_tic :.2f} s" )
618
624
619
625
# Build metadata and convert data to compressed bytes
0 commit comments