-
Notifications
You must be signed in to change notification settings - Fork 2
/
acquire_stepped_freq_tdomain_iq.py
180 lines (153 loc) · 7.32 KB
/
acquire_stepped_freq_tdomain_iq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# What follows is a parameterizable description of the algorithm used by this
# action. The first line is the summary and should be written in plain text.
# Everything following that is the extended description, which can be written
# in Markdown and MathJax. Each name in curly brackets '{}' will be replaced
# with the value specified in the `description` method which can be found at
# the very bottom of this file. Since this parameterization step affects
# everything in curly brackets, math notation such as {m \over n} must be
# escaped to {{m \over n}}.
#
# To print out this docstring after parameterization, see
# scos-sensor/scripts/print_action_docstring.py. You can then paste that into the
# SCOS Markdown Editor (link below) to see the final rendering.
#
# Resources:
# - MathJax reference: https://math.meta.stackexchange.com/q/5020
# - Markdown reference: https://commonmark.org/help/
# - SCOS Markdown Editor: https://ntia.github.io/scos-md-editor/
#
r"""Capture time-domain IQ samples at the following {num_center_frequencies} frequencies: {center_frequencies}.
# {name}
## Signal Analyzer setup and sample acquisition
Each time this task runs, the following process is followed:
{acquisition_plan}
This will take a minimum of {min_duration_ms:.2f} ms, not including signal analyzer
tuning, dropping samples after retunes, and data storage.
## Time-domain processing
If specified, a voltage scaling factor is applied to the complex time-domain
signals.
"""
import logging
import numpy as np
from scos_actions.actions.acquire_single_freq_tdomain_iq import (
CAL_ADJUST,
DURATION_MS,
FREQUENCY,
NUM_SKIP,
SingleFrequencyTimeDomainIqAcquisition,
)
from scos_actions.hardware.sensor import Sensor
from scos_actions.metadata.structs import ntia_sensor
from scos_actions.metadata.structs.capture import CaptureSegment
from scos_actions.signals import measurement_action_completed
from scos_actions.utils import get_parameter
from scos_actions import utils
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class SteppedFrequencyTimeDomainIqAcquisition(SingleFrequencyTimeDomainIqAcquisition):
    """Record time-domain IQ samples at a sequence of center frequencies.

    One recording is produced for every entry of the iterable parameter
    set that is built from ``parameters``. The parameters this action
    relies on are:

        name: The name of the action.
        frequency: An iterable of center frequencies, in Hz.
        duration_ms: The measurement duration for each center
            frequency, in ms.

    The ``NUM_SKIP`` and ``CAL_ADJUST`` entries are also read for every
    step, and each per-step parameter set is handed to the inherited
    ``configure`` method. For the settings understood by the signal
    analyzer, see the documentation of the Python package for the
    signal analyzer being used.

    :param parameters: The dictionary of parameters needed for
        the action and the signal analyzer.
    """

    def __init__(self, parameters: dict):
        super().__init__(parameters=parameters)
        frequency_count = len(parameters[FREQUENCY])

        # Expand the parameters into one parameter set per acquisition step.
        self.iterable_params = utils.get_iterable_parameters(parameters)

        self.num_center_frequencies = frequency_count

    def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
        """Run the stepped acquisition; the scheduler calls this entrypoint.

        :param sensor: The sensor used for every acquisition step.
        :param schedule_entry: Passed to ``get_sigmf_builder`` for each
            recording.
        :param task_id: Stored in each measurement result and sent along
            with the completion signal.
        """
        self._sensor = sensor
        self.test_required_components()

        # Running total of samples requested by the previous steps; used as
        # the ``global_index`` of the next capture segment.
        sample_offset = 0

        for recording_id, step_params in enumerate(self.iterable_params, start=1):
            self.get_sigmf_builder(schedule_entry)
            self.configure(step_params)

            duration_ms = get_parameter(DURATION_MS, step_params)
            nskip = get_parameter(NUM_SKIP, step_params)
            cal_adjust = get_parameter(CAL_ADJUST, step_params)

            # The sample rate is read after configure() has run for this step.
            sample_rate = self.sensor.signal_analyzer.sample_rate
            num_samples = int(sample_rate * duration_ms * 1e-3)

            result = super().acquire_data(num_samples, nskip, cal_adjust)
            result.update(step_params)
            result["end_time"] = utils.get_datetime_str_now()
            result["task_id"] = task_id
            result["name"] = self.name
            result["classification"] = self.classification

            sigan_settings = self.get_sigan_settings(result)
            segment = CaptureSegment(
                sample_start=0,
                global_index=sample_offset,
                frequency=step_params[FREQUENCY],
                datetime=result["capture_time"],
                duration=duration_ms,
                overload=result["overload"],
                sigan_settings=sigan_settings,
            )

            # NOTE(review): the dict returned by ``sensor_calibration_data``
            # is modified in place below -- confirm that this is intended.
            calibration = self.sensor.sensor_calibration_data
            if calibration is not None:
                if "1db_compression_point" in calibration:
                    # Rename the key to the one passed on to Calibration.
                    compression_point = calibration.pop("1db_compression_point")
                    calibration["compression_point"] = compression_point
                if "reference" not in calibration:
                    # Keep a reference that the calibration data already has.
                    calibration["reference"] = result["reference"]
                segment.sensor_calibration = ntia_sensor.Calibration(**calibration)
            result["capture_segment"] = segment

            self.create_metadata(result, recording_id)
            measurement_action_completed.send(
                sender=self.__class__,
                task_id=task_id,
                data=result["data"],
                metadata=self.sigmf_builder.metadata,
            )
            sample_offset += num_samples

    @property
    def description(self):
        """Fill in the module-level docstring template and return it."""
        # Keys that the template reports separately, so they are left out of
        # the per-step list of parameters.
        reported_elsewhere = [FREQUENCY, DURATION_MS, "name"]
        step_template = (
            "The signal analyzer is tuned to {center_frequency:.2f} MHz and the following parameters are set:\n"
            "{parameters}"
            "Then, acquire samples for {duration_ms} ms.\n"
        )

        plan_sections = []
        for step_params in self.iterable_params:
            settings_text = "".join(
                f"{name} = {value}\n"
                for name, value in step_params.items()
                if name not in reported_elsewhere
            )
            plan_sections.append(
                step_template.format(
                    center_frequency=step_params[FREQUENCY] / 1e6,
                    parameters=settings_text,
                    duration_ms=step_params[DURATION_MS],
                )
            )
        acquisition_plan = "".join(plan_sections)

        # The minimum run time is the sum of the per-step durations.
        total_duration_ms = np.sum(
            [step_params[DURATION_MS] for step_params in self.iterable_params]
        )

        frequency_labels = [
            f"{step_params[FREQUENCY] / 1e6:.2f} MHz"
            for step_params in self.iterable_params
        ]

        # __doc__ refers to the module docstring at the top of the file
        return __doc__.format(
            name=self.name,
            num_center_frequencies=self.num_center_frequencies,
            center_frequencies=", ".join(frequency_labels),
            acquisition_plan=acquisition_plan,
            min_duration_ms=total_duration_ms,
        )