From 8f506de1d7ad87b510a68f7e9fa6b4c459a5e08b Mon Sep 17 00:00:00 2001 From: Alex Dowling Date: Mon, 22 Jun 2026 11:55:54 -0400 Subject: [PATCH] Bug fix: SSE_weighted should support heterogenous measurement errors --- pyomo/contrib/parmest/parmest.py | 32 +++++++++++--- pyomo/contrib/parmest/tests/test_parmest.py | 46 +++++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/pyomo/contrib/parmest/parmest.py b/pyomo/contrib/parmest/parmest.py index 6e6d040b792..af79bb8a19e 100644 --- a/pyomo/contrib/parmest/parmest.py +++ b/pyomo/contrib/parmest/parmest.py @@ -447,7 +447,13 @@ def validate_experiment_outputs(output_vars): ), "Experiment outputs must share the same indices or data points" -def _count_total_experiments(experiment_list): +def _all_measurement_errors_known(model): + return hasattr(model, "measurement_error") and all( + model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs + ) + + +def _count_total_experiments(experiment_list, require_uniform_output_grid=True): """ Counts the number of data points in the list of experiments @@ -474,6 +480,10 @@ def _count_total_experiments(experiment_list): experiment_list : list List of Experiment class objects containing the Pyomo model for the different experimental conditions + require_uniform_output_grid : bool, optional + If True, validate that all output families within an experiment share + the same number of indices and the same indices before counting data + points. Default is True. Returns ------- @@ -488,7 +498,8 @@ def _count_total_experiments(experiment_list): output_vars = model.experiment_outputs # check if the experiment outputs are defined correctly - validate_experiment_outputs(output_vars) + if require_uniform_output_grid: + validate_experiment_outputs(output_vars) # store the indices of the experiment outputs indices = [] @@ -1074,9 +1085,6 @@ def __init__( assert isinstance(experiment_list, list) self.exp_list = experiment_list - # get the number of experiments - self.number_exp = _count_total_experiments(self.exp_list) - # check if the experiment has a ``get_labeled_model`` function model = _get_labeled_model(self.exp_list[0]) @@ -1095,6 +1103,20 @@ def __init__( else: self.obj_function = obj_function + allow_heterogeneous_output_grids = ( + self.obj_function == ObjectiveType.SSE_weighted + and all( + _all_measurement_errors_known(_get_labeled_model(experiment)) + for experiment in self.exp_list + ) + ) + + # get the number of experiments + self.number_exp = _count_total_experiments( + self.exp_list, + require_uniform_output_grid=not allow_heterogeneous_output_grids, + ) + if isinstance(regularization, str): try: self.regularization = RegularizationType(regularization) diff --git a/pyomo/contrib/parmest/tests/test_parmest.py b/pyomo/contrib/parmest/tests/test_parmest.py index f44331a86ac..63cd1694ead 100644 --- a/pyomo/contrib/parmest/tests/test_parmest.py +++ b/pyomo/contrib/parmest/tests/test_parmest.py @@ -2173,6 +2173,42 @@ def get_labeled_model(self): return self.model +class HeterogeneousOutputExperiment(Experiment): + def get_labeled_model(self): + m = pyo.ConcreteModel() + + # Two measured output families intentionally use different time grids. + m.t1 = pyo.Set(initialize=[0.0, 1.0, 2.0]) + m.t2 = pyo.Set(initialize=[0.0, 2.0]) + + m.y1 = pyo.Var(m.t1, initialize=1.0) + m.y2 = pyo.Var(m.t2, initialize=2.0) + + m.theta = pyo.Var(initialize=1.0) + m.theta.fix(1.0) + + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + + y1_data = {0.0: 1.1, 1.0: 1.2, 2.0: 1.3} + y2_data = {0.0: 2.1, 2.0: 2.3} + + for t in m.t1: + m.experiment_outputs[m.y1[t]] = y1_data[t] + m.measurement_error[m.y1[t]] = 0.1 + + for t in m.t2: + m.experiment_outputs[m.y2[t]] = y2_data[t] + m.measurement_error[m.y2[t]] = 0.2 + + # All measurement errors are supplied, so SSE_weighted should accept + # these heterogeneous output grids. + m.unknown_parameters[m.theta] = pyo.ComponentUID(m.theta) + + return m + + def _build_estimator(data, include_second_output=False): exp_list = [ LinearThetaExperiment(x=x, y=y, include_second_output=include_second_output) @@ -2442,6 +2478,16 @@ def test_count_total_experiments_rejects_time_not_in_first_index(self): ): parmest._count_total_experiments(exp_list) + def test_estimator_accepts_sse_weighted_heterogeneous_outputs(self): + # Regression test for heterogeneous experiment outputs with complete + # measurement_error data under the SSE_weighted objective. + pest = parmest.Estimator( + [HeterogeneousOutputExperiment()], obj_function="SSE_weighted" + ) + + # Constructor success is the behavior under test. + self.assertIsInstance(pest, parmest.Estimator) + ########################### # tests for deprecated UI #