diff --git a/pyomo/contrib/doe/doe.py b/pyomo/contrib/doe/doe.py index 6be87d58868..db9328044f6 100644 --- a/pyomo/contrib/doe/doe.py +++ b/pyomo/contrib/doe/doe.py @@ -64,6 +64,7 @@ class ObjectiveLib(Enum): determinant = "determinant" trace = "trace" + pseudo_trace = "pseudo_trace" minimum_eigenvalue = "minimum_eigenvalue" condition_number = "condition_number" zero = "zero" @@ -95,6 +96,7 @@ def __init__( grey_box_tee=False, get_labeled_model_args=None, logger_level=logging.WARNING, + improve_cholesky_roundoff_error=False, _Cholesky_option=True, _only_compute_fim_lower=True, ): @@ -125,8 +127,9 @@ def __init__( objective_option: String representation of the objective option. Current available options are: ``determinant`` (for determinant, or D-optimality), - ``trace`` (for trace, or A-optimality), ``minimum_eigenvalue``, (for - E-optimality), or ``condition_number`` (for ME-optimality) + ``trace`` (for trace of covariance matrix, or A-optimality), + ``pseudo_trace`` (for trace of Fisher Information Matrix(FIM), or pseudo A-optimality), + ``minimum_eigenvalue``, (for E-optimality), or ``condition_number`` (for ME-optimality) Note: E-optimality and ME-optimality are only supported when using the grey box objective (i.e., ``grey_box_solver`` is True) default: ``determinant`` @@ -165,6 +168,11 @@ def __init__( get_labeled_model_args: Additional arguments for the ``get_labeled_model`` function on the Experiment object. + improve_cholesky_roundoff_error: + Boolean value of whether or not to improve round-off error. If True, it will + apply M[i,i] >= L[i,j]^2. Where, M is the FIM and L is the lower triangular matrix + from Cholesky factorization. If the round-off error is not significant, this + option can be turned off to improve performance by skipping this constraint. _Cholesky_option: Boolean value of whether or not to use the cholesky factorization to compute the determinant for the D-optimality criteria. This parameter @@ -248,6 +256,9 @@ def __init__( self.Cholesky_option = _Cholesky_option self.only_compute_fim_lower = _only_compute_fim_lower + # To improve round-off error in Cholesky-based objectives + self.improve_cholesky_roundoff_error = improve_cholesky_roundoff_error + # model attribute to avoid rebuilding models self.model = pyo.ConcreteModel() # Build empty model @@ -421,6 +432,21 @@ def run_doe(self, model=None, results_file=None): for j, d in enumerate(model.parameter_names): model.L[c, d].value = L_vals_sq[i, j] + # Initialize the inverse of L if it exists + if hasattr(model, "L_inv"): + L_inv_vals = np.linalg.inv(L_vals_sq) + + for i, c in enumerate(model.parameter_names): + for j, d in enumerate(model.parameter_names): + if i >= j: + model.L_inv[c, d].value = L_inv_vals[i, j] + else: + model.L_inv[c, d].value = 0.0 + # Initialize the cov_trace if it exists + if hasattr(model, "cov_trace"): + initial_cov_trace = np.sum(L_inv_vals**2) + model.cov_trace.value = initial_cov_trace + if hasattr(model, "determinant"): model.determinant.value = np.linalg.det(np.array(self.get_FIM())) @@ -486,7 +512,8 @@ def run_doe(self, model=None, results_file=None): # Saving some stats on the FIM for convenience self.results["Objective expression"] = str(self.objective_option).split(".")[-1] - self.results["log10 A-opt"] = np.log10(np.trace(fim_local)) + self.results["log10 A-opt"] = np.log10(np.trace(np.linalg.inv(fim_local))) + self.results["log10 pseudo A-opt"] = np.log10(np.trace(fim_local)) self.results["log10 D-opt"] = np.log10(np.linalg.det(fim_local)) self.results["log10 E-opt"] = np.log10(min(np.linalg.eig(fim_local)[0])) self.results["FIM Condition Number"] = np.linalg.cond(fim_local) @@ -937,40 +964,77 @@ def initialize_jac(m, i, j): # Initialize the FIM if self.fim_initial is not None: dict_fim_initialize = { - (bu, un): self.fim_initial[i][j] + (bu, un): self.fim_initial[i, j] for i, bu in enumerate(model.parameter_names) for j, un in enumerate(model.parameter_names) } + if self.objective_option == ObjectiveLib.trace: + fim_initial_inv = np.linalg.pinv(self.fim_initial) + dict_fim_inv_initialize = { + (bu, un): fim_initial_inv[i, j] + for i, bu in enumerate(model.parameter_names) + for j, un in enumerate(model.parameter_names) + } def initialize_fim(m, j, d): return dict_fim_initialize[(j, d)] + def initialize_fim_inv(m, j, d): + return dict_fim_inv_initialize[(j, d)] + if self.fim_initial is not None: model.fim = pyo.Var( model.parameter_names, model.parameter_names, initialize=initialize_fim ) + if self.objective_option == ObjectiveLib.trace: + model.fim_inv = pyo.Var( + model.parameter_names, + model.parameter_names, + initialize=initialize_fim_inv, + ) else: model.fim = pyo.Var( model.parameter_names, model.parameter_names, initialize=identity_matrix ) + if self.objective_option == ObjectiveLib.trace: + model.fim_inv = pyo.Var( + model.parameter_names, + model.parameter_names, + initialize=identity_matrix, + ) # To-Do: Look into this functionality..... # if cholesky, define L elements as variables - if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant: + if self.Cholesky_option and self.objective_option in ( + ObjectiveLib.determinant, + ObjectiveLib.trace, + ): model.L = pyo.Var( model.parameter_names, model.parameter_names, initialize=identity_matrix ) + # If trace objective, also need L inverse + if self.objective_option == ObjectiveLib.trace: + model.L_inv = pyo.Var( + model.parameter_names, + model.parameter_names, + initialize=identity_matrix, + ) + # loop over parameter name for i, c in enumerate(model.parameter_names): for j, d in enumerate(model.parameter_names): # fix the 0 half of L matrix to be 0.0 if i < j: model.L[c, d].fix(0.0) + if self.objective_option == ObjectiveLib.trace: + model.L_inv[c, d].fix(0.0) # Give LB to the diagonal entries if self.L_diagonal_lower_bound: if c == d: model.L[c, d].setlb(self.L_diagonal_lower_bound) + if self.objective_option == ObjectiveLib.trace: + model.L_inv[c, d].setlb(self.L_diagonal_lower_bound) # jacobian rule def jacobian_rule(m, n, p): @@ -1083,6 +1147,8 @@ def fim_rule(m, p, q): for ind_q, q in enumerate(model.parameter_names): if ind_p < ind_q: model.fim[p, q].fix(0.0) + if self.objective_option == ObjectiveLib.trace: + model.fim_inv[p, q].fix(0.0) # Create scenario block structure def _generate_scenario_blocks(self, model=None): @@ -1286,6 +1352,7 @@ def create_objective_function(self, model=None): if self.objective_option not in [ ObjectiveLib.determinant, ObjectiveLib.trace, + ObjectiveLib.pseudo_trace, ObjectiveLib.zero, ]: raise DeveloperError( @@ -1315,13 +1382,15 @@ def create_objective_function(self, model=None): ) ### Initialize the Cholesky decomposition matrix - if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant: + if self.Cholesky_option and self.objective_option in ( + ObjectiveLib.determinant, + ObjectiveLib.trace, + ): # Calculate the eigenvalues of the FIM matrix eig = np.linalg.eigvals(fim) # If the smallest eigenvalue is (practically) negative, # add a diagonal matrix to make it positive definite - small_number = 1e-10 if min(eig) < small_number: fim = fim + np.eye(len(model.parameter_names)) * ( small_number - min(eig) @@ -1335,6 +1404,13 @@ def create_objective_function(self, model=None): for j, d in enumerate(model.parameter_names): model.L[c, d].value = L[i, j] + # Compute L inverse for trace objective and initialize + if self.objective_option == ObjectiveLib.trace: + L_inv = np.linalg.inv(L) + for i, c in enumerate(model.parameter_names): + for j, d in enumerate(model.parameter_names): + model.L_inv[c, d].value = L_inv[i, j] + def cholesky_imp(b, c, d): """ Calculate Cholesky L matrix using algebraic constraints @@ -1353,12 +1429,86 @@ def cholesky_imp(b, c, d): # This is the empty half of L above the diagonal return pyo.Constraint.Skip + # If trace objective, need L inverse constraints + if self.Cholesky_option and self.objective_option == ObjectiveLib.trace: + + def cholesky_inv_imp(b, c, d): + """ + Calculate Cholesky L inverse matrix using algebraic constraints + """ + # If the row is greater than or equal to the column, we are in the + # lower triangle region of the L_inv matrix. + # This region is where our equations are well-defined. + m = b.model() + if list(m.parameter_names).index(c) >= list(m.parameter_names).index(d): + return m.fim_inv[c, d] == sum( + m.L_inv[m.parameter_names.at(k + 1), c] + * m.L_inv[m.parameter_names.at(k + 1), d] + for k in range( + list(m.parameter_names).index(c), len(m.parameter_names) + ) + ) + else: + # This is the empty half of L_inv above the diagonal + return pyo.Constraint.Skip + + # If trace objective, need L * L^-1 = Identity matrix constraints + def cholesky_LLinv_imp(b, c, d): + """ + Calculate Cholesky L * L inverse matrix using algebraic constraints + """ + # If the row is greater than or equal to the column, we are in the + # lower triangle region of the L and L_inv matrices. + # This region is where our equations are well-defined. + m = b.model() + param_list = list(m.parameter_names) + idx_c = param_list.index(c) + idx_d = param_list.index(d) + # Do not need to calculate upper triangle entries + if idx_c < idx_d: + return pyo.Constraint.Skip + + target_value = 1 if idx_c == idx_d else 0 + return ( + sum( + m.L[c, m.parameter_names.at(k + 1)] + * m.L_inv[m.parameter_names.at(k + 1), d] + for k in range(len(m.parameter_names)) + ) + == target_value + ) + + # To improve round off error in Cholesky decomposition + if self.improve_cholesky_roundoff_error: + + def cholesky_fim_diag(b, c, d): + """ + M[c,c] >= L[c,d]^2 to improve round off error + """ + m = b.model() + return m.fim[c, c] >= m.L[c, d] ** 2 + + def cholesky_fim_inv_diag(b, c, d): + """ + M_inv[c,c] >= L_inv[c,d]^2 to improve round off error + """ + m = b.model() + return m.fim_inv[c, c] >= m.L_inv[c, d] ** 2 + + def cov_trace_calc(b): + """ + Calculate trace of covariance matrix (inverse of FIM). + Can scale each element with 1000 for performance + """ + m = b.model() + return m.cov_trace == sum(m.fim_inv[j, j] for j in m.parameter_names) + def trace_calc(b): """ Calculate FIM elements. Can scale each element with 1000 for performance """ m = b.model() - return m.trace == sum(m.fim[j, j] for j in m.parameter_names) + return m.fim_trace == sum(m.fim[j, j] for j in m.parameter_names) def determinant_general(b): r"""Calculate determinant. Can be applied to FIM of any size. @@ -1415,12 +1565,45 @@ def determinant_general(b): ) elif self.objective_option == ObjectiveLib.trace: + if not self.Cholesky_option: + raise ValueError( + "objective_option='trace' currently only implemented with ``_Cholesky option=True``." + ) + # if Cholesky and trace, calculating + # the OBJ with trace + model.cov_trace = pyo.Var( + initialize=np.trace(np.linalg.inv(fim)), bounds=(small_number, None) + ) + model.obj_cons.cholesky_cons = pyo.Constraint( + model.parameter_names, model.parameter_names, rule=cholesky_imp + ) + model.obj_cons.cholesky_inv_cons = pyo.Constraint( + model.parameter_names, model.parameter_names, rule=cholesky_inv_imp + ) + model.obj_cons.cholesky_LLinv_cons = pyo.Constraint( + model.parameter_names, model.parameter_names, rule=cholesky_LLinv_imp + ) + if self.improve_cholesky_roundoff_error: + model.obj_cons.cholesky_fim_diag_cons = pyo.Constraint( + model.parameter_names, model.parameter_names, rule=cholesky_fim_diag + ) + model.obj_cons.cholesky_fim_inv_diag_cons = pyo.Constraint( + model.parameter_names, + model.parameter_names, + rule=cholesky_fim_inv_diag, + ) + model.obj_cons.cov_trace_rule = pyo.Constraint(rule=cov_trace_calc) + model.objective = pyo.Objective(expr=model.cov_trace, sense=pyo.minimize) + + elif self.objective_option == ObjectiveLib.pseudo_trace: # if not determinant or Cholesky, calculating # the OBJ with trace - model.trace = pyo.Var(initialize=np.trace(fim), bounds=(small_number, None)) + model.fim_trace = pyo.Var( + initialize=np.trace(fim), bounds=(small_number, None) + ) model.obj_cons.trace_rule = pyo.Constraint(rule=trace_calc) model.objective = pyo.Objective( - expr=pyo.log10(model.trace), sense=pyo.maximize + expr=pyo.log10(model.fim_trace), sense=pyo.maximize ) # TODO: Add warning (should be unreachable) if the user calls @@ -1663,12 +1846,14 @@ def compute_FIM_full_factorial( - keys of model's experiment_inputs - "log10 D-opt": list of log10(D-optimality) - "log10 A-opt": list of log10(A-optimality) + - "log10 pseudo A-opt": list of log10(trace(FIM)) - "log10 E-opt": list of log10(E-optimality) - "log10 ME-opt": list of log10(ME-optimality) - "eigval_min": list of minimum eigenvalues - "eigval_max": list of maximum eigenvalues - "det_FIM": list of determinants - - "trace_FIM": list of traces + - "trace_cov": list of traces of covariance matrix + - "trace_FIM": list of traces of FIM - "solve_time": list of solve times Raises @@ -1718,11 +1903,13 @@ def compute_FIM_full_factorial( { "log10 D-opt": [], "log10 A-opt": [], + "log10 pseudo A-opt": [], "log10 E-opt": [], "log10 ME-opt": [], "eigval_min": [], "eigval_max": [], "det_FIM": [], + "trace_cov": [], "trace_FIM": [], "solve_time": [], } @@ -1785,9 +1972,18 @@ def compute_FIM_full_factorial( FIM = self._computed_FIM - (det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt) = ( - compute_FIM_metrics(FIM) - ) + ( + det_FIM, + trace_cov, + trace_FIM, + E_vals, + E_vecs, + D_opt, + A_opt, + pseudo_A_opt, + E_opt, + ME_opt, + ) = compute_FIM_metrics(FIM) # Append the values for each of the experiment inputs for k, v in model.experiment_inputs.items(): @@ -1795,11 +1991,13 @@ def compute_FIM_full_factorial( fim_factorial_results["log10 D-opt"].append(D_opt) fim_factorial_results["log10 A-opt"].append(A_opt) + fim_factorial_results["log10 pseudo A-opt"].append(pseudo_A_opt) fim_factorial_results["log10 E-opt"].append(E_opt) fim_factorial_results["log10 ME-opt"].append(ME_opt) fim_factorial_results["eigval_min"].append(E_vals.min()) fim_factorial_results["eigval_max"].append(E_vals.max()) fim_factorial_results["det_FIM"].append(det_FIM) + fim_factorial_results["trace_cov"].append(trace_cov) fim_factorial_results["trace_FIM"].append(trace_FIM) fim_factorial_results["solve_time"].append(time_set[-1]) @@ -1999,7 +2197,7 @@ def _curve1D( Returns -------- - 4 Figures of 1D sensitivity curves for each criterion + 5 Figures of 1D sensitivity curves for each criterion """ if figure_file_name is not None: show_fig = False @@ -2012,6 +2210,9 @@ def _curve1D( # decide if the results are log scaled if log_scale: y_range_A = np.log10(self.figure_result_data["log10 A-opt"].values.tolist()) + y_range_pseudo_A = np.log10( + self.figure_result_data["log10 pseudo A-opt"].values.tolist() + ) y_range_D = np.log10(self.figure_result_data["log10 D-opt"].values.tolist()) y_range_E = np.log10(self.figure_result_data["log10 E-opt"].values.tolist()) y_range_ME = np.log10( @@ -2019,6 +2220,9 @@ def _curve1D( ) else: y_range_A = self.figure_result_data["log10 A-opt"].values.tolist() + y_range_pseudo_A = self.figure_result_data[ + "log10 pseudo A-opt" + ].values.tolist() y_range_D = self.figure_result_data["log10 D-opt"].values.tolist() y_range_E = self.figure_result_data["log10 E-opt"].values.tolist() y_range_ME = self.figure_result_data["log10 ME-opt"].values.tolist() @@ -2043,6 +2247,28 @@ def _curve1D( plt.pyplot.savefig( pathlib.Path(figure_file_name + "_A_opt.png"), format="png", dpi=450 ) + # Draw pseudo A-optimality + fig = plt.pyplot.figure() + plt.pyplot.rc("axes", titlesize=font_axes) + plt.pyplot.rc("axes", labelsize=font_axes) + plt.pyplot.rc("xtick", labelsize=font_tick) + plt.pyplot.rc("ytick", labelsize=font_tick) + ax = fig.add_subplot(111) + params = {"mathtext.default": "regular"} + # plt.rcParams.update(params) + ax.plot(x_range, y_range_pseudo_A) + ax.scatter(x_range, y_range_pseudo_A) + ax.set_ylabel("$log_{10}$ Trace") + ax.set_xlabel(xlabel_text) + plt.pyplot.title(title_text + ": pseudo A-optimality") + if show_fig: + plt.pyplot.show() + else: + plt.pyplot.savefig( + pathlib.Path(figure_file_name + "_pseudo_A_opt.png"), + format="png", + dpi=450, + ) # Draw D-optimality fig = plt.pyplot.figure() @@ -2153,17 +2379,20 @@ def _heatmap( # extract the design criteria values A_range = self.figure_result_data["log10 A-opt"].values.tolist() + pseudo_A_range = self.figure_result_data["log10 pseudo A-opt"].values.tolist() D_range = self.figure_result_data["log10 D-opt"].values.tolist() E_range = self.figure_result_data["log10 E-opt"].values.tolist() ME_range = self.figure_result_data["log10 ME-opt"].values.tolist() # reshape the design criteria values for heatmaps cri_a = np.asarray(A_range).reshape(len(x_range), len(y_range)) + cri_pseudo_a = np.asarray(pseudo_A_range).reshape(len(x_range), len(y_range)) cri_d = np.asarray(D_range).reshape(len(x_range), len(y_range)) cri_e = np.asarray(E_range).reshape(len(x_range), len(y_range)) cri_e_cond = np.asarray(ME_range).reshape(len(x_range), len(y_range)) self.cri_a = cri_a + self.cri_pseudo_a = cri_pseudo_a self.cri_d = cri_d self.cri_e = cri_e self.cri_e_cond = cri_e_cond @@ -2171,11 +2400,13 @@ def _heatmap( # decide if log scaled if log_scale: hes_a = np.log10(self.cri_a) + hes_pseudo_a = np.log10(self.cri_pseudo_a) hes_e = np.log10(self.cri_e) hes_d = np.log10(self.cri_d) hes_e2 = np.log10(self.cri_e_cond) else: hes_a = self.cri_a + hes_pseudo_a = self.cri_pseudo_a hes_e = self.cri_e hes_d = self.cri_d hes_e2 = self.cri_e_cond @@ -2201,7 +2432,7 @@ def _heatmap( ax.set_xlabel(xlabel_text) im = ax.imshow(hes_a.T, cmap=plt.pyplot.cm.hot_r) ba = plt.pyplot.colorbar(im) - ba.set_label("log10(trace(FIM))") + ba.set_label("log10(trace(cov))") plt.pyplot.title(title_text + ": A-optimality") if show_fig: plt.pyplot.show() @@ -2210,6 +2441,34 @@ def _heatmap( pathlib.Path(figure_file_name + "_A_opt.png"), format="png", dpi=450 ) + # pseudo A-optimality + fig = plt.pyplot.figure() + plt.pyplot.rc("axes", titlesize=font_axes) + plt.pyplot.rc("axes", labelsize=font_axes) + plt.pyplot.rc("xtick", labelsize=font_tick) + plt.pyplot.rc("ytick", labelsize=font_tick) + ax = fig.add_subplot(111) + params = {"mathtext.default": "regular"} + plt.pyplot.rcParams.update(params) + ax.set_yticks(range(len(yLabel))) + ax.set_yticklabels(yLabel) + ax.set_ylabel(ylabel_text) + ax.set_xticks(range(len(xLabel))) + ax.set_xticklabels(xLabel) + ax.set_xlabel(xlabel_text) + im = ax.imshow(hes_pseudo_a.T, cmap=plt.pyplot.cm.hot_r) + ba = plt.pyplot.colorbar(im) + ba.set_label("log10(trace(FIM))") + plt.pyplot.title(title_text + ": pseudo A-optimality") + if show_fig: + plt.pyplot.show() + else: + plt.pyplot.savefig( + pathlib.Path(figure_file_name + "_pseudo_A_opt.png"), + format="png", + dpi=450, + ) + # D-optimality fig = plt.pyplot.figure() plt.pyplot.rc("axes", titlesize=font_axes) diff --git a/pyomo/contrib/doe/examples/reactor_example.py b/pyomo/contrib/doe/examples/reactor_example.py index 0038ed114de..c098cdfa403 100644 --- a/pyomo/contrib/doe/examples/reactor_example.py +++ b/pyomo/contrib/doe/examples/reactor_example.py @@ -25,6 +25,8 @@ def run_reactor_doe( compute_FIM_full_factorial=True, plot_factorial_results=True, save_plots=True, + figure_file_name="example_reactor_compute_FIM", + log_scale=False, run_optimal_doe=True, ): """ @@ -40,6 +42,8 @@ def run_reactor_doe( whether to plot the results of the full factorial design, by default True save_plots : bool, optional whether to save draw_factorial_figure plots, by default True + figure_file_name : str, optional + file name to save the factorial figure, by default "example_reactor_compute_FIM" run_optimal_doe : bool, optional whether to run the optimal DoE, by default True """ @@ -101,7 +105,7 @@ def run_reactor_doe( ) if plot_factorial_results: if save_plots: - figure_file_name = "example_reactor_compute_FIM" + figure_file_name = figure_file_name else: figure_file_name = None @@ -122,7 +126,7 @@ def run_reactor_doe( xlabel_text="Concentration of A (M)", ylabel_text="Initial Temperature (K)", figure_file_name=figure_file_name, - log_scale=False, + log_scale=log_scale, ) ########################### diff --git a/pyomo/contrib/doe/tests/test_doe_build.py b/pyomo/contrib/doe/tests/test_doe_build.py index b157c1c9d4f..25324862969 100644 --- a/pyomo/contrib/doe/tests/test_doe_build.py +++ b/pyomo/contrib/doe/tests/test_doe_build.py @@ -31,6 +31,12 @@ ReactorExperiment as FullReactorExperiment, ) +from pyomo.contrib.parmest.examples.rooney_biegler.rooney_biegler import ( + RooneyBieglerExperiment, +) +from pyomo.contrib.parmest.examples.rooney_biegler.doe_example import ( + run_rooney_biegler_doe, +) import pyomo.environ as pyo from pyomo.opt import SolverFactory @@ -135,7 +141,7 @@ def get_standard_args(experiment, fd_method, obj_used): class TestReactorExampleBuild(unittest.TestCase): def test_reactor_fd_central_check_fd_eqns(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -174,7 +180,7 @@ def test_reactor_fd_central_check_fd_eqns(self): def test_reactor_fd_backward_check_fd_eqns(self): fd_method = "backward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -212,7 +218,7 @@ def test_reactor_fd_backward_check_fd_eqns(self): def test_reactor_fd_forward_check_fd_eqns(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -250,7 +256,7 @@ def test_reactor_fd_forward_check_fd_eqns(self): def test_reactor_fd_central_design_fixing(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -283,7 +289,7 @@ def test_reactor_fd_central_design_fixing(self): def test_reactor_fd_backward_design_fixing(self): fd_method = "backward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -316,7 +322,7 @@ def test_reactor_fd_backward_design_fixing(self): def test_reactor_fd_forward_design_fixing(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -376,7 +382,7 @@ def test_reactor_check_user_initialization(self): def test_update_FIM(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -398,7 +404,7 @@ def test_update_FIM(self): def test_get_experiment_inputs_without_blocks(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -417,7 +423,7 @@ def test_get_experiment_inputs_without_blocks(self): def test_get_experiment_outputs_without_blocks(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -436,7 +442,7 @@ def test_get_experiment_outputs_without_blocks(self): def test_get_measurement_error_without_blocks(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -455,7 +461,7 @@ def test_get_measurement_error_without_blocks(self): def test_get_unknown_parameters_without_blocks(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -475,7 +481,7 @@ def test_get_unknown_parameters_without_blocks(self): def test_generate_blocks_without_model(self): fd_method = "forward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -502,5 +508,119 @@ def test_reactor_update_suffix_items(self): self.assertAlmostEqual(v, new_vals[i], places=6) +@unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") +@unittest.skipIf(not numpy_available, "Numpy is not available") +class TestDoEObjectiveOptions(unittest.TestCase): + def test_trace_constraints(self): + fd_method = "central" + obj_used = "trace" + + experiment = run_rooney_biegler_doe()["experiment"] + + DoE_args = get_standard_args(experiment, fd_method, obj_used) + doe_obj = DesignOfExperiments(**DoE_args) + + doe_obj.create_doe_model() + doe_obj.create_objective_function() + + model = doe_obj.model + + # Basic objects exist + self.assertTrue(hasattr(model, "objective")) + self.assertTrue(hasattr(model, "cov_trace")) + self.assertTrue(hasattr(model, "fim_inv")) + self.assertTrue(hasattr(model, "L")) + self.assertTrue(hasattr(model, "L_inv")) + + # Constraints live under obj_cons block + self.assertTrue(hasattr(model, "obj_cons")) + + # Cholesky-related constraints + self.assertTrue(hasattr(model.obj_cons, "cholesky_cons")) + self.assertTrue(hasattr(model.obj_cons, "cholesky_inv_cons")) + self.assertTrue(hasattr(model.obj_cons, "cholesky_LLinv_cons")) + self.assertTrue(hasattr(model.obj_cons, "cov_trace_rule")) + + self.assertIsInstance(model.obj_cons.cholesky_cons, pyo.Constraint) + self.assertIsInstance(model.obj_cons.cholesky_inv_cons, pyo.Constraint) + self.assertIsInstance(model.obj_cons.cholesky_LLinv_cons, pyo.Constraint) + self.assertIsInstance(model.obj_cons.cov_trace_rule, pyo.Constraint) + + # Indexing logic: lower triangle only + params = list(model.parameter_names) + + for i, c in enumerate(params): + for j, d in enumerate(params): + # cholesky_inv_imp: only defined for i >= j + if i >= j: + self.assertIn( + (c, d), + model.obj_cons.cholesky_inv_cons, + msg=f"Missing cholesky_inv_cons[{c},{d}]", + ) + else: + self.assertNotIn( + (c, d), + model.obj_cons.cholesky_inv_cons, + msg=f"Unexpected cholesky_inv_cons[{c},{d}]", + ) + + # cholesky_LLinv_imp: only defined for i >= j + if i >= j: + self.assertIn( + (c, d), + model.obj_cons.cholesky_LLinv_cons, + msg=f"Missing cholesky_LLinv_cons[{c},{d}]", + ) + else: + self.assertNotIn( + (c, d), + model.obj_cons.cholesky_LLinv_cons, + msg=f"Unexpected cholesky_LLinv_cons[{c},{d}]", + ) + + # Objective definition sanity + self.assertIsInstance(model.objective, pyo.Objective) + self.assertEqual(model.objective.sense, pyo.minimize) + self.assertIs(model.objective.expr, model.cov_trace) + + def test_trace_initialization_consistency(self): + fd_method = "central" + obj_used = "trace" + + experiment = run_rooney_biegler_doe()["experiment"] + + DoE_args = get_standard_args(experiment, fd_method, obj_used) + doe_obj = DesignOfExperiments(**DoE_args) + + doe_obj.create_doe_model() + doe_obj.create_objective_function() + + model = doe_obj.model + params = list(model.parameter_names) + + # Check cov_trace initialization + cov_trace_from_fim_inv = sum(pyo.value(model.fim_inv[j, j]) for j in params) + + self.assertAlmostEqual( + pyo.value(model.cov_trace), cov_trace_from_fim_inv, places=4 + ) + + # Check L * L_inv ≈ I (lower triangle) + for i, c in enumerate(params): + for j, d in enumerate(params): + if i < j: + continue # upper triangle skipped by design + + val = sum( + pyo.value(model.L[c, params[k]]) + * pyo.value(model.L_inv[params[k], d]) + for k in range(len(params)) + ) + + expected = 1.0 if i == j else 0.0 + self.assertAlmostEqual(val, expected, places=4) + + if __name__ == "__main__": unittest.main() diff --git a/pyomo/contrib/doe/tests/test_doe_errors.py b/pyomo/contrib/doe/tests/test_doe_errors.py index a55e4a7166b..93cd352a376 100644 --- a/pyomo/contrib/doe/tests/test_doe_errors.py +++ b/pyomo/contrib/doe/tests/test_doe_errors.py @@ -33,6 +33,9 @@ FullReactorExperiment, ) +from pyomo.contrib.parmest.examples.rooney_biegler.doe_example import ( + run_rooney_biegler_doe, +) from pyomo.opt import SolverFactory ipopt_available = SolverFactory("ipopt").available() @@ -65,7 +68,8 @@ def get_standard_args(experiment, fd_method, obj_used, flag): solver.options["max_iter"] = 3000 args['solver'] = solver args['tee'] = False - args['get_labeled_model_args'] = {"flag": flag} + if flag is not None: + args['get_labeled_model_args'] = {"flag": flag} args['_Cholesky_option'] = True args['_only_compute_fim_lower'] = True return args @@ -76,7 +80,7 @@ def get_standard_args(experiment, fd_method, obj_used, flag): class TestReactorExampleErrors(unittest.TestCase): def test_experiment_none_error(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 1 # Value for faulty model build mode - 1: No exp outputs with self.assertRaisesRegex( @@ -89,7 +93,7 @@ def test_experiment_none_error(self): def test_reactor_check_no_get_labeled_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 1 # Value for faulty model build mode - 1: No exp outputs experiment = BadExperiment() @@ -104,7 +108,7 @@ def test_reactor_check_no_get_labeled_model(self): def test_reactor_check_no_experiment_outputs(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 1 # Value for faulty model build mode - 1: No exp outputs experiment = FullReactorExperiment(data_ex, 10, 3) @@ -121,7 +125,7 @@ def test_reactor_check_no_experiment_outputs(self): def test_reactor_check_no_measurement_error(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 2 # Value for faulty model build mode - 2: No meas error experiment = FullReactorExperiment(data_ex, 10, 3) @@ -138,7 +142,7 @@ def test_reactor_check_no_measurement_error(self): def test_reactor_check_no_experiment_inputs(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 3 # Value for faulty model build mode - 3: No exp inputs/design vars experiment = FullReactorExperiment(data_ex, 10, 3) @@ -155,7 +159,7 @@ def test_reactor_check_no_experiment_inputs(self): def test_reactor_check_no_unknown_parameters(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 4 # Value for faulty model build mode - 4: No unknown params experiment = FullReactorExperiment(data_ex, 10, 3) @@ -172,7 +176,7 @@ def test_reactor_check_no_unknown_parameters(self): def test_reactor_check_bad_prior_size(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model prior_FIM = np.ones((5, 5)) @@ -197,7 +201,7 @@ def test_reactor_check_bad_prior_negative_eigenvalue(self): from pyomo.contrib.doe.doe import _SMALL_TOLERANCE_DEFINITENESS fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model prior_FIM = -np.ones((4, 4)) @@ -222,7 +226,7 @@ def test_reactor_check_bad_prior_not_symmetric(self): from pyomo.contrib.doe.utils import _SMALL_TOLERANCE_SYMMETRY fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model prior_FIM = np.zeros((4, 4)) @@ -245,7 +249,7 @@ def test_reactor_check_bad_prior_not_symmetric(self): def test_reactor_check_bad_jacobian_init_size(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model jac_init = np.ones((5, 5)) @@ -267,7 +271,7 @@ def test_reactor_check_bad_jacobian_init_size(self): def test_reactor_check_unbuilt_update_FIM(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model FIM_update = np.ones((4, 4)) @@ -287,7 +291,7 @@ def test_reactor_check_unbuilt_update_FIM(self): def test_reactor_check_none_update_FIM(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: full model FIM_update = None @@ -306,7 +310,7 @@ def test_reactor_check_none_update_FIM(self): def test_reactor_check_results_file_name(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = 0 # Value for faulty model build mode - 0: Full model experiment = FullReactorExperiment(data_ex, 10, 3) @@ -322,7 +326,7 @@ def test_reactor_check_results_file_name(self): def test_reactor_check_measurement_and_output_length_match(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 5 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -509,7 +513,7 @@ def test_reactor_figure_drawing_bad_sens_names(self): def test_reactor_check_get_FIM_without_FIM(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -529,7 +533,7 @@ def test_reactor_check_get_FIM_without_FIM(self): def test_reactor_check_get_sens_mat_without_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -550,7 +554,7 @@ def test_reactor_check_get_sens_mat_without_model(self): def test_reactor_check_get_exp_inputs_without_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -571,7 +575,7 @@ def test_reactor_check_get_exp_inputs_without_model(self): def test_reactor_check_get_exp_outputs_without_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -592,7 +596,7 @@ def test_reactor_check_get_exp_outputs_without_model(self): def test_reactor_check_get_unknown_params_without_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -613,7 +617,7 @@ def test_reactor_check_get_unknown_params_without_model(self): def test_reactor_check_get_meas_error_without_model(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -634,7 +638,7 @@ def test_reactor_check_get_meas_error_without_model(self): def test_multiple_exp_not_implemented_seq(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -652,7 +656,7 @@ def test_multiple_exp_not_implemented_seq(self): def test_multiple_exp_not_implemented_sim(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -670,7 +674,7 @@ def test_multiple_exp_not_implemented_sim(self): def test_update_unknown_parameter_values_not_implemented_seq(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -689,7 +693,7 @@ def test_update_unknown_parameter_values_not_implemented_seq(self): @unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") def test_bad_FD_generate_scens(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -716,7 +720,7 @@ def test_bad_FD_generate_scens(self): @unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") def test_bad_FD_seq_compute_FIM(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -742,7 +746,7 @@ def test_bad_FD_seq_compute_FIM(self): def test_bad_objective(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -768,7 +772,7 @@ def test_bad_objective(self): def test_no_model_for_objective(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -789,7 +793,7 @@ def test_no_model_for_objective(self): @unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") def test_bad_compute_FIM_option(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" flag_val = ( 0 # Value for faulty model build mode - 5: Mismatch error and output length ) @@ -808,6 +812,25 @@ def test_bad_compute_FIM_option(self): ): doe_obj.compute_FIM(method="Bad Method") + @unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") + def test_invalid_trace_without_cholesky(self): + fd_method = "central" + obj_used = "trace" + + experiment = run_rooney_biegler_doe()["experiment"] + + DoE_args = get_standard_args(experiment, fd_method, obj_used, flag=None) + DoE_args['_Cholesky_option'] = False + + doe_obj = DesignOfExperiments(**DoE_args) + doe_obj.create_doe_model() + + with self.assertRaisesRegex( + ValueError, + "objective_option='trace' currently only implemented with ``_Cholesky option=True``.", + ): + doe_obj.create_objective_function() + if __name__ == "__main__": unittest.main() diff --git a/pyomo/contrib/doe/tests/test_doe_solve.py b/pyomo/contrib/doe/tests/test_doe_solve.py index 7b4be303d04..93f4f85eb48 100644 --- a/pyomo/contrib/doe/tests/test_doe_solve.py +++ b/pyomo/contrib/doe/tests/test_doe_solve.py @@ -10,7 +10,8 @@ # ___________________________________________________________________________ import json import logging -import os.path +import os, os.path +from glob import glob from pyomo.common.dependencies import ( numpy as np, @@ -18,8 +19,13 @@ pandas as pd, pandas_available, scipy_available, + attempt_import, ) +# Safely try to import matplotlib +matplotlib, matplotlib_available = attempt_import('matplotlib') +if matplotlib_available: + matplotlib.use("Agg") # Use non-interactive backend (for CI testing purposes from pyomo.common.fileutils import this_file_dir import pyomo.common.unittest as unittest @@ -38,6 +44,9 @@ FullReactorExperimentBad, ) from pyomo.contrib.doe.utils import rescale_FIM +from pyomo.contrib.parmest.examples.rooney_biegler.doe_example import ( + run_rooney_biegler_doe, +) import pyomo.environ as pyo @@ -140,7 +149,7 @@ def get_standard_args(experiment, fd_method, obj_used): class TestReactorExampleSolving(unittest.TestCase): def test_reactor_fd_central_solve(self): fd_method = "central" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -185,7 +194,7 @@ def test_reactor_fd_forward_solve(self): def test_reactor_fd_backward_solve(self): fd_method = "backward" - obj_used = "trace" + obj_used = "pseudo_trace" experiment = FullReactorExperiment(data_ex, 10, 3) @@ -544,7 +553,7 @@ def test_doe_full_factorial(self): ff_results["log10 D-opt"], log10_D_opt_expected, abstol=1e-4 ) self.assertStructuredAlmostEqual( - ff_results["log10 A-opt"], log10_A_opt_expected, abstol=1e-4 + ff_results["log10 pseudo A-opt"], log10_A_opt_expected, abstol=1e-4 ) self.assertStructuredAlmostEqual( ff_results["log10 E-opt"], log10_E_opt_expected, abstol=1e-4 @@ -561,6 +570,155 @@ def test_doe_full_factorial(self): self.assertStructuredAlmostEqual(ff_results["det_FIM"], det_FIM_expected) self.assertStructuredAlmostEqual(ff_results["trace_FIM"], trace_FIM_expected) + def test_doe_A_optimality(self): + A_opt_value_expected = -2.2364242059539663 + A_opt_design_value_expected = 9.999955457176451 + + A_opt_value = run_rooney_biegler_doe(optimize_experiment_A=True)[ + "optimization" + ]["A"]["value"] + A_opt_design_value = run_rooney_biegler_doe(optimize_experiment_A=True)[ + "optimization" + ]["A"]["design"][0] + + self.assertAlmostEqual(A_opt_value, A_opt_value_expected, places=2) + print("A optimal design value:", A_opt_design_value) + self.assertAlmostEqual( + A_opt_design_value, A_opt_design_value_expected, places=2 + ) + + +@unittest.skipIf(not ipopt_available, "The 'ipopt' solver is not available") +@unittest.skipIf(not numpy_available, "Numpy is not available") +@unittest.skipIf(not pandas_available, "pandas is not available") +@unittest.skipIf(not matplotlib_available, "Matplotlib is not available") +class TestDoEFactorialFigure(unittest.TestCase): + def test_doe_1D_plotting_function(self): + # For 1D plotting we will use the Rooney-Biegler example in parmest/examples + plt = matplotlib.pyplot + """ + Test that the plotting function executes without error and + creates a matplotlib figure. We do NOT test visual correctness. + """ + + # File prefix for saved plots + # Define prefixes for the two runs + prefix_linear = "rooney_linear" + prefix_log = "rooney_log" + + # Clean up any existing plot files from test runs + def cleanup_files(): + files_to_remove = glob("rooney_*.png") + for f in files_to_remove: + try: + os.remove(f) + except OSError: + pass + plt.close('all') + + self.addCleanup(cleanup_files) + + fd_method = "central" + obj_used = "trace" + + experiment = run_rooney_biegler_doe()["experiment"] + + DoE_args = get_standard_args(experiment, fd_method, obj_used) + doe_obj = DesignOfExperiments(**DoE_args) + + doe_obj.compute_FIM_full_factorial(design_ranges={'hour': [0, 10, 5]}) + + # Call the plotting function for linear scale + doe_obj.draw_factorial_figure( + sensitivity_design_variables=['hour'], + fixed_design_variables={}, + log_scale=False, + figure_file_name=prefix_linear, + ) + + # Call the plotting function for log scale + doe_obj.draw_factorial_figure( + sensitivity_design_variables=['hour'], + fixed_design_variables={}, + log_scale=True, + figure_file_name=prefix_log, + ) + + # Verify that the linear scale plots were also created + # Check that we found exactly 5 files (A, D, E, ME, pseudo_A) + expected_plot_linear = glob(f"{prefix_linear}*.png") + self.assertEqual( + len(expected_plot_linear), + 5, + f"Expected 5 plot files, but found {len(expected_plot_linear)}. Files found: {expected_plot_linear}", + ) + + # Verify that the log scale plots were also created + expected_plot_log = glob(f"{prefix_log}*.png") + self.assertEqual( + len(expected_plot_log), + 5, + f"Expected 5 plot files, but found {len(expected_plot_log)}. Files found: {expected_plot_log}", + ) + + def test_doe_2D_plotting_function(self): + # For 2D plotting we will use the Rooney-Biegler example in doe/examples + plt = matplotlib.pyplot + + # File prefix for saved plots + prefix_linear = "reactor_linear" + prefix_log = "reactor_log" + + # Clean up any existing plot files from test runs + def cleanup_files(): + files_to_remove = glob("reactor_*.png") + for f in files_to_remove: + try: + os.remove(f) + except OSError: + pass + plt.close('all') + + self.addCleanup(cleanup_files) + + # Run the reactor example + run_reactor_doe( + n_points_for_design=3, + compute_FIM_full_factorial=True, + plot_factorial_results=True, + save_plots=True, + figure_file_name=prefix_linear, + log_scale=False, + run_optimal_doe=False, + ) + + # Verify that the linear scale plots were also created + # Check that we found exactly 5 files (A, D, E, ME, pseudo_A) + expected_plot_linear = glob(f"{prefix_linear}*.png") + self.assertTrue( + len(expected_plot_linear) == 5, + f"Expected 5 plot files, but found {len(expected_plot_linear)}. Files found: {expected_plot_linear}", + ) + + # Run the reactor example with log scale + run_reactor_doe( + n_points_for_design=3, + compute_FIM_full_factorial=True, + plot_factorial_results=True, + save_plots=True, + figure_file_name=prefix_log, + log_scale=True, + run_optimal_doe=False, + ) + + # Verify that the log scale plots were also created + # Check that we found exactly 5 files (A, D, E, ME, pseudo_A) + expected_plot_log = glob(f"{prefix_log}*.png") + self.assertTrue( + len(expected_plot_log) == 5, + f"Expected 5 plot files, but found {len(expected_plot_log)}. Files found: {expected_plot_log}", + ) + if __name__ == "__main__": unittest.main() diff --git a/pyomo/contrib/doe/tests/test_utils.py b/pyomo/contrib/doe/tests/test_utils.py index 6f77a4ca75b..08e5d272abb 100644 --- a/pyomo/contrib/doe/tests/test_utils.py +++ b/pyomo/contrib/doe/tests/test_utils.py @@ -64,38 +64,58 @@ def test_check_FIM_non_symmetric(self): """Test the compute_FIM_metrics() from utils.py.""" + ### Helper methods for test cases + # Sample FIM for testing + def _get_test_fim(self): + """Helper method returning test FIM matrix.""" + return np.array([[10, 2], [2, 3]]) + + # Expected results for the test FIM + def _get_expected_fim_results(self): + """Helper method returning expected FIM computation results.""" + return { + 'det': 26.000000000000004, + 'D_opt': 1.414973347970818, + 'trace_cov': 0.5, + 'A_opt': -0.3010299956639812, + 'trace_FIM': 13, + 'pseudo_A_opt': 1.1139433523068367, + 'E_vals': np.array([10.53112887, 2.46887113]), + 'E_vecs': np.array([[0.96649965, -0.25666794], [0.25666794, 0.96649965]]), + 'E_opt': 0.3924984205140895, + 'ME_opt': 0.6299765069426388, + } + def test_compute_FIM_metrics(self): # Create a sample Fisher Information Matrix (FIM) - FIM = np.array([[10, 2], [2, 3]]) - - (det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt) = ( - compute_FIM_metrics(FIM) - ) - + FIM = self._get_test_fim() # expected results - det_expected = 26.000000000000004 - D_opt_expected = 1.414973347970818 - - trace_expected = 13 - A_opt_expected = 1.1139433523068367 - - E_vals_expected = np.array([10.53112887, 2.46887113]) - E_vecs_expected = np.array( - [[0.96649965, -0.25666794], [0.25666794, 0.96649965]] - ) - E_opt_expected = 0.3924984205140895 - - ME_opt_expected = 0.6299765069426388 + expected = self._get_expected_fim_results() + + ( + det_FIM, + trace_cov, + trace_FIM, + E_vals, + E_vecs, + D_opt, + A_opt, + pseudo_A_opt, + E_opt, + ME_opt, + ) = compute_FIM_metrics(FIM) # Test results - self.assertAlmostEqual(det_FIM, det_expected) - self.assertAlmostEqual(trace_FIM, trace_expected) - self.assertTrue(np.allclose(E_vals, E_vals_expected)) - self.assertTrue(np.allclose(E_vecs, E_vecs_expected)) - self.assertAlmostEqual(D_opt, D_opt_expected) - self.assertAlmostEqual(A_opt, A_opt_expected) - self.assertAlmostEqual(E_opt, E_opt_expected) - self.assertAlmostEqual(ME_opt, ME_opt_expected) + self.assertAlmostEqual(det_FIM, expected['det']) + self.assertAlmostEqual(trace_cov, expected['trace_cov']) + self.assertAlmostEqual(trace_FIM, expected['trace_FIM']) + self.assertTrue(np.allclose(E_vals, expected['E_vals'])) + self.assertTrue(np.allclose(E_vecs, expected['E_vecs'])) + self.assertAlmostEqual(D_opt, expected['D_opt']) + self.assertAlmostEqual(A_opt, expected['A_opt']) + self.assertAlmostEqual(pseudo_A_opt, expected['pseudo_A_opt']) + self.assertAlmostEqual(E_opt, expected['E_opt']) + self.assertAlmostEqual(ME_opt, expected['ME_opt']) def test_FIM_eigenvalue_warning(self): # Create a matrix with an imaginary component large enough @@ -114,34 +134,25 @@ def test_FIM_eigenvalue_warning(self): def test_get_FIM_metrics(self): # Create a sample Fisher Information Matrix (FIM) - FIM = np.array([[10, 2], [2, 3]]) - fim_metrics = get_FIM_metrics(FIM) - + FIM = self._get_test_fim() # expected results - det_expected = 26.000000000000004 - D_opt_expected = 1.414973347970818 - - trace_expected = 13 - A_opt_expected = 1.1139433523068367 - - E_vals_expected = np.array([10.53112887, 2.46887113]) - E_vecs_expected = np.array( - [[0.96649965, -0.25666794], [0.25666794, 0.96649965]] - ) - E_opt_expected = 0.3924984205140895 - - ME_opt_expected = 0.6299765069426388 + expected = self._get_expected_fim_results() + fim_metrics = get_FIM_metrics(FIM) # Test results - self.assertAlmostEqual(fim_metrics["Determinant of FIM"], det_expected) - self.assertAlmostEqual(fim_metrics["Trace of FIM"], trace_expected) - self.assertTrue(np.allclose(fim_metrics["Eigenvalues"], E_vals_expected)) - self.assertTrue(np.allclose(fim_metrics["Eigenvectors"], E_vecs_expected)) - self.assertAlmostEqual(fim_metrics["log10(D-Optimality)"], D_opt_expected) - self.assertAlmostEqual(fim_metrics["log10(A-Optimality)"], A_opt_expected) - self.assertAlmostEqual(fim_metrics["log10(E-Optimality)"], E_opt_expected) + self.assertAlmostEqual(fim_metrics["Determinant of FIM"], expected['det']) + self.assertAlmostEqual(fim_metrics["Trace of cov"], expected['trace_cov']) + self.assertAlmostEqual(fim_metrics["Trace of FIM"], expected['trace_FIM']) + self.assertTrue(np.allclose(fim_metrics["Eigenvalues"], expected['E_vals'])) + self.assertTrue(np.allclose(fim_metrics["Eigenvectors"], expected['E_vecs'])) + self.assertAlmostEqual(fim_metrics["log10(D-Optimality)"], expected['D_opt']) + self.assertAlmostEqual(fim_metrics["log10(A-Optimality)"], expected['A_opt']) + self.assertAlmostEqual( + fim_metrics["log10(Pseudo A-Optimality)"], expected['pseudo_A_opt'] + ) + self.assertAlmostEqual(fim_metrics["log10(E-Optimality)"], expected['E_opt']) self.assertAlmostEqual( - fim_metrics["log10(Modified E-Optimality)"], ME_opt_expected + fim_metrics["log10(Modified E-Optimality)"], expected['ME_opt'] ) diff --git a/pyomo/contrib/doe/utils.py b/pyomo/contrib/doe/utils.py index a968648dc40..0cdc9b295cf 100644 --- a/pyomo/contrib/doe/utils.py +++ b/pyomo/contrib/doe/utils.py @@ -141,6 +141,8 @@ def compute_FIM_metrics(FIM): det_FIM : float Determinant of the FIM. + trace_cov : float + Trace of the covariance matrix. trace_FIM : float Trace of the FIM. E_vals : numpy.ndarray @@ -151,6 +153,8 @@ def compute_FIM_metrics(FIM): log10(D-optimality) metric. A_opt : float log10(A-optimality) metric. + pseudo_A_opt : float + log10(trace(FIM)) metric. E_opt : float log10(E-optimality) metric. ME_opt : float @@ -164,8 +168,12 @@ def compute_FIM_metrics(FIM): det_FIM = np.linalg.det(FIM) D_opt = np.log10(det_FIM) + # Trace of FIM is the pseudo A-optimality, not the proper definition of A-optimality, + # The trace of covariance is the proper definition of A-optimality trace_FIM = np.trace(FIM) - A_opt = np.log10(trace_FIM) + pseudo_A_opt = np.log10(trace_FIM) + trace_cov = np.trace(np.linalg.pinv(FIM)) + A_opt = np.log10(trace_cov) E_vals, E_vecs = np.linalg.eig(FIM) E_ind = np.argmin(E_vals.real) # index of smallest eigenvalue @@ -185,7 +193,18 @@ def compute_FIM_metrics(FIM): ME_opt = np.log10(np.linalg.cond(FIM)) - return det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt + return ( + det_FIM, + trace_cov, + trace_FIM, + E_vals, + E_vecs, + D_opt, + A_opt, + pseudo_A_opt, + E_opt, + ME_opt, + ) # Standalone Function for user to calculate FIM metrics directly without using the class @@ -203,6 +222,8 @@ def get_FIM_metrics(FIM): "Determinant of FIM" : float determinant of the FIM + "Trace of cov" : float + trace of the covariance matrix "Trace of FIM" : float trace of the FIM "Eigenvalues" : numpy.ndarray @@ -213,23 +234,36 @@ def get_FIM_metrics(FIM): log10(D-optimality) metric "log10(A-Optimality)" : float log10(A-optimality) metric + "log10(Pseudo A-Optimality)" : float + log10(trace(FIM)) metric "log10(E-Optimality)" : float log10(E-optimality) metric "log10(Modified E-Optimality)" : float log10(Modified E-optimality) metric """ - (det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt) = ( - compute_FIM_metrics(FIM) - ) + ( + det_FIM, + trace_cov, + trace_FIM, + E_vals, + E_vecs, + D_opt, + A_opt, + pseudo_A_opt, + E_opt, + ME_opt, + ) = compute_FIM_metrics(FIM) return { "Determinant of FIM": det_FIM, + "Trace of cov": trace_cov, "Trace of FIM": trace_FIM, "Eigenvalues": E_vals, "Eigenvectors": E_vecs, "log10(D-Optimality)": D_opt, "log10(A-Optimality)": A_opt, + "log10(Pseudo A-Optimality)": pseudo_A_opt, "log10(E-Optimality)": E_opt, "log10(Modified E-Optimality)": ME_opt, } diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/doe_example.py b/pyomo/contrib/parmest/examples/rooney_biegler/doe_example.py new file mode 100644 index 00000000000..5d78489d7f3 --- /dev/null +++ b/pyomo/contrib/parmest/examples/rooney_biegler/doe_example.py @@ -0,0 +1,193 @@ +from pyomo.contrib.parmest.examples.rooney_biegler.rooney_biegler import ( + RooneyBieglerExperiment, +) +from pyomo.contrib.doe import DesignOfExperiments +from pyomo.common.dependencies import pandas as pd, numpy as np, matplotlib + + +def run_rooney_biegler_doe( + optimize_experiment_A=False, + optimize_experiment_D=False, + compute_FIM_full_factorial=False, + draw_factorial_figure=False, + design_range={'hour': [0, 10, 40]}, + plot_optimal_design=False, + tee=False, + print_output=False, +): + # Initialize a container for all potential results + results_container = { + "experiment": None, + "results_dict": {}, + "optimization": {}, # Will hold D/A optimization results if run + "plots": [], # Store figure objects if created + } + + # Data Setup + data = pd.DataFrame( + data=[[1, 8.3], [2, 10.3], [3, 19.0], [4, 16.0], [5, 15.6], [7, 19.8]], + columns=['hour', 'y'], + ) + theta = {'asymptote': 15, 'rate_constant': 0.5} + measurement_error = 0.1 + + # Compute prior FIM from existing data + FIM = np.zeros((2, 2)) + for i in range(len(data)): + exp_i = RooneyBieglerExperiment( + data=data.loc[i, :], theta=theta, measure_error=measurement_error + ) + doe_prior = DesignOfExperiments( + experiment=exp_i, objective_option="determinant", tee=tee + ) + FIM += doe_prior.compute_FIM() + + # Base Experiment for Design + rooney_biegler_experiment = RooneyBieglerExperiment( + data=data.loc[0, :], theta=theta, measure_error=measurement_error + ) + results_container["experiment"] = rooney_biegler_experiment + + rooney_biegler_doe = DesignOfExperiments( + experiment=rooney_biegler_experiment, + objective_option="determinant", + tee=tee, + prior_FIM=FIM, + improve_cholesky_roundoff_error=True, + ) + + if optimize_experiment_D: + # D-Optimality + rooney_biegler_doe_D = DesignOfExperiments( + experiment=rooney_biegler_experiment, + objective_option="determinant", + tee=tee, + prior_FIM=FIM, + improve_cholesky_roundoff_error=True, + ) + rooney_biegler_doe_D.run_doe() + + results_container["optimization"]["D"] = { + "value": rooney_biegler_doe_D.results['log10 D-opt'], + "design": rooney_biegler_doe_D.results['Experiment Design'], + } + if print_output: + print("Optimal results for D-optimality:", rooney_biegler_doe_D.results) + + if optimize_experiment_A: + # A-Optimality + rooney_biegler_doe_A = DesignOfExperiments( + experiment=rooney_biegler_experiment, + objective_option="trace", + tee=tee, + prior_FIM=FIM, + improve_cholesky_roundoff_error=False, + ) + rooney_biegler_doe_A.run_doe() + + results_container["optimization"]["A"] = { + "value": rooney_biegler_doe_A.results['log10 A-opt'], + "design": rooney_biegler_doe_A.results['Experiment Design'], + } + + if print_output: + print("Optimal results for A-optimality:", rooney_biegler_doe_A.results) + + # Compute Full Factorial Design Results + if compute_FIM_full_factorial: + results_container["results_dict"] = ( + rooney_biegler_doe.compute_FIM_full_factorial(design_ranges=design_range) + ) + if print_output: + print("Full Factorial Design Results:\n", results_container["results_dict"]) + + # Custom Plotting Functionality that shows the optimal designs and + # the max/min from the full factorial results + # Plotting Block + if plot_optimal_design: + plt = matplotlib.pyplot + res_dict = results_container["results_dict"] + opt_D = results_container["optimization"]["D"] + opt_A = results_container["optimization"]["A"] + + # D-Optimality Plot + fig1, ax1 = plt.subplots(figsize=(10, 5)) + # Locate Star values (max) + id_max_D = np.argmax(res_dict["log10 D-opt"]) + + ax1.plot(res_dict["hour"], res_dict["log10 D-opt"]) + ax1.scatter( + opt_D["design"][0], + opt_D["value"], + color='green', + marker="*", + s=600, + label=f'Optimal D-opt: {opt_D["value"]:.2f}', + ) + ax1.scatter( + res_dict["hour"][id_max_D], + res_dict["log10 D-opt"][id_max_D], + color='red', + marker="o", + s=200, + label='Max D-opt in Range', + ) + ax1.set_xlabel("Time Points") + ax1.set_ylabel("log10 D-optimality") + ax1.legend() + + results_container["plots"].append(fig1) + + # A-Optimality Plot + fig2, ax2 = plt.subplots(figsize=(10, 5)) + # Locate Star values (min) + id_min_A = np.argmin(res_dict["log10 A-opt"]) + + ax2.plot(res_dict["hour"], res_dict["log10 A-opt"]) + ax2.scatter( + opt_A["design"][0], + opt_A["value"], + color='green', + marker="*", + s=600, + label=f'Optimal A-opt: {opt_A["value"]:.2f}', + ) + ax2.scatter( + res_dict["hour"][id_min_A], + res_dict["log10 A-opt"][id_min_A], + color='red', + marker="o", + s=200, + label='Min A-opt in Range', + ) + ax2.set_xlabel("Time Points") + ax2.set_ylabel("log10 A-optimality") + ax2.legend() + + results_container["plots"].append(fig2) + + if draw_factorial_figure: + rooney_biegler_doe.draw_factorial_figure( + sensitivity_design_variables=['hour'], + fixed_design_variables={}, + log_scale=False, + figure_file_name="rooney_biegler", + ) + + return results_container + + +if __name__ == "__main__": + + results = run_rooney_biegler_doe( + optimize_experiment_A=True, + optimize_experiment_D=True, + compute_FIM_full_factorial=True, + draw_factorial_figure=True, # Set True to test file generation + design_range={'hour': [0, 10, 11]}, # Small range for speed + plot_optimal_design=True, + print_output=True, + ) + + # Show plots if running locally + # matplotlib.pyplot.show() diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py b/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py index 50d8cd72861..c840b67be9f 100644 --- a/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py +++ b/pyomo/contrib/parmest/examples/rooney_biegler/rooney_biegler.py @@ -91,6 +91,7 @@ def label_model(self): m.experiment_inputs.update([(m.hour, self.data['hour'])]) def get_labeled_model(self): - self.create_model() - self.label_model() + if self.model is None: + self.create_model() + self.label_model() return self.model diff --git a/pyomo/contrib/parmest/tests/test_examples.py b/pyomo/contrib/parmest/tests/test_examples.py index ce790b7ddb7..d165bafc058 100644 --- a/pyomo/contrib/parmest/tests/test_examples.py +++ b/pyomo/contrib/parmest/tests/test_examples.py @@ -14,10 +14,15 @@ from pyomo.contrib.parmest.graphics import matplotlib_available, seaborn_available from pyomo.contrib.pynumero.asl import AmplInterface from pyomo.opt import SolverFactory +from pyomo.common.dependencies import attempt_import +import os ipopt_available = SolverFactory("ipopt").available() pynumero_ASL_available = AmplInterface.available() +pandas, pandas_available = attempt_import("pandas") +matplotlib, matplotlib_available = attempt_import("matplotlib") + @unittest.skipIf( not parmest.parmest_available, @@ -56,6 +61,70 @@ def test_likelihood_ratio_example(self): likelihood_ratio_example.main() + # Test rooney biegler DOE example + @unittest.skipUnless(pandas_available, "test requires pandas") + @unittest.skipUnless(matplotlib_available, "test requires matplotlib") + def test_doe_example(self): + """ + Tests the Design of Experiments (DoE) functionality, including + plotting logic, without displaying GUI windows. + """ + import glob # To handle multiple file matches + from pyomo.contrib.parmest.examples.rooney_biegler.doe_example import ( + run_rooney_biegler_doe, + ) + + plt = matplotlib.pyplot + + # Switch backend to 'Agg' to prevent GUI windows from popping up + plt.switch_backend('Agg') + + # Define the prefix used to identify files to be cleaned up + file_prefix = "rooney_biegler" + + # Ensure we clean up the file even if the test fails halfway through + def cleanup_file(): + generated_files = glob.glob(f"{file_prefix}_*.png") + for f in generated_files: + try: + os.remove(f) + except OSError: + pass + + self.addCleanup(cleanup_file) + + # Run with plotting flags set to TRUE + results = run_rooney_biegler_doe( + optimize_experiment_A=True, + optimize_experiment_D=True, + compute_FIM_full_factorial=True, + draw_factorial_figure=True, + design_range={'hour': [0, 10, 3]}, + plot_optimal_design=True, + tee=False, + ) + + # Assertions for Numerical Results + self.assertIn("D", results["optimization"]) + self.assertIn("A", results["optimization"]) + + # Assertions for Plotting Logic + # Check that the custom plotting block ran and stored figures + self.assertTrue( + len(results["plots"]) > 0, "No plot objects were stored in results['plots']" + ) + + # Check that draw_factorial_figure actually created the file + expected_d_plot = f"{file_prefix}_D_opt.png" + self.assertTrue( + os.path.exists(expected_d_plot), + f"Expected plot file '{expected_d_plot}' was not created.", + ) + + # Explicitly close figures to free memory + for fig in results["plots"]: + plt.close(fig) + @unittest.skipUnless(pynumero_ASL_available, "test requires libpynumero_ASL") @unittest.skipUnless(ipopt_available, "The 'ipopt' solver is not available")