Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PSD statistical detector set in SEA action #105

Merged
merged 12 commits into from
Jan 5, 2024
2 changes: 1 addition & 1 deletion scos_actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "7.0.1"
__version__ = "7.0.2"
56 changes: 30 additions & 26 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@
# Constants
DATA_TYPE = np.half
PFP_FRAME_RESOLUTION_S = (1e-3 * (1 + 1 / (14)) / 15) / 4
FFT_SIZE = 875
FFT_SIZE = 175 # 80 kHz resolution @ 14 MHz sampling rate
FFT_PERCENTILES = np.array([25, 75, 90, 95, 99, 99.9, 99.99])
FFT_WINDOW_TYPE = "flattop"
FFT_WINDOW = get_fft_window(FFT_WINDOW_TYPE, FFT_SIZE)
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
Expand All @@ -119,8 +120,8 @@
NUM_ACTORS = 3 # Number of ray actors to initialize

# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["mean", "max"])
FFT_DETECTOR = create_statistical_detector("FftMeanMaxDetector", ["mean", "max"])
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"])
FFT_M3_DETECTOR = create_statistical_detector("FftM3Detector", ["max", "mean", "median"])
PFP_M3_DETECTOR = create_statistical_detector("PfpM3Detector", ["min", "max", "mean"])


Expand All @@ -142,20 +143,22 @@ def __init__(
fft_size: int = FFT_SIZE,
fft_window: np.ndarray = FFT_WINDOW,
window_ecf: float = FFT_WINDOW_ECF,
detector: EnumMeta = FFT_DETECTOR,
detector: EnumMeta = FFT_M3_DETECTOR,
percentiles: np.ndarray = FFT_PERCENTILES,
impedance_ohms: float = IMPEDANCE_OHMS,
):
self.detector = detector
self.percentiles = percentiles
self.fft_size = fft_size
self.fft_window = fft_window
self.num_ffts = num_ffts
# Get truncation points: truncate FFT result to middle 625 samples (middle 10 MHz from 14 MHz)
self.bin_start = int(fft_size / 7) # bin_start = 125 with FFT_SIZE 875
self.bin_end = fft_size - self.bin_start # bin_end = 750 with FFT_SIZE 875
# Get truncation points: truncate FFT result to middle 125 samples (middle 10 MHz from 14 MHz)
self.bin_start = int(fft_size / 7) # bin_start = 25 with FFT_SIZE 175
self.bin_end = fft_size - self.bin_start # bin_end = 150 with FFT_SIZE 175
# Compute the amplitude shift for PSD scaling. The FFT result
# is in pseudo-power log units and must be scaled to a PSD.
self.fft_scale_factor = (
-10.0 * np.log10(impedance_ohms) # Pseudo-power to power
- 10.0 * np.log10(impedance_ohms) # Pseudo-power to power
+ 27.0 # Watts to dBm (+30) and baseband to RF (-3)
- 10.0 * np.log10(sample_rate_Hz * fft_size) # PSD scaling
+ 20.0 * np.log10(window_ecf) # Window energy correction
Expand All @@ -170,20 +173,23 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:
:return: A 2D NumPy array of statistical detector results computed
from PSD amplitudes, ordered (max, mean).
"""
fft_result = get_fft(
fft_amplitudes = get_fft(
iq, self.fft_size, "backward", self.fft_window, self.num_ffts, False, 1
)
fft_result = calculate_pseudo_power(fft_result)
fft_result = apply_statistical_detector(
fft_result, self.detector
) # (max, mean)
# Power in Watts
fft_amplitudes = calculate_pseudo_power(fft_amplitudes)
fft_result = apply_statistical_detector(fft_amplitudes, self.detector) # (max, mean, median)
percentile_result = np.percentile(fft_amplitudes, self.percentiles, axis=0)
fft_result = np.vstack((fft_result, percentile_result))
fft_result = np.fft.fftshift(fft_result, axes=(1,)) # Shift frequencies
fft_result = fft_result[
:, self.bin_start : self.bin_end
] # Truncation to middle bins
fft_result = 10.0 * np.log10(fft_result) + self.fft_scale_factor

# Returned order is (max, mean)
# Returned order is (max, mean, median, 25%, 75%, 90%, 95%, 99%, 99.9%, 99.99%)
# Total of 10 arrays, each of length 125 (output shape (10, 125))
# Percentile computation linearly interpolates. See numpy documentation.
return fft_result


Expand Down Expand Up @@ -985,18 +991,15 @@ def create_global_data_product_metadata(self) -> None:
self.sigmf_builder.set_processing_info([iir_obj, dft_obj])

psd_length = int(FFT_SIZE * (5 / 7))
psd_bin_center_offset = p[SAMPLE_RATE] / FFT_SIZE / 2
psd_x_axis__Hz = np.arange(psd_length) * (
(p[SAMPLE_RATE] / FFT_SIZE)
- (p[SAMPLE_RATE] * (5 / 7) / 2)
+ psd_bin_center_offset
)
psd_bin_start = int(FFT_SIZE / 7) # bin_start = 125 with FFT_SIZE 875
psd_bin_end = FFT_SIZE - psd_bin_start # bin_end = 750 with FFT_SIZE 875
psd_x_axis__Hz = get_fft_frequencies(FFT_SIZE, 14e6, 0.0) # Baseband
psd_x_axis__Hz = get_fft_frequencies(FFT_SIZE, p[SAMPLE_RATE], 0.0) # Baseband
psd_graph = ntia_algorithm.Graph(
name="Power Spectral Density",
series=[d.value for d in FFT_DETECTOR], # ["max", "mean"]
series=[d.value for d in FFT_M3_DETECTOR]
+ [
f"{int(p)}th_percentile" if p.is_integer() else f"{p}th_percentile" for p in FFT_PERCENTILES
], # ["max", "mean", "median", "25th_percentile", "75th_percentile", ... "99.99th_percentile"]
length=int(FFT_SIZE * (5 / 7)),
x_units="Hz",
x_start=[psd_x_axis__Hz[psd_bin_start]],
Expand All @@ -1006,9 +1009,10 @@ def create_global_data_product_metadata(self) -> None:
processing=[dft_obj.id],
reference=DATA_REFERENCE_POINT,
description=(
"Max- and mean-detected power spectral density, with the "
+ f"first and last {int(FFT_SIZE / 7)} samples discarded. "
+ "FFTs computed on IIR-filtered data."
"Results of statistical detectors (max, mean, median, 25th_percentile, 75th_percentile, "
+ "90th_percentile, 95th_percentile, 99th_percentile, 99.9th_percentile, 99.99th_percentile) "
+ f"applied to power spectral density samples, with the first and last {int(FFT_SIZE / 7)} "
+ "samples discarded. FFTs computed on IIR-filtered data."
),
)

Expand Down Expand Up @@ -1086,7 +1090,7 @@ def create_global_data_product_metadata(self) -> None:
[psd_graph, pvt_graph, pfp_graph, apd_graph]
)
self.total_channel_data_length = (
psd_length * len(FFT_DETECTOR)
psd_length * (len(FFT_M3_DETECTOR) + len(FFT_PERCENTILES))
+ pvt_length * len(TD_DETECTOR)
+ pfp_length * len(PFP_M3_DETECTOR) * 2
+ apd_graph.length
Expand Down