From 52ca8c5ebda4e812900dc47af4e1f7304f42b04a Mon Sep 17 00:00:00 2001 From: Christopher Teubert Date: Wed, 16 Mar 2022 12:34:04 -0700 Subject: [PATCH 1/2] KF and UKF --- .../state_estimators/kalman_filter.py | 53 ++++++++++--- .../unscented_kalman_filter.py | 67 ++++++++++++---- tests/test_state_estimators.py | 78 +++++++++++++++++++ 3 files changed, 174 insertions(+), 24 deletions(-) diff --git a/src/prog_algs/state_estimators/kalman_filter.py b/src/prog_algs/state_estimators/kalman_filter.py index 2a228fb4..a78bf123 100644 --- a/src/prog_algs/state_estimators/kalman_filter.py +++ b/src/prog_algs/state_estimators/kalman_filter.py @@ -45,16 +45,47 @@ def __init__(self, model, x0, measurement_eqn = None, **kwargs): self.x0 = x0 - if 'Q' not in self.parameters: - self.parameters['Q'] = np.diag([1.0e-3 for i in x0.keys()]) - if 'R' not in self.parameters: - # Size of what's being measured (not output) - # This is determined by running the measure function on the first state - self.parameters['R'] = np.diag([1.0e-3 for i in range(model.n_outputs)]) - - num_states = len(x0.keys()) + num_states = model.n_states num_inputs = model.n_inputs + 1 num_measurements = model.n_outputs + + # Process Noise (Q) + # Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced. + if 'Q' in self.parameters: + self.parameters['process_noise'] = self.parameters['Q'] + + if 'process_noise' not in self.parameters: + if 'process_noise' in model.parameters: + self.parameters['process_noise'] = np.diag([model.parameters['process_noise'][key] for key in x0.keys()]) + else: + self.parameters['process_noise'] = np.diag([1.0e-3 for _ in range(num_states)]) + else: + # Manage type + if isinstance(self.parameters['process_noise'], list): + self.parameters['process_noise'] = np.array(self.parameters['process_noise']) + + # Check size + if self.parameters['process_noise'].shape != (num_states, num_states): + raise Exception('process_noise must be a square matrix with size equal to the number of states') + + # Measurement Noise (R) + # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. + if 'R' in self.parameters: + self.parameters['measurement_noise'] = self.parameters['R'] + if 'measurement_noise' not in self.parameters: + if 'measurement_noise' in model.parameters: + self.parameters['measurement_noise'] = np.diag([model.parameters['measurement_noise'][key] for key in model.outputs]) + else: + self.parameters['measurement_noise'] = np.diag([1.0e-3 for _ in range(num_measurements)]) + else: + # Manage type + if isinstance(self.parameters['measurement_noise'], list): + self.parameters['measurement_noise'] = np.array(self.parameters['measurement_noise']) + + # Check size + if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements): + raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs') + F = deepcopy(model.A) B = deepcopy(model.B) if np.size(B) == 0: @@ -67,9 +98,9 @@ def __init__(self, model, x0, measurement_eqn = None, **kwargs): self.filter = kalman.KalmanFilter(num_states, num_measurements, num_inputs) self.filter.x = np.array([[x0[key]] for key in model.states]) - self.filter.P = self.parameters['Q'] / 10 - self.filter.Q = self.parameters['Q'] - self.filter.R = self.parameters['R'] + self.filter.P = self.parameters['process_noise'] / 10 + self.filter.Q = self.parameters['process_noise'] + self.filter.R = self.parameters['measurement_noise'] self.filter.F = F self.filter.B = B diff --git a/src/prog_algs/state_estimators/unscented_kalman_filter.py b/src/prog_algs/state_estimators/unscented_kalman_filter.py index c4925aaa..df243132 100644 --- a/src/prog_algs/state_estimators/unscented_kalman_filter.py +++ b/src/prog_algs/state_estimators/unscented_kalman_filter.py @@ -53,30 +53,71 @@ def measure(x): z = measurement_eqn(x) return array(list(z.values())).ravel() - if 'Q' not in self.parameters: - self.parameters['Q'] = diag([1.0e-3 for i in x0.keys()]) - if 'R' not in self.parameters: - # Size of what's being measured (not output) - # This is determined by running the measure function on the first state - self.parameters['R'] = diag([1.0e-3 for i in range(len(measure(x0.values())))]) + num_states = model.n_states + + # Process Noise (Q) + # Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced. + if 'Q' in self.parameters: + self.parameters['process_noise'] = self.parameters['Q'] + + if 'process_noise' not in self.parameters: + # Not provided + if 'process_noise' in model.parameters: + # If model has process noise, use it + self.parameters['process_noise'] = diag([model.parameters['process_noise'][key] for key in x0.keys()]) + else: + self.parameters['process_noise'] = diag([1.0e-3 for _ in range(num_states)]) + else: + # If process noise is provided + + # Manage type + if isinstance(self.parameters['process_noise'], list): + self.parameters['process_noise'] = array(self.parameters['process_noise']) + + # Check dims + if self.parameters['process_noise'].shape != (num_states, num_states): + raise Exception('process_noise must be a square matrix with size equal to the number of states') + + # Measurement Noise (R) + # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. + if 'R' in self.parameters: + self.parameters['measurement_noise'] = self.parameters['R'] + if 'measurement_noise' not in self.parameters: + # Not provided + if 'measurement_noise' in model.parameters and measurement_eqn is None: + # Pull from model noise (doesn't work when measurement equation is provided) + self.parameters['measurement_noise'] = diag([model.parameters['measurement_noise'][key] for key in model.outputs]) + else: + # Default to 1.0e-3 standard deviation on every output + model.parameters['measurement_noise'] = 0 + num_measurements = len(measure(x0.values())) + self.parameters['measurement_noise'] = diag([1.0e-3 for _ in range(num_measurements)]) + else: + # Manage type + if isinstance(self.parameters['measurement_noise'], list): + self.parameters['measurement_noise'] = array(self.parameters['measurement_noise']) + + # Check dims + num_measurements = len(measure(x0.values())) + if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements): + raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs') + + num_measurements = model.n_outputs def state_transition(x, dt): x = {key: value for (key, value) in zip(x0.keys(), x)} Q_err = model.parameters['process_noise'].copy() model.parameters['process_noise'] = dict.fromkeys(Q_err, 0) - z = model.output(x) - model.parameters['process_noise'] = Q_err x = model.next_state(x, self.__input, dt) + model.parameters['process_noise'] = Q_err return array(list(x.values())).ravel() - num_states = len(x0.keys()) - num_measurements = model.n_outputs points = kalman.MerweScaledSigmaPoints(num_states, alpha=self.parameters['alpha'], beta=self.parameters['beta'], kappa=self.parameters['kappa']) self.filter = kalman.UnscentedKalmanFilter(num_states, num_measurements, self.parameters['dt'], measure, state_transition, points) self.filter.x = array(list(x0.values())).ravel() - self.filter.P = self.parameters['Q'] / 10 - self.filter.Q = self.parameters['Q'] - self.filter.R = self.parameters['R'] + self.filter.P = self.parameters['process_noise'] / 10 + self.filter.Q = self.parameters['process_noise'] + self.filter.R = self.parameters['measurement_noise'] def estimate(self, t, u, z): """ diff --git a/tests/test_state_estimators.py b/tests/test_state_estimators.py index d849961e..0de4c51e 100644 --- a/tests/test_state_estimators.py +++ b/tests/test_state_estimators.py @@ -87,6 +87,45 @@ def test_UKF(self): from prog_algs.state_estimators import UnscentedKalmanFilter from prog_models.models import ThrownObject self.__test_state_est(UnscentedKalmanFilter, ThrownObject) + + # Check default + m = ThrownObject() + kf = UnscentedKalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all()) + + # Without model parameters + del m.parameters['process_noise'] + del m.parameters['measurement_noise'] + kf = UnscentedKalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all()) + + # Specifying Explicitly + Q = np.array([[0.1, 1e-3], [1e-3, 0.1]]) + R = np.array([[0.2]]) + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Using Lists + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2]] + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Wrong size - Q + Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]] + R = [[0.2]] + with self.assertRaises(Exception): + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + + # Wrong size - R + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2, 0.3]] + with self.assertRaises(Exception): + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) def __incorrect_input_tests(self, filter): class IncompleteModel: @@ -287,6 +326,45 @@ def event_state(self, x): # Missing states KalmanFilter(ThrownObject, {}) + # Check default + m = ThrownObject() + kf = KalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all()) + + # Without model parameters + del m.parameters['process_noise'] + del m.parameters['measurement_noise'] + kf = KalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all()) + + # Specifying Explicitly + Q = np.array([[0.1, 1e-3], [1e-3, 0.1]]) + R = np.array([[0.2]]) + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Using Lists + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2]] + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Wrong size - Q + Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]] + R = [[0.2]] + with self.assertRaises(Exception): + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + + # Wrong size - R + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2, 0.3]] + with self.assertRaises(Exception): + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + # This allows the module to be executed directly def run_tests(): # This ensures that the directory containing StateEstimatorTemplate is in the python search directory From aa244bbabbdaf47ea8532f973dd2f872f267ca08 Mon Sep 17 00:00:00 2001 From: Christopher Teubert Date: Wed, 16 Mar 2022 12:43:18 -0700 Subject: [PATCH 2/2] added comments --- examples/kalman_filter.py | 3 +++ src/prog_algs/state_estimators/kalman_filter.py | 10 +++++----- .../state_estimators/unscented_kalman_filter.py | 13 ++++++------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/examples/kalman_filter.py b/examples/kalman_filter.py index eab72e2d..fe77d089 100644 --- a/examples/kalman_filter.py +++ b/examples/kalman_filter.py @@ -104,6 +104,9 @@ def run_example(): # Define the initial state to be slightly off of actual x_guess = {'x': 1.75, 'v': 35} # Guess of initial state, actual is {'x': 1.83, 'v': 40} kf = KalmanFilter(m, x_guess) + # You can also change the process and measurement noise for the filter by specifying them in the constructor. + # This is used for the case where you want to apply different noise in the filter + # By default, they are set to the model noise. # Step 3: Run the Kalman Filter State Estimator # Here we're using simulated data from the thrown_object. In a real application you would be using sensor data from the system diff --git a/src/prog_algs/state_estimators/kalman_filter.py b/src/prog_algs/state_estimators/kalman_filter.py index a78bf123..164f5c9e 100644 --- a/src/prog_algs/state_estimators/kalman_filter.py +++ b/src/prog_algs/state_estimators/kalman_filter.py @@ -22,10 +22,10 @@ class KalmanFilter(state_estimator.StateEstimator): Starting time (s) dt : float time step (s) - Q : List[List[float]] - Process Noise Matrix - R : List[List[float]] - Measurement Noise Matrix + process_noise : 2darray[float] + Process Noise Matrix (n_states, n_states) - default model.process_noise + measurement_noise : 2darray[float] + Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise Note: The Kalman Filter does not support a custom measurement function @@ -72,7 +72,7 @@ def __init__(self, model, x0, measurement_eqn = None, **kwargs): # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. if 'R' in self.parameters: self.parameters['measurement_noise'] = self.parameters['R'] - if 'measurement_noise' not in self.parameters: + elif 'measurement_noise' not in self.parameters: if 'measurement_noise' in model.parameters: self.parameters['measurement_noise'] = np.diag([model.parameters['measurement_noise'][key] for key in model.outputs]) else: diff --git a/src/prog_algs/state_estimators/unscented_kalman_filter.py b/src/prog_algs/state_estimators/unscented_kalman_filter.py index df243132..16bfae0b 100644 --- a/src/prog_algs/state_estimators/unscented_kalman_filter.py +++ b/src/prog_algs/state_estimators/unscented_kalman_filter.py @@ -20,10 +20,10 @@ class UnscentedKalmanFilter(state_estimator.StateEstimator): Starting time (s) dt : float time step (s) - Q : List[List[float]] - Process Noise Matrix - R : List[List[float]] - Measurement Noise Matrix + process_noise : 2darray[float] + Process Noise Matrix (n_states, n_states) - default model.process_noise + measurement_noise : 2darray[float] + Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise """ default_parameters = { 'alpha': 1, @@ -59,8 +59,7 @@ def measure(x): # Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced. if 'Q' in self.parameters: self.parameters['process_noise'] = self.parameters['Q'] - - if 'process_noise' not in self.parameters: + elif 'process_noise' not in self.parameters: # Not provided if 'process_noise' in model.parameters: # If model has process noise, use it @@ -82,7 +81,7 @@ def measure(x): # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. if 'R' in self.parameters: self.parameters['measurement_noise'] = self.parameters['R'] - if 'measurement_noise' not in self.parameters: + elif 'measurement_noise' not in self.parameters: # Not provided if 'measurement_noise' in model.parameters and measurement_eqn is None: # Pull from model noise (doesn't work when measurement equation is provided)