diff --git a/bindings/util/estimator_kalman_emulator.py b/bindings/util/estimator_kalman_emulator.py new file mode 100644 index 0000000000..21ba5cf767 --- /dev/null +++ b/bindings/util/estimator_kalman_emulator.py @@ -0,0 +1,137 @@ +import math +import cffirmware + + + + + +class EstimatorKalmanEmulator: + """ + This class emulates the behavior of estimator_kalman.c and is used as a helper to enable testing of the kalman + core functionatlity. Estimator_kalman.c is tightly coupled to FreeRTOS (using + tasks for instance) and can not really be tested on this level, instead this class can be used to drive the + kalman core functionality. + + The class emulates the measurement queue, the main loop in the task and the various calls to kalman core. + + Methods are named in a similar way to the functions in estimator_kalman.c to make it easier to understand + how they are connected. + + """ + def __init__(self, anchor_positions) -> None: + self.anchor_positions = anchor_positions + self.accSubSampler = cffirmware.Axis3fSubSampler_t() + self.gyroSubSampler = cffirmware.Axis3fSubSampler_t() + self.coreData = cffirmware.kalmanCoreData_t() + self.outlierFilterState = cffirmware.OutlierFilterTdoaState_t() + + self.TDOA_ENGINE_MEASUREMENT_NOISE_STD = 0.30 + self.PREDICT_RATE = 100 + self.PREDICT_STEP_MS = 1000 / self.PREDICT_RATE + + self._is_initialized = False + + def run_one_1khz_iteration(self, sensor_samples) -> tuple[float, cffirmware.state_t]: + """ + Run one iteration of the estimation loop (runs at 1kHz) + + Args: + sensor_samples : a list of samples to be consumed. The samples with time stamps that are used in this + iteration will be popped from the list. + + Returns: + tuple[float, cffirmware.state_t]: A tuple containing the time stamp of this iteration and the + estimated state + """ + if not self._is_initialized: + first_sample = sensor_samples[0] + time_ms = int(first_sample[1]['timestamp']) + self._lazy_init(time_ms) + + # Simplification, assume always flying + quad_is_flying = True + + if self.now_ms > self.next_prediction_ms: + cffirmware.axis3fSubSamplerFinalize(self.accSubSampler) + cffirmware.axis3fSubSamplerFinalize(self.gyroSubSampler) + + cffirmware.kalmanCorePredict(self.coreData, self.accSubSampler.subSample, self.gyroSubSampler.subSample, + self.now_ms, quad_is_flying) + + self.next_prediction_ms += self.PREDICT_STEP_MS + + cffirmware.kalmanCoreAddProcessNoise(self.coreData, self.coreParams, self.now_ms) + + self._update_queued_measurements(self.now_ms, sensor_samples) + + cffirmware.kalmanCoreFinalize(self.coreData) + + # Main loop called at 1000 Hz in the firmware + self.now_ms += 1 + + external_state = cffirmware.state_t() + acc_latest = cffirmware.Axis3f() + cffirmware.kalmanCoreExternalizeState(self.coreData, external_state, acc_latest) + + return self.now_ms, external_state + + def _lazy_init(self, sample_time_ms): + self.now_ms = sample_time_ms + self.next_prediction_ms = self.now_ms + self.PREDICT_STEP_MS + + GRAVITY_MAGNITUDE = 9.81 + DEG_TO_RAD = math.pi / 180.0 + cffirmware.axis3fSubSamplerInit(self.accSubSampler, GRAVITY_MAGNITUDE) + cffirmware.axis3fSubSamplerInit(self.gyroSubSampler, DEG_TO_RAD) + + self.coreParams = cffirmware.kalmanCoreParams_t() + cffirmware.kalmanCoreDefaultParams(self.coreParams) + cffirmware.outlierFilterTdoaReset(self.outlierFilterState) + cffirmware.kalmanCoreInit(self.coreData, self.coreParams, self.now_ms) + + self._is_initialized = True + + def _update_queued_measurements(self, now_ms: int, sensor_samples): + done = False + + while len(sensor_samples): + sample = sensor_samples.pop(0) + time_ms = int(sample[1]['timestamp']) + if time_ms <= now_ms: + self._update_with_sample(sample, now_ms) + else: + return + + def _update_with_sample(self, sample, now_ms): + if sample[0] == 'estTDOA': + tdoa_data = sample[1] + tdoa = cffirmware.tdoaMeasurement_t() + + tdoa.anchorIdA = int(tdoa_data['idA']) + tdoa.anchorIdB = int(tdoa_data['idB']) + tdoa.anchorPositionA = self.anchor_positions[tdoa.anchorIdA] + tdoa.anchorPositionB = self.anchor_positions[tdoa.anchorIdB] + tdoa.distanceDiff = float(tdoa_data['distanceDiff']) + tdoa.stdDev = self.TDOA_ENGINE_MEASUREMENT_NOISE_STD + + cffirmware.kalmanCoreUpdateWithTdoa(self.coreData, tdoa, now_ms, self.outlierFilterState) + + if sample[0] == 'estAcceleration': + acc_data = sample[1] + + acc = cffirmware.Axis3f() + acc.x = float(acc_data['acc.x']) + acc.y = float(acc_data['acc.y']) + acc.z = float(acc_data['acc.z']) + + cffirmware.axis3fSubSamplerAccumulate(self.accSubSampler, acc) + + if sample[0] == 'estGyroscope': + gyro_data = sample[1] + + gyro = cffirmware.Axis3f() + gyro.x = float(gyro_data['gyro.x']) + gyro.y = float(gyro_data['gyro.y']) + gyro.z = float(gyro_data['gyro.z']) + + cffirmware.axis3fSubSamplerAccumulate(self.gyroSubSampler, gyro) diff --git a/bindings/util/loco_utils.py b/bindings/util/loco_utils.py new file mode 100644 index 0000000000..283348ebea --- /dev/null +++ b/bindings/util/loco_utils.py @@ -0,0 +1,25 @@ +import cffirmware +import yaml + +def read_loco_anchor_positions(file_name: str) -> dict[int, cffirmware.vec3_s]: + """ + Read anchor position data from a file exported from the client. + + Args: + file_name (str): The name of the file + + Returns: + dict[int, cffirmware.vec3_s]: A dictionary from anchor id to a 3D-vector + """ + result = {} + + with open(file_name, 'r') as file: + data = yaml.safe_load(file) + for id, vals in data.items(): + point = cffirmware.vec3_s() + point.x = vals['x'] + point.y = vals['y'] + point.z = vals['z'] + result[id] = point + + return result diff --git a/bindings/util/sd_card_file_runner.py b/bindings/util/sd_card_file_runner.py new file mode 100644 index 0000000000..9ded48ff14 --- /dev/null +++ b/bindings/util/sd_card_file_runner.py @@ -0,0 +1,51 @@ +import tools.usdlog.cfusdlog as cfusdlog +from bindings.util.estimator_kalman_emulator import EstimatorKalmanEmulator + +class SdCardFileRunner: + """ + This class is used to read data from a file and feed it to a kalman estimator emulator, usually for testing + purposes. + """ + + def __init__(self, file_name: str) -> None: + """ + Read sampled data from a file and feed to a kalman estimator emulator. + The supported file format is what is recorded using a uSD-card deck on the Crazyflie. + In the common use case the file should contain accelerometer and gyro data, as well as some other sensor + data that is to be fed to the kalman estimator. + + Args: + file_name (str): Name of the file + """ + self.samples = self._read_sensor_data_sorted(file_name) + + def run_estimator_loop(self, emulator: EstimatorKalmanEmulator): + result = [] + while len(self.samples): + now_ms, external_state = emulator.run_one_1khz_iteration(self.samples) + result.append((now_ms, (external_state.position.x, external_state.position.y, external_state.position.z))) + + return result + + def _read_sensor_data_sorted(self, file_name: str): + """Read sensor data from a file recorded using the uSD-card on a Crazyflie + + Args: + file_name: The name of the file with recorded data + + Returns: + A list of sensor samples, sorted in time order. The first field of each row identifies the sensor + that produced the sample + """ + log_data = cfusdlog.decode(file_name) + + samples = [] + for log_type, data in log_data.items(): + for i in range(len(data['timestamp'])): + sample_data = {} + for name, data_list in data.items(): + sample_data[name] = data_list[i] + samples.append((log_type, sample_data)) + + samples.sort(key=lambda x: x[1]['timestamp']) + return samples diff --git a/test_python/test_kalman_core.py b/test_python/test_kalman_core.py index 0af10343db..1fea4b2db7 100644 --- a/test_python/test_kalman_core.py +++ b/test_python/test_kalman_core.py @@ -1,199 +1,21 @@ #!/usr/bin/env python -import yaml -import cffirmware -import tools.usdlog.cfusdlog as cfusdlog -import math import numpy as np +from bindings.util.estimator_kalman_emulator import EstimatorKalmanEmulator +from bindings.util.sd_card_file_runner import SdCardFileRunner +from bindings.util.loco_utils import read_loco_anchor_positions def test_kalman_core_with_tdoa3(): # Fixture fixture_base = 'test_python/fixtures/kalman_core' anchor_positions = read_loco_anchor_positions(fixture_base + '/anchor_positions.yaml') - sensor_samples = read_sensor_data_sorted(fixture_base + '/log05') + runner = SdCardFileRunner(fixture_base + '/log05') emulator = EstimatorKalmanEmulator(anchor_positions) # Test - actual = emulator.run_estimator_loop(sensor_samples) + actual = runner.run_estimator_loop(emulator) # Assert # Verify that the final position is close-ish to (0, 0, 0) actual_final_pos = np.array(actual[-1][1]) assert np.linalg.norm(actual_final_pos - [0.0, 0.0, 0.0]) < 0.4 - - -## Helpers ########################### - -def read_loco_anchor_positions(file_name: str) -> dict[int, cffirmware.vec3_s]: - """ - Read anchor position data from a file exported from the client. - - Args: - file_name (str): The name of the file - - Returns: - dict[int, cffirmware.vec3_s]: A dictionary from anchor id to a 3D-vector - """ - result = {} - - with open(file_name, 'r') as file: - data = yaml.safe_load(file) - for id, vals in data.items(): - point = cffirmware.vec3_s() - point.x = vals['x'] - point.y = vals['y'] - point.z = vals['z'] - result[id] = point - - return result - - -def read_sensor_data_sorted(file_name: str): - """Read sensor data from a file recorded using the uSD-card on a Crazyflie - - Args: - file_name: The name of the file with recorded data - - Returns: - _type_: A list of sensor samples, sorted in time order. The first field of each row identifies the sensor - that produced the sample - """ - log_data = cfusdlog.decode(file_name) - - samples = [] - for log_type, data in log_data.items(): - for i in range(len(data['timestamp'])): - sample_data = {} - for name, data_list in data.items(): - sample_data[name] = data_list[i] - samples.append((log_type, sample_data)) - - samples.sort(key=lambda x: x[1]['timestamp']) - return samples - - -class EstimatorKalmanEmulator: - """ - This class emulates the behavior of estimator_kalman.c and is used as a helper to enable testing of the kalman - core functionatlity. Estimator_kalman.c is tightly coupled to FreeRTOS (using - tasks for instance) and can not really be tested on this level, instead this class can be used to drive the - kalman core functionlity. - """ - def __init__(self, anchor_positions) -> None: - self.anchor_positions = anchor_positions - self.accSubSampler = cffirmware.Axis3fSubSampler_t() - self.gyroSubSampler = cffirmware.Axis3fSubSampler_t() - self.coreData = cffirmware.kalmanCoreData_t() - self.outlierFilterState = cffirmware.OutlierFilterTdoaState_t() - - self.TDOA_ENGINE_MEASUREMENT_NOISE_STD = 0.30 - self.PREDICT_RATE = 100 - self.PREDICT_STEP_MS = 1000 / self.PREDICT_RATE - - def run_estimator_loop(self, sensor_samples): - """ - Emulation of the main loop in estimator_kalman.c - - Args: - sensor_samples: Ordered list of sensor samples - - Returns: - estimated positions: A list of tuples containing the time and estimated position - """ - start_index = 0 - os_time_first_ms = int(sensor_samples[start_index][1]['timestamp']) - now_ms = os_time_first_ms - next_prediction_ms = now_ms + self.PREDICT_STEP_MS - - GRAVITY_MAGNITUDE = 9.81 - DEG_TO_RAD = math.pi / 180.0 - cffirmware.axis3fSubSamplerInit(self.accSubSampler, GRAVITY_MAGNITUDE) - cffirmware.axis3fSubSamplerInit(self.gyroSubSampler, DEG_TO_RAD) - - coreParams = cffirmware.kalmanCoreParams_t() - cffirmware.kalmanCoreDefaultParams(coreParams) - cffirmware.outlierFilterTdoaReset(self.outlierFilterState) - cffirmware.kalmanCoreInit(self.coreData, coreParams, now_ms) - - # Simplification, assume always flying - quad_is_flying = True - - # Main loop - index = start_index - result = [] - while index < len(sensor_samples): - if now_ms > next_prediction_ms: - cffirmware.axis3fSubSamplerFinalize(self.accSubSampler) - cffirmware.axis3fSubSamplerFinalize(self.gyroSubSampler) - - cffirmware.kalmanCorePredict(self.coreData, self.accSubSampler.subSample, self.gyroSubSampler.subSample, - now_ms, quad_is_flying) - - next_prediction_ms += self.PREDICT_STEP_MS - - cffirmware.kalmanCoreAddProcessNoise(self.coreData, coreParams, now_ms) - - index = self._update_queued_measurements(now_ms, sensor_samples, index) - - cffirmware.kalmanCoreFinalize(self.coreData) - - external_state = cffirmware.state_t() - acc_latest = cffirmware.Axis3f() - cffirmware.kalmanCoreExternalizeState(self.coreData, external_state, acc_latest) - result.append((now_ms, (external_state.position.x, external_state.position.y, external_state.position.z))) - - # Main loop called at 1000 Hz in the firmware - now_ms += 1 - - return result - - def _update_queued_measurements(self, now_ms: int, sensor_samples, current_index: int) -> int: - index = current_index - done = False - - while not done: - sample = sensor_samples[index] - time_ms = int(sample[1]['timestamp']) - if time_ms <= now_ms: - self._update_with_sample(sample, now_ms) - - index += 1 - done = index >= len(sensor_samples) - else: - done = True - - return index - - def _update_with_sample(self, sample, now_ms): - if sample[0] == 'estTDOA': - tdoa_data = sample[1] - tdoa = cffirmware.tdoaMeasurement_t() - - tdoa.anchorIdA = int(tdoa_data['idA']) - tdoa.anchorIdB = int(tdoa_data['idB']) - tdoa.anchorPositionA = self.anchor_positions[tdoa.anchorIdA] - tdoa.anchorPositionB = self.anchor_positions[tdoa.anchorIdB] - tdoa.distanceDiff = float(tdoa_data['distanceDiff']) - tdoa.stdDev = self.TDOA_ENGINE_MEASUREMENT_NOISE_STD - - cffirmware.kalmanCoreUpdateWithTdoa(self.coreData, tdoa, now_ms, self.outlierFilterState) - - if sample[0] == 'estAcceleration': - acc_data = sample[1] - - acc = cffirmware.Axis3f() - acc.x = float(acc_data['acc.x']) - acc.y = float(acc_data['acc.y']) - acc.z = float(acc_data['acc.z']) - - cffirmware.axis3fSubSamplerAccumulate(self.accSubSampler, acc) - - if sample[0] == 'estGyroscope': - gyro_data = sample[1] - - gyro = cffirmware.Axis3f() - gyro.x = float(gyro_data['gyro.x']) - gyro.y = float(gyro_data['gyro.y']) - gyro.z = float(gyro_data['gyro.z']) - - cffirmware.axis3fSubSamplerAccumulate(self.gyroSubSampler, gyro)