Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions pyomo/contrib/parmest/parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,13 @@ def validate_experiment_outputs(output_vars):
), "Experiment outputs must share the same indices or data points"


def _count_total_experiments(experiment_list):
def _all_measurement_errors_known(model):
return hasattr(model, "measurement_error") and all(
model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs
)


def _count_total_experiments(experiment_list, require_uniform_output_grid=True):
"""
Counts the number of data points in the list of experiments

Expand All @@ -474,6 +480,10 @@ def _count_total_experiments(experiment_list):
experiment_list : list
List of Experiment class objects containing the Pyomo model
for the different experimental conditions
require_uniform_output_grid : bool, optional
If True, validate that all output families within an experiment share
the same number of indices and the same indices before counting data
points. Default is True.

Returns
-------
Expand All @@ -488,7 +498,8 @@ def _count_total_experiments(experiment_list):
output_vars = model.experiment_outputs

# check if the experiment outputs are defined correctly
validate_experiment_outputs(output_vars)
if require_uniform_output_grid:
validate_experiment_outputs(output_vars)

# store the indices of the experiment outputs
indices = []
Expand Down Expand Up @@ -1074,9 +1085,6 @@ def __init__(
assert isinstance(experiment_list, list)
self.exp_list = experiment_list

# get the number of experiments
self.number_exp = _count_total_experiments(self.exp_list)

# check if the experiment has a ``get_labeled_model`` function
model = _get_labeled_model(self.exp_list[0])

Expand All @@ -1095,6 +1103,20 @@ def __init__(
else:
self.obj_function = obj_function

allow_heterogeneous_output_grids = (
self.obj_function == ObjectiveType.SSE_weighted
and all(
_all_measurement_errors_known(_get_labeled_model(experiment))
for experiment in self.exp_list
)
)

# get the number of experiments
self.number_exp = _count_total_experiments(
self.exp_list,
require_uniform_output_grid=not allow_heterogeneous_output_grids,
)

if isinstance(regularization, str):
try:
self.regularization = RegularizationType(regularization)
Expand Down
46 changes: 46 additions & 0 deletions pyomo/contrib/parmest/tests/test_parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,42 @@ def get_labeled_model(self):
return self.model


class HeterogeneousOutputExperiment(Experiment):
def get_labeled_model(self):
m = pyo.ConcreteModel()

# Two measured output families intentionally use different time grids.
m.t1 = pyo.Set(initialize=[0.0, 1.0, 2.0])
m.t2 = pyo.Set(initialize=[0.0, 2.0])

m.y1 = pyo.Var(m.t1, initialize=1.0)
m.y2 = pyo.Var(m.t2, initialize=2.0)

m.theta = pyo.Var(initialize=1.0)
m.theta.fix(1.0)

m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL)
m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL)

y1_data = {0.0: 1.1, 1.0: 1.2, 2.0: 1.3}
y2_data = {0.0: 2.1, 2.0: 2.3}

for t in m.t1:
m.experiment_outputs[m.y1[t]] = y1_data[t]
m.measurement_error[m.y1[t]] = 0.1

for t in m.t2:
m.experiment_outputs[m.y2[t]] = y2_data[t]
m.measurement_error[m.y2[t]] = 0.2

# All measurement errors are supplied, so SSE_weighted should accept
# these heterogeneous output grids.
m.unknown_parameters[m.theta] = pyo.ComponentUID(m.theta)

return m


def _build_estimator(data, include_second_output=False):
exp_list = [
LinearThetaExperiment(x=x, y=y, include_second_output=include_second_output)
Expand Down Expand Up @@ -2442,6 +2478,16 @@ def test_count_total_experiments_rejects_time_not_in_first_index(self):
):
parmest._count_total_experiments(exp_list)

def test_estimator_accepts_sse_weighted_heterogeneous_outputs(self):
# Regression test for heterogeneous experiment outputs with complete
# measurement_error data under the SSE_weighted objective.
pest = parmest.Estimator(
[HeterogeneousOutputExperiment()], obj_function="SSE_weighted"
)

# Constructor success is the behavior under test.
self.assertIsInstance(pest, parmest.Estimator)


###########################
# tests for deprecated UI #
Expand Down