From 876b6a665aa82da6e7c3a87b2efccda2ae7a1685 Mon Sep 17 00:00:00 2001 From: Justin Haze Date: Tue, 9 Apr 2024 13:11:08 -0600 Subject: [PATCH] move sensor overload check from scos-usrp to sensor class --- scos_actions/hardware/sensor.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/scos_actions/hardware/sensor.py b/scos_actions/hardware/sensor.py index 47198b93..52f41679 100644 --- a/scos_actions/hardware/sensor.py +++ b/scos_actions/hardware/sensor.py @@ -4,6 +4,7 @@ import logging from typing import Any, Dict, List, Optional +import numpy as np from its_preselector.preselector import Preselector from its_preselector.web_relay import WebRelay @@ -246,6 +247,26 @@ def recompute_calibration_data(self, params: dict) -> None: if not recomputed: logger.warning("Failed to recompute calibration data") + def check_sensor_overload(self, data) -> bool: + """Check for sensor overload in the measurement data.""" + measured_data = data.astype(np.complex64) + + time_domain_avg_power = 10 * np.log10(np.mean(np.abs(measured_data) ** 2)) + time_domain_avg_power += ( + 10 * np.log10(1 / (2 * 50)) + 30 + ) # Convert log(V^2) to dBm + # explicitly check is not None since 1db compression could be 0 + if self.sensor_calibration_data["compression_point"] is not None: + return bool( + time_domain_avg_power + > self.sensor_calibration_data["compression_point"] + ) + else: + logger.debug( + "Compression point is None, returning False for sensor overload." + ) + return False + def acquire_time_domain_samples( self, num_samples: int, @@ -288,6 +309,7 @@ def acquire_time_domain_samples( logger.debug("*************************************\n") max_retries = retries + sensor_overload = False # Acquire samples from signal analyzer if self.signal_analyzer is not None: while True: @@ -363,6 +385,15 @@ def acquire_time_domain_samples( measurement_result["applied_calibration"]["compression_point"] = ( self.sensor_calibration_data["compression_point"] ) + sensor_overload = self.check_sensor_overload( + measurement_result["data"] + ) + if sensor_overload: + logger.warning("Sensor overload occurred!") + # measurement_result["overload"] could be true based on sigan overload or sensor overload + measurement_result["overload"] = ( + measurement_result["overload"] or sensor_overload + ) applied_cal = measurement_result["applied_calibration"] logger.debug(f"Setting applied_calibration to: {applied_cal}") else: