diff --git a/examples/kalman_filter.py b/examples/kalman_filter.py index eab72e2d..fe77d089 100644 --- a/examples/kalman_filter.py +++ b/examples/kalman_filter.py @@ -104,6 +104,9 @@ def run_example(): # Define the initial state to be slightly off of actual x_guess = {'x': 1.75, 'v': 35} # Guess of initial state, actual is {'x': 1.83, 'v': 40} kf = KalmanFilter(m, x_guess) + # You can also change the process and measurement noise for the filter by specifying them in the constructor. + # This is used for the case where you want to apply different noise in the filter + # By default, they are set to the model noise. # Step 3: Run the Kalman Filter State Estimator # Here we're using simulated data from the thrown_object. In a real application you would be using sensor data from the system diff --git a/src/prog_algs/state_estimators/kalman_filter.py b/src/prog_algs/state_estimators/kalman_filter.py index ce2a2969..d0289668 100644 --- a/src/prog_algs/state_estimators/kalman_filter.py +++ b/src/prog_algs/state_estimators/kalman_filter.py @@ -24,10 +24,10 @@ class KalmanFilter(state_estimator.StateEstimator): Starting time (s) dt : float time step (s) - Q : List[List[float]] - Process Noise Matrix - R : List[List[float]] - Measurement Noise Matrix + process_noise : 2darray[float] + Process Noise Matrix (n_states, n_states) - default model.process_noise + measurement_noise : 2darray[float] + Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise Note: The Kalman Filter does not support a custom measurement function @@ -46,7 +46,7 @@ def __init__(self, model, x0, measurement_eqn : Callable = None, **kwargs): super().__init__(model, x0, None, **kwargs) self.x0 = x0 - + if 'Q' not in self.parameters: self.parameters['Q'] = np.diag([1.0e-3 for i in x0.keys()]) if 'R' not in self.parameters: @@ -54,9 +54,47 @@ def __init__(self, model, x0, measurement_eqn : Callable = None, **kwargs): # This is determined by running the measure function on the first state self.parameters['R'] = np.diag([1.0e-3 for i in range(model.n_outputs)]) - num_states = len(x0.keys()) + num_states = model.n_states num_inputs = model.n_inputs + 1 num_measurements = model.n_outputs + + # Process Noise (Q) + # Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced. + if 'Q' in self.parameters: + self.parameters['process_noise'] = self.parameters['Q'] + + if 'process_noise' not in self.parameters: + if 'process_noise' in model.parameters: + self.parameters['process_noise'] = np.diag([model.parameters['process_noise'][key] for key in x0.keys()]) + else: + self.parameters['process_noise'] = np.diag([1.0e-3 for _ in range(num_states)]) + else: + # Manage type + if isinstance(self.parameters['process_noise'], list): + self.parameters['process_noise'] = np.array(self.parameters['process_noise']) + + # Check size + if self.parameters['process_noise'].shape != (num_states, num_states): + raise Exception('process_noise must be a square matrix with size equal to the number of states') + + # Measurement Noise (R) + # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. + if 'R' in self.parameters: + self.parameters['measurement_noise'] = self.parameters['R'] + elif 'measurement_noise' not in self.parameters: + if 'measurement_noise' in model.parameters: + self.parameters['measurement_noise'] = np.diag([model.parameters['measurement_noise'][key] for key in model.outputs]) + else: + self.parameters['measurement_noise'] = np.diag([1.0e-3 for _ in range(num_measurements)]) + else: + # Manage type + if isinstance(self.parameters['measurement_noise'], list): + self.parameters['measurement_noise'] = np.array(self.parameters['measurement_noise']) + + # Check size + if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements): + raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs') + F = deepcopy(model.A) B = deepcopy(model.B) if np.size(B) == 0: diff --git a/src/prog_algs/state_estimators/unscented_kalman_filter.py b/src/prog_algs/state_estimators/unscented_kalman_filter.py index 59fdfe72..ea57fe1f 100644 --- a/src/prog_algs/state_estimators/unscented_kalman_filter.py +++ b/src/prog_algs/state_estimators/unscented_kalman_filter.py @@ -22,10 +22,10 @@ class UnscentedKalmanFilter(state_estimator.StateEstimator): Starting time (s) dt : float time step (s) - Q : List[List[float]] - Process Noise Matrix - R : List[List[float]] - Measurement Noise Matrix + process_noise : 2darray[float] + Process Noise Matrix (n_states, n_states) - default model.process_noise + measurement_noise : 2darray[float] + Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise """ default_parameters = { 'alpha': 1, @@ -56,20 +56,64 @@ def measure(x): z = measurement_eqn(x) return array(list(z.values())).ravel() - if 'Q' not in self.parameters: - self.parameters['Q'] = diag([1.0e-3 for i in x0.keys()]) + num_states = model.n_states + + # Process Noise (Q) + # Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced. + if 'Q' in self.parameters: + self.parameters['process_noise'] = self.parameters['Q'] + elif 'process_noise' not in self.parameters: + # Not provided + if 'process_noise' in model.parameters: + # If model has process noise, use it + self.parameters['process_noise'] = diag([model.parameters['process_noise'][key] for key in x0.keys()]) + else: + self.parameters['process_noise'] = diag([1.0e-3 for _ in range(num_states)]) + else: + # If process noise is provided + + # Manage type + if isinstance(self.parameters['process_noise'], list): + self.parameters['process_noise'] = array(self.parameters['process_noise']) + + # Check dims + if self.parameters['process_noise'].shape != (num_states, num_states): + raise Exception('process_noise must be a square matrix with size equal to the number of states') + + # Measurement Noise (R) + # Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced. + if 'R' in self.parameters: + self.parameters['measurement_noise'] = self.parameters['R'] + elif 'measurement_noise' not in self.parameters: + # Not provided + if 'measurement_noise' in model.parameters and measurement_eqn is None: + # Pull from model noise (doesn't work when measurement equation is provided) + self.parameters['measurement_noise'] = diag([model.parameters['measurement_noise'][key] for key in model.outputs]) + else: + # Default to 1.0e-3 standard deviation on every output + model.parameters['measurement_noise'] = 0 + num_measurements = len(measure(x0.values())) + self.parameters['measurement_noise'] = diag([1.0e-3 for _ in range(num_measurements)]) + else: + # Manage type + if isinstance(self.parameters['measurement_noise'], list): + self.parameters['measurement_noise'] = array(self.parameters['measurement_noise']) + + # Check dims + num_measurements = len(measure(x0.values())) + if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements): + raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs') + + num_measurements = model.n_outputs def state_transition(x, dt): x = {key: value for (key, value) in zip(x0.keys(), x)} Q_err = model.parameters['process_noise'].copy() model.parameters['process_noise'] = dict.fromkeys(Q_err, 0) - z = model.output(x) - model.parameters['process_noise'] = Q_err x = model.next_state(x, self.__input, dt) + model.parameters['process_noise'] = Q_err return array(list(x.values())).ravel() - num_states = len(x0.keys()) - num_measurements = model.n_outputs points = kalman.MerweScaledSigmaPoints(num_states, alpha=self.parameters['alpha'], beta=self.parameters['beta'], kappa=self.parameters['kappa']) self.filter = kalman.UnscentedKalmanFilter(num_states, num_measurements, self.parameters['dt'], measure, state_transition, points) diff --git a/tests/test_state_estimators.py b/tests/test_state_estimators.py index 8319a1f5..a82ec11b 100644 --- a/tests/test_state_estimators.py +++ b/tests/test_state_estimators.py @@ -86,6 +86,47 @@ def __test_state_est(self, filt, m): def test_UKF(self): from prog_models.models import ThrownObject + self.__test_state_est(UnscentedKalmanFilter, ThrownObject) + + # Check default + m = ThrownObject() + kf = UnscentedKalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all()) + + # Without model parameters + del m.parameters['process_noise'] + del m.parameters['measurement_noise'] + kf = UnscentedKalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all()) + + # Specifying Explicitly + Q = np.array([[0.1, 1e-3], [1e-3, 0.1]]) + R = np.array([[0.2]]) + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Using Lists + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2]] + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Wrong size - Q + Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]] + R = [[0.2]] + with self.assertRaises(Exception): + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + + # Wrong size - R + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2, 0.3]] + with self.assertRaises(Exception): + kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R) + from prog_algs.state_estimators import UnscentedKalmanFilter m = ThrownObject(process_noise=5e-2, measurement_noise=5e-2) @@ -416,6 +457,45 @@ def event_state(self, x): # Missing states KalmanFilter(ThrownObject, {}) + # Check default + m = ThrownObject() + kf = KalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all()) + + # Without model parameters + del m.parameters['process_noise'] + del m.parameters['measurement_noise'] + kf = KalmanFilter(m, m.initialize()) + self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all()) + self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all()) + + # Specifying Explicitly + Q = np.array([[0.1, 1e-3], [1e-3, 0.1]]) + R = np.array([[0.2]]) + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Using Lists + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2]] + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + self.assertTrue((kf.filter.Q == Q).all()) + self.assertTrue((kf.filter.R == R).all()) + + # Wrong size - Q + Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]] + R = [[0.2]] + with self.assertRaises(Exception): + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + + # Wrong size - R + Q = [[0.1, 1e-3], [1e-3, 0.1]] + R = [[0.2, 0.3]] + with self.assertRaises(Exception): + kf = KalmanFilter(m, m.initialize(), Q = Q, R = R) + # This allows the module to be executed directly def run_tests(): # This ensures that the directory containing StateEstimatorTemplate is in the python search directory