diff --git a/botorch/acquisition/analytic.py b/botorch/acquisition/analytic.py index 265677d8bf..ca1bf6d957 100644 --- a/botorch/acquisition/analytic.py +++ b/botorch/acquisition/analytic.py @@ -13,7 +13,7 @@ import math -from abc import ABC +from abc import ABC, abstractmethod from contextlib import nullcontext from copy import deepcopy @@ -415,7 +415,112 @@ def forward(self, X: Tensor) -> Tensor: return _log_ei_helper(u) + sigma.log() -class LogConstrainedExpectedImprovement(AnalyticAcquisitionFunction): +class ConstrainedAnalyticAcquisitionFunctionMixin(ABC): + r"""Base class for constrained analytic acquisition functions.""" + + def __init__( + self, + constraints: dict[int, tuple[float | None, float | None]], + ) -> None: + r"""Analytic Log Probability of Feasibility. + + Args: + model: A fitted multi-output model. + constraints: A dictionary of the form `{i: [lower, upper]}`, where + `i` is the output index, and `lower` and `upper` are lower and upper + bounds on that output (resp. interpreted as -Inf / Inf if None). + """ + self.constraints = constraints + self._preprocess_constraint_bounds(constraints=constraints) + + @abstractmethod + def register_buffer(self, name: str, value: Tensor) -> None: + """Add a buffer that can be accessed by `self.name` and stores the Tensor + `value`, usually provided by derivatives that also inherit from `nn.Module`. + """ + + def _preprocess_constraint_bounds( + self, + constraints: dict[int, tuple[float | None, float | None]], + ) -> None: + r"""Set up constraint bounds. + + Args: + constraints: A dictionary of the form `{i: [lower, upper]}`, where + `i` is the output index, and `lower` and `upper` are lower and upper + bounds on that output (resp. interpreted as -Inf / Inf if None) + """ + con_lower, con_lower_inds = [], [] + con_upper, con_upper_inds = [], [] + con_both, con_both_inds = [], [] + con_indices = list(constraints.keys()) + if len(con_indices) == 0: + raise ValueError("There must be at least one constraint.") + # CEI, LogCEI have an objective index, but LogPOF does not. + if hasattr(self, "objective_index") and self.objective_index in con_indices: + raise ValueError( + "Output corresponding to objective should not be a constraint." + ) + for k in con_indices: + if constraints[k][0] is not None and constraints[k][1] is not None: + if constraints[k][1] <= constraints[k][0]: + raise ValueError("Upper bound is less than the lower bound.") + con_both_inds.append(k) + con_both.append([constraints[k][0], constraints[k][1]]) + elif constraints[k][0] is not None: + con_lower_inds.append(k) + con_lower.append(constraints[k][0]) + elif constraints[k][1] is not None: + con_upper_inds.append(k) + con_upper.append(constraints[k][1]) + + for name, value in [ + ("con_lower_inds", con_lower_inds), + ("con_upper_inds", con_upper_inds), + ("con_both_inds", con_both_inds), + ("con_both", con_both), + ("con_lower", con_lower), + ("con_upper", con_upper), + ]: + # tensor-based indexing is much faster than list-based advanced indexing + self.register_buffer(name, tensor=torch.as_tensor(value)) + + def _compute_log_prob_feas( + self, + means: Tensor, + sigmas: Tensor, + ) -> Tensor: + r"""Compute logarithm of the feasibility probability for each batch of X. + + Args: + X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design + points each. + means: A `(b) x m`-dim Tensor of means. + sigmas: A `(b) x m`-dim Tensor of standard deviations. + + Returns: + A `b`-dim tensor of log feasibility probabilities + + Note: This function does case-work for upper bound, lower bound, and both-sided + bounds. Another way to do it would be to use 'inf' and -'inf' for the + one-sided bounds and use the logic for the both-sided case. But this + causes an issue with autograd since we get 0 * inf. + """ + return compute_log_prob_feas_from_bounds( + con_lower_inds=self.con_lower_inds, + con_upper_inds=self.con_upper_inds, + con_both_inds=self.con_both_inds, + con_lower=self.con_lower, + con_upper=self.con_upper, + con_both=self.con_both, + means=means, + sigmas=sigmas, + ) + + +class LogConstrainedExpectedImprovement( + AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin +): r"""Log Constrained Expected Improvement (feasibility-weighted). Computes the logarithm of the analytic expected improvement for a Normal posterior @@ -464,13 +569,12 @@ def __init__( maximize: If True, consider the problem a maximization problem. """ # Use AcquisitionFunction constructor to avoid check for posterior transform. - super(AnalyticAcquisitionFunction, self).__init__(model=model) + AcquisitionFunction.__init__(self, model=model) self.posterior_transform = None self.maximize = maximize self.objective_index = objective_index - self.constraints = constraints self.register_buffer("best_f", torch.as_tensor(best_f)) - _preprocess_constraint_bounds(self, constraints=constraints) + ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints) self.register_forward_pre_hook(convert_to_target_pre_hook) @t_batch_mode_transform(expected_q=1) @@ -490,11 +594,77 @@ def forward(self, X: Tensor) -> Tensor: mean_obj, sigma_obj = means[..., ind], sigmas[..., ind] u = _scaled_improvement(mean_obj, sigma_obj, self.best_f, self.maximize) log_ei = _log_ei_helper(u) + sigma_obj.log() - log_prob_feas = _compute_log_prob_feas(self, means=means, sigmas=sigmas) + log_prob_feas = self._compute_log_prob_feas(means=means, sigmas=sigmas) return log_ei + log_prob_feas -class ConstrainedExpectedImprovement(AnalyticAcquisitionFunction): +class LogProbabilityOfFeasibility( + AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin +): + r"""Log Probability of Feasbility. + + Computes the logarithm of the analytic probability of feasibility for a Normal + posterior distribution weighted by a probability of feasibility. The objective and + constraints are assumed to be independent and have Gaussian posterior + distributions. Only supports non-batch mode (i.e. `q=1`). The model should be + multi-outcome, with the index of the objective and constraints passed to + the constructor. + + See [Ament2023logei]_ for details. Formally, + + `LogPF(x) = Sum_i log(P(y_i \in [lower_i, upper_i]))`, + + where `y_i ~ constraint_i(x)` and `lower_i`, `upper_i` are the lower and + upper bounds for the i-th constraint, respectively. + + Example: + # example where the 0th output has a non-negativity constraint + >>> model = SingleTaskGP(train_X, train_Y) + >>> constraints = {0: (0.0, None)} + >>> LogPOF = LogProbabilityOfFeasibility(model, constraints) + >>> cei = LogPF(test_X) + """ + + _log: bool = True + + def __init__( + self, + model: Model, + constraints: dict[int, tuple[float | None, float | None]], + ) -> None: + r"""Analytic Log Probability of Feasibility. + + Args: + model: A fitted multi-output model. + constraints: A dictionary of the form `{i: [lower, upper]}`, where + `i` is the output index, and `lower` and `upper` are lower and upper + bounds on that output (resp. interpreted as -Inf / Inf if None) + """ + # Use AcquisitionFunction constructor to avoid check for posterior transform. + AcquisitionFunction.__init__(self, model=model) + self.posterior_transform = None + ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints) + self.register_forward_pre_hook(convert_to_target_pre_hook) + + @t_batch_mode_transform(expected_q=1) + def forward(self, X: Tensor) -> Tensor: + r"""Evaluate Constrained Log Probability of Feasibility on the candidate set X. + + Args: + X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design + points each. + + Returns: + A `(b)`-dim Tensor of Log Probability of Feasibility values at the given + design points `X`. + """ + means, sigmas = self._mean_and_sigma(X) # (b) x 1 + (m = num constraints) + return self._compute_log_prob_feas(means=means, sigmas=sigmas) + + +class ConstrainedExpectedImprovement( + AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin +): r"""Constrained Expected Improvement (feasibility-weighted). Computes the analytic expected improvement for a Normal posterior @@ -543,13 +713,12 @@ def __init__( """ legacy_ei_numerics_warning(legacy_name=type(self).__name__) # Use AcquisitionFunction constructor to avoid check for posterior transform. - super(AnalyticAcquisitionFunction, self).__init__(model=model) + AcquisitionFunction.__init__(self, model=model) self.posterior_transform = None self.maximize = maximize self.objective_index = objective_index - self.constraints = constraints self.register_buffer("best_f", torch.as_tensor(best_f)) - _preprocess_constraint_bounds(self, constraints=constraints) + ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints) self.register_forward_pre_hook(convert_to_target_pre_hook) @t_batch_mode_transform(expected_q=1) @@ -569,7 +738,7 @@ def forward(self, X: Tensor) -> Tensor: mean_obj, sigma_obj = means[..., ind], sigmas[..., ind] u = _scaled_improvement(mean_obj, sigma_obj, self.best_f, self.maximize) ei = sigma_obj * _ei_helper(u) - log_prob_feas = _compute_log_prob_feas(self, means=means, sigmas=sigmas) + log_prob_feas = self._compute_log_prob_feas(means=means, sigmas=sigmas) return ei.mul(log_prob_feas.exp()) @@ -1131,82 +1300,3 @@ def _get_noiseless_fantasy_model( fantasy_model.likelihood.noise_covar.noise = Yvar return fantasy_model - - -def _preprocess_constraint_bounds( - acqf: LogConstrainedExpectedImprovement | ConstrainedExpectedImprovement, - constraints: dict[int, tuple[float | None, float | None]], -) -> None: - r"""Set up constraint bounds. - - Args: - constraints: A dictionary of the form `{i: [lower, upper]}`, where - `i` is the output index, and `lower` and `upper` are lower and upper - bounds on that output (resp. interpreted as -Inf / Inf if None) - """ - con_lower, con_lower_inds = [], [] - con_upper, con_upper_inds = [], [] - con_both, con_both_inds = [], [] - con_indices = list(constraints.keys()) - if len(con_indices) == 0: - raise ValueError("There must be at least one constraint.") - if acqf.objective_index in con_indices: - raise ValueError( - "Output corresponding to objective should not be a constraint." - ) - for k in con_indices: - if constraints[k][0] is not None and constraints[k][1] is not None: - if constraints[k][1] <= constraints[k][0]: - raise ValueError("Upper bound is less than the lower bound.") - con_both_inds.append(k) - con_both.append([constraints[k][0], constraints[k][1]]) - elif constraints[k][0] is not None: - con_lower_inds.append(k) - con_lower.append(constraints[k][0]) - elif constraints[k][1] is not None: - con_upper_inds.append(k) - con_upper.append(constraints[k][1]) - # tensor-based indexing is much faster than list-based advanced indexing - for name, indices in [ - ("con_lower_inds", con_lower_inds), - ("con_upper_inds", con_upper_inds), - ("con_both_inds", con_both_inds), - ("con_both", con_both), - ("con_lower", con_lower), - ("con_upper", con_upper), - ]: - acqf.register_buffer(name, tensor=torch.as_tensor(indices)) - - -def _compute_log_prob_feas( - acqf: LogConstrainedExpectedImprovement | ConstrainedExpectedImprovement, - means: Tensor, - sigmas: Tensor, -) -> Tensor: - r"""Compute logarithm of the feasibility probability for each batch of X. - - Args: - X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design - points each. - means: A `(b) x m`-dim Tensor of means. - sigmas: A `(b) x m`-dim Tensor of standard deviations. - Returns: - A `b`-dim tensor of log feasibility probabilities - - Note: This function does case-work for upper bound, lower bound, and both-sided - bounds. Another way to do it would be to use 'inf' and -'inf' for the - one-sided bounds and use the logic for the both-sided case. But this - causes an issue with autograd since we get 0 * inf. - TODO: Investigate further. - """ - acqf.to(device=means.device) - return compute_log_prob_feas_from_bounds( - acqf.con_lower_inds, - acqf.con_upper_inds, - acqf.con_both_inds, - acqf.con_lower, - acqf.con_upper, - acqf.con_both, - means, - sigmas, - ) diff --git a/botorch/acquisition/factory.py b/botorch/acquisition/factory.py index 33f638c709..d424496dc0 100644 --- a/botorch/acquisition/factory.py +++ b/botorch/acquisition/factory.py @@ -145,6 +145,16 @@ def get_acquisition_function( constraints=constraints, eta=eta, ) + elif acquisition_function_name == "qLogPF": + return logei.qLogProbabilityOfFeasibility( + model=model, + constraints=constraints, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + eta=eta, + ) elif acquisition_function_name in ["qNEI", "qLogNEI"]: acqf_class = ( monte_carlo.qNoisyExpectedImprovement diff --git a/botorch/acquisition/input_constructors.py b/botorch/acquisition/input_constructors.py index 8257c806ee..8d9e4684e5 100644 --- a/botorch/acquisition/input_constructors.py +++ b/botorch/acquisition/input_constructors.py @@ -22,6 +22,7 @@ ExpectedImprovement, LogExpectedImprovement, LogNoisyExpectedImprovement, + LogProbabilityOfFeasibility, LogProbabilityOfImprovement, NoisyExpectedImprovement, PosteriorMean, @@ -42,6 +43,7 @@ from botorch.acquisition.logei import ( qLogExpectedImprovement, qLogNoisyExpectedImprovement, + qLogProbabilityOfFeasibility, TAU_MAX, TAU_RELU, ) @@ -336,6 +338,30 @@ def construct_inputs_best_f( } +@acqf_input_constructor( + LogProbabilityOfFeasibility, +) +def construct_inputs_pof( + model: Model, + constraints: dict[int, tuple[float | None, float | None]], +) -> dict[str, Any]: + r"""Construct kwargs for the log probability of feasibility acquisition function. + + Args: + model: The model to be used in the acquisition function. + constraints: A dictionary of the form `{i: [lower, upper]}`, where `i` is the + output index, and `lower` and `upper` are lower and upper bounds on that + output (resp. interpreted as -Inf / Inf if None). + + Returns: + A dict mapping kwarg names of the constructor to values. + """ + return { + "model": model, + "constraints": constraints, + } + + @acqf_input_constructor(UpperConfidenceBound) def construct_inputs_ucb( model: Model, @@ -569,6 +595,55 @@ def construct_inputs_qLogEI( } +@acqf_input_constructor(qLogProbabilityOfFeasibility) +def construct_inputs_LogPF( + model: Model, + constraints: list[Callable[[Tensor], Tensor]], + posterior_transform: PosteriorTransform | None = None, + X_pending: Tensor | None = None, + sampler: MCSampler | None = None, + eta: Tensor | float = 1e-3, + fat: bool = True, + tau_max: float = TAU_MAX, +) -> dict[str, Any]: + r"""Construct kwargs for the `qExpectedImprovement` constructor. + + Args: + model: The model to be used in the acquisition function. + constraints: A list of constraint callables which map a Tensor of posterior + samples of dimension `sample_shape x batch-shape x q x m`-dim to a + `sample_shape x batch-shape x q`-dim Tensor. The associated constraints + are considered satisfied if the output is less than zero. + posterior_transform: The posterior transform to be used in the + acquisition function. + X_pending: A `m x d`-dim Tensor of `m` design points that have been + submitted for function evaluation but have not yet been evaluated. + Concatenated into X upon forward call. + sampler: The sampler used to draw base samples. If omitted, uses + the acquisition functions's default sampler. + eta: Temperature parameter(s) governing the smoothness of the sigmoid + approximation to the constraint indicators. For more details, on this + parameter, see the docs of `compute_smoothed_feasibility_indicator`. + fat: Toggles the logarithmic / linear asymptotic behavior of the smooth + approximation to the ReLU. + tau_max: Temperature parameter controlling the sharpness of the smooth + approximations to max. + + Returns: + A dictionary mapping kwarg names of the constructor to values. + """ + return { + "model": model, + "constraints": constraints, + "posterior_transform": posterior_transform, + "X_pending": X_pending, + "sampler": sampler, + "eta": eta, + "fat": fat, + "tau_max": tau_max, + } + + @acqf_input_constructor(qNoisyExpectedImprovement) def construct_inputs_qNEI( model: Model, diff --git a/botorch/acquisition/logei.py b/botorch/acquisition/logei.py index 3deae8c4df..62821e461e 100644 --- a/botorch/acquisition/logei.py +++ b/botorch/acquisition/logei.py @@ -27,6 +27,7 @@ from botorch.acquisition.monte_carlo import SampleReducingMCAcquisitionFunction from botorch.acquisition.objective import ( ConstrainedMCObjective, + IdentityMCObjective, MCAcquisitionObjective, PosteriorTransform, ) @@ -37,6 +38,7 @@ from botorch.exceptions.errors import BotorchError from botorch.models.model import Model from botorch.sampling.base import MCSampler +from botorch.utils.objective import compute_smoothed_feasibility_indicator from botorch.utils.safe_math import ( fatmax, log_fatplus, @@ -86,7 +88,8 @@ def __init__( fat: bool = True, tau_max: float = TAU_MAX, ) -> None: - r""" + r"""Constructor of the base class for LogEI acquisition functions. + Args: model: A fitted model. sampler: The sampler used to draw base samples. If not given, @@ -570,6 +573,98 @@ def _compute_best_feasible_objective(self, samples: Tensor, obj: Tensor) -> Tens ) +class qLogProbabilityOfFeasibility(LogImprovementMCAcquisitionFunction): + r"""MC-based batch LogProbabilityOfFeasibility. + + This computes the log probability of feasibility by + (1) sampling the joint posterior over q points + (2) evaluating the feasibility of each sample + (3) averaging over the sample and batch dimensions. + + `log_prob_feas(X) = log(P(f(X) <= 0)), where f(X) ~ GP`. + """ + + def __init__( + self, + model: Model, + constraints: list[Callable[[Tensor], Tensor]], + sampler: MCSampler | None = None, + objective: MCAcquisitionObjective | None = None, + posterior_transform: PosteriorTransform | None = None, + X_pending: Tensor | None = None, + eta: Tensor | float = 1e-3, + fat: bool = True, + tau_max: float = TAU_MAX, + ): + r"""Constructor of the batch log probability of feasibility acquisition + function. + + Args: + model: A fitted model. + constraints: A list of constraint callables which map a Tensor of posterior + samples of dimension `sample_shape x batch-shape x q x m`-dim to a + `sample_shape x batch-shape x q`-dim Tensor. The associated constraints + are satisfied if `constraint(samples) < 0`. + sampler: The sampler used to draw base samples. If not given, + a sampler is generated using `get_sampler`. + NOTE: For posteriors that do not support base samples, + a sampler compatible with intended use case must be provided. + See `ForkedRNGSampler` and `StochasticSampler` as examples. + objective: Not used, kept for compatibility with interface. + posterior_transform: A PosteriorTransform (optional). + X_pending: A `batch_shape, m x d`-dim Tensor of `m` design points + that have points that have been submitted for function evaluation + but have not yet been evaluated. + eta: Temperature parameter(s) governing the smoothness of the sigmoid + approximation to the constraint indicators. See the docs of + `compute_(log_)constraint_indicator` for more details on this parameter. + fat: Toggles the logarithmic / linear asymptotic behavior of the smooth + approximation to the ReLU. + tau_max: Temperature parameter controlling the sharpness of the + approximation to the `max` operator over the `q` candidate points. + """ + super().__init__( + model=model, + sampler=sampler, + # theoretically we don't need an objective, but MCAcquisitionFunction will + # throw an error if the model has multiple outputs and the objective is None + objective=IdentityMCObjective(), + posterior_transform=posterior_transform, + X_pending=X_pending, + constraints=constraints, + eta=eta, + fat=fat, + tau_max=tau_max, + ) + + def _non_reduced_forward(self, X: Tensor) -> Tensor: + """Compute the feasibility values at the MC-sample and batch (q) level. + + Args: + X: A `batch_shape x q x d` Tensor of t-batches with `q` `d`-dim + design points each. + + Returns: + A Tensor with shape `sample_sample x batch_shape x q`. + """ + posterior = self.model.posterior( + X=X, posterior_transform=self.posterior_transform + ) + return compute_smoothed_feasibility_indicator( + constraints=self._constraints, + samples=self.get_posterior_samples(posterior), + eta=self._eta, + log=self._log, + fat=self._fat, + ) + + def _sample_forward(self, obj: Tensor) -> Tensor: + """Not necessary, since we are implementing `_non_reduced_forward` without it. + Need to provide an implementation to avoid an abstract method error. + """ + raise NotImplementedError("This should be dead code.") # pragma: no cover + + """ ###################################### utils ########################################## """ diff --git a/botorch/utils/probability/utils.py b/botorch/utils/probability/utils.py index 3db4a5c2b5..55da40842e 100644 --- a/botorch/utils/probability/utils.py +++ b/botorch/utils/probability/utils.py @@ -355,9 +355,19 @@ def compute_log_prob_feas_from_bounds( equal in dimension to con_upper_inds. con_both: 2d Tensor of "both" bounds on the constraints equal in length to con_both_inds. + Returns: A `b`-dim tensor of log feasibility probabilities """ + # indices are integers, so we don't cast the type + con_upper_inds = con_upper_inds.to(device=means.device) + con_lower_inds = con_lower_inds.to(device=means.device) + con_both_inds = con_both_inds.to(device=means.device) + + con_lower = con_lower.to(device=means.device, dtype=means.dtype) + con_upper = con_upper.to(device=means.device, dtype=means.dtype) + con_both = con_both.to(device=means.device, dtype=means.dtype) + log_prob = torch.zeros_like(means[..., 0]) if len(con_lower_inds) > 0: i = con_lower_inds diff --git a/test/acquisition/test_analytic.py b/test/acquisition/test_analytic.py index dcee53e8a4..0e43653877 100644 --- a/test/acquisition/test_analytic.py +++ b/test/acquisition/test_analytic.py @@ -6,21 +6,23 @@ import itertools import math +from unittest import mock from warnings import catch_warnings, simplefilter import torch from botorch.acquisition import qAnalyticProbabilityOfImprovement from botorch.acquisition.analytic import ( _check_noisy_ei_model, - _compute_log_prob_feas, _ei_helper, _log_ei_helper, AnalyticAcquisitionFunction, + ConstrainedAnalyticAcquisitionFunctionMixin, ConstrainedExpectedImprovement, ExpectedImprovement, LogConstrainedExpectedImprovement, LogExpectedImprovement, LogNoisyExpectedImprovement, + LogProbabilityOfFeasibility, LogProbabilityOfImprovement, NoisyExpectedImprovement, PosteriorMean, @@ -48,6 +50,7 @@ ) from gpytorch.module import Module from gpytorch.priors.torch_priors import GammaPrior +from torch import Tensor NEI_NOISE = [ @@ -613,8 +616,105 @@ def test_upper_confidence_bound_batch(self): UpperConfidenceBound(model=mm2, beta=1.0) +class ConstrainedAnalyticAcquisitionFunction( + ConstrainedAnalyticAcquisitionFunctionMixin +): + """Derivative of the mixin class which implements register_buffer without inheriting + from nn.Module, in order to test the methods of the mixin class in isolation. + """ + + def register_buffer(self, name: str, tensor: Tensor) -> None: + setattr(self, name, tensor) + + class TestConstrainedExpectedImprovement(BotorchTestCase): - def test_constrained_expected_improvement(self): + def test_constrained_analytic_acquisition_function_mixin(self) -> None: + for dtype in (torch.float, torch.double): + with self.subTest(dtype=dtype): + self._test_constrained_analytic_acquisition_function_mixin(dtype=dtype) + + def _test_constrained_analytic_acquisition_function_mixin( + self, dtype: torch.dtype + ) -> None: + # numerical stress test for _compute_log_prob_feas, which gets added to + # log_ei in the forward pass, a quantity we already tested above + # the limits here are determined by the largest power of ten x, such that + # x - (b - a) < x + # evaluates to true. In this test, the bounds are a, b = -digits, digits. + means, sigmas, X_positive = self._get_numerical_stress_test_data(dtype=dtype) + constraints = {0: [-5, 5]} + + mixin = ConstrainedAnalyticAcquisitionFunction(constraints=constraints) + # test initialization + for k in [ + "con_lower_inds", + "con_upper_inds", + "con_both_inds", + "con_both", + "con_lower", + "con_upper", + ]: + self.assertTrue(hasattr(mixin, k)) + self.assertIsInstance(getattr(mixin, k), torch.Tensor) + + log_prob = mixin._compute_log_prob_feas(means=means, sigmas=sigmas) + log_prob.sum().backward() + + self.assertFalse(log_prob.isnan().any()) + self.assertFalse(log_prob.isinf().any()) + self.assertFalse(means.grad.isnan().any()) + self.assertFalse(means.grad.isinf().any()) + # probability of feasibility increases until X = 0, decreases from there on + prob_diff = log_prob.diff() + k = len(X_positive) + eps = 1e-6 if dtype == torch.float32 else 1e-15 + self.assertTrue((prob_diff[:k] > -eps).all()) + self.assertTrue((means.grad[:k] > -eps).all()) + # probability has stationary point at zero + mean_grad_at_zero = means.grad[len(X_positive)] + self.assertTrue( + torch.allclose(mean_grad_at_zero, torch.zeros_like(mean_grad_at_zero)) + ) + # probability increases again + self.assertTrue((prob_diff[-k:] < eps).all()) + self.assertTrue((means.grad[-k:] < eps).all()) + + def _get_numerical_stress_test_data(self, dtype: torch.dtype) -> Tensor: + digits = 10 if dtype == torch.float64 else 5 + zero = torch.tensor([0], dtype=dtype, device=self.device) + ten = torch.tensor(10, dtype=dtype, device=self.device) + digits_tensor = 1 + torch.arange( + -digits, digits, dtype=dtype, device=self.device + ) + X_positive = ten ** (digits_tensor) + # flipping -X_positive so that elements are in increasing order + means = torch.cat((-X_positive.flip(-1), zero, X_positive)).unsqueeze(-1) + means.requires_grad = True + sigmas = torch.ones_like(means) + return means, sigmas, X_positive + + def test_log_probability_of_feasibility(self) -> None: + for dtype in (torch.float, torch.double): + with self.subTest(dtype=dtype): + self._test_log_probability_of_feasibility(dtype=dtype) + + def _test_log_probability_of_feasibility(self, dtype: torch.dtype) -> None: + mean = torch.tensor([[-0.5]], device=self.device) + variance = torch.ones(1, 1, device=self.device) + model = MockModel(MockPosterior(mean=mean, variance=variance)) + + means, sigmas, _ = self._get_numerical_stress_test_data(dtype=dtype) + constraints = {0: [-5, 5]} + mixin = ConstrainedAnalyticAcquisitionFunction(constraints=constraints) + acqf = LogProbabilityOfFeasibility(model=model, constraints=constraints) + X = torch.randn_like(means).unsqueeze(-2) # n x q=1 x d=1 + with mock.patch.object(acqf, "_mean_and_sigma", return_value=(means, sigmas)): + acq_val = acqf(X) + log_prob = mixin._compute_log_prob_feas(means=means, sigmas=sigmas) + + self.assertAllClose(acq_val, log_prob, atol=1e-4) + + def test_constrained_expected_improvement(self) -> None: mean = torch.tensor([[-0.5]], device=self.device) variance = torch.ones(1, 1, device=self.device) model = MockModel(MockPosterior(mean=mean, variance=variance)) @@ -648,7 +748,6 @@ def _test_constrained_expected_improvement(self, dtype: torch.dtype) -> None: } module = ConstrainedExpectedImprovement(**kwargs) log_module = LogConstrainedExpectedImprovement(**kwargs) - # test initialization for k in [ "con_lower_inds", @@ -759,50 +858,6 @@ def _test_constrained_expected_improvement(self, dtype: torch.dtype) -> None: constraints={1: [1.0, -1.0]}, ) - # numerical stress test for _compute_log_prob_feas, which gets added to - # log_ei in the forward pass, a quantity we already tested above - # the limits here are determined by the largest power of ten x, such that - # x - (b - a) < x - # evaluates to true. In this test, the bounds are a, b = -digits, digits. - digits = 10 if dtype == torch.float64 else 5 - zero = torch.tensor([0], dtype=dtype, device=self.device) - ten = torch.tensor(10, dtype=dtype, device=self.device) - digits_tensor = 1 + torch.arange( - -digits, digits, dtype=dtype, device=self.device - ) - X_positive = ten ** (digits_tensor) - # flipping -X_positive so that elements are in increasing order - means = torch.cat((-X_positive.flip(-1), zero, X_positive)).unsqueeze(-1) - means.requires_grad = True - log_module = LogConstrainedExpectedImprovement( - model=mm, - best_f=0.0, - objective_index=1, - constraints={0: [-5, 5]}, - ) - log_prob = _compute_log_prob_feas( - log_module, means=means, sigmas=torch.ones_like(means) - ) - log_prob.sum().backward() - self.assertFalse(log_prob.isnan().any()) - self.assertFalse(log_prob.isinf().any()) - self.assertFalse(means.grad.isnan().any()) - self.assertFalse(means.grad.isinf().any()) - # probability of feasibility increases until X = 0, decreases from there on - prob_diff = log_prob.diff() - k = len(X_positive) - eps = 1e-6 if dtype == torch.float32 else 1e-15 - self.assertTrue((prob_diff[:k] > -eps).all()) - self.assertTrue((means.grad[:k] > -eps).all()) - # probability has stationary point at zero - mean_grad_at_zero = means.grad[len(X_positive)] - self.assertTrue( - torch.allclose(mean_grad_at_zero, torch.zeros_like(mean_grad_at_zero)) - ) - # probability increases again - self.assertTrue((prob_diff[-k:] < eps).all()) - self.assertTrue((means.grad[-k:] < eps).all()) - def test_constrained_expected_improvement_batch(self): for dtype in (torch.float, torch.double): with catch_warnings(): diff --git a/test/acquisition/test_factory.py b/test/acquisition/test_factory.py index 65aefe2285..687414f0da 100644 --- a/test/acquisition/test_factory.py +++ b/test/acquisition/test_factory.py @@ -694,3 +694,40 @@ def test_GetUnknownAcquisitionFunction(self): mc_samples=self.mc_samples, seed=self.seed, ) + + @mock.patch(f"{logei.__name__}.qLogProbabilityOfFeasibility") + def test_GetQLogPF(self, mock_acqf): + acqf_name = "qLogPF" + # basic test + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + common_kwargs = { + "model": self.model, + "objective": self.objective, + "X_observed": self.X_observed, + "X_pending": self.X_pending, + "mc_samples": self.mc_samples, + "seed": self.seed, + } + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + acqf = get_acquisition_function( + acquisition_function_name=acqf_name, + **common_kwargs, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_with( + model=self.model, + constraints=constraints, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + eta=eta, + ) diff --git a/test/acquisition/test_input_constructors.py b/test/acquisition/test_input_constructors.py index e20b2b0223..a986c4be0b 100644 --- a/test/acquisition/test_input_constructors.py +++ b/test/acquisition/test_input_constructors.py @@ -26,6 +26,7 @@ ExpectedImprovement, LogExpectedImprovement, LogNoisyExpectedImprovement, + LogProbabilityOfFeasibility, LogProbabilityOfImprovement, NoisyExpectedImprovement, PosteriorMean, @@ -56,6 +57,7 @@ from botorch.acquisition.logei import ( qLogExpectedImprovement, qLogNoisyExpectedImprovement, + qLogProbabilityOfFeasibility, TAU_MAX, TAU_RELU, ) @@ -122,6 +124,7 @@ NondominatedPartitioning, ) from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior +from torch import Tensor class DummyAcquisitionFunction(AcquisitionFunction): ... @@ -597,6 +600,48 @@ def test_construct_inputs_mc_base(self) -> None: # TODO: Test passing through of sampler + def test_construct_inputs_qLogPOF(self) -> None: + c = get_acqf_input_constructor(qLogProbabilityOfFeasibility) + mock_model = self.mock_model + + def constraint(Y: Tensor) -> Tensor: + return Y[..., 0] - 0.5 + + kwargs = c(model=mock_model, constraints=[constraint]) + self.assertEqual( + set(kwargs.keys()), + { + "model", + "constraints", + "posterior_transform", + "X_pending", + "sampler", + "eta", + "fat", + "tau_max", + }, + ) + + self.assertIs(kwargs["model"], mock_model) + self.assertListEqual(kwargs["constraints"], [constraint]) + self.assertIsNone(kwargs["posterior_transform"]) + self.assertIsNone(kwargs["X_pending"]) + self.assertIsNone(kwargs["sampler"]) + self.assertIsInstance(kwargs["eta"], float) + self.assertGreater(kwargs["eta"], 0.0) + self.assertTrue(kwargs["fat"]) + self.assertIsInstance(kwargs["tau_max"], float) + self.assertGreater(kwargs["tau_max"], 0.0) + + def test_construct_inputs_LogPOF(self) -> None: + c = get_acqf_input_constructor(LogProbabilityOfFeasibility) + mock_model = self.mock_model + constraints = {1: [None, 0]} + kwargs = c(model=mock_model, constraints=constraints) + self.assertEqual(set(kwargs.keys()), {"model", "constraints"}) + self.assertIs(kwargs["model"], mock_model) + self.assertEqual(kwargs["constraints"], constraints) + def test_construct_inputs_qEI(self) -> None: c = get_acqf_input_constructor(qExpectedImprovement) mock_model = self.mock_model @@ -1768,6 +1813,20 @@ def setUp(self, suppress_input_warnings: bool = True) -> None: ], {"model": st_soo_model, "training_data": self.blockX_blockY}, ) + + self.cases["LogPoF"] = ( + [LogProbabilityOfFeasibility], + {"model": st_soo_model, "constraints": {0: [-5, 5]}}, + ) + + def constraint(X: Tensor) -> Tensor: + return X[..., 0].abs() - 5 + + self.cases["qLogPoF"] = ( + [qLogProbabilityOfFeasibility], + {"model": st_soo_model, "constraints": [constraint]}, + ) + bounds = torch.ones((1, 2)) kg_model = SingleTaskGP(train_X=torch.rand((3, 1)), train_Y=torch.rand((3, 1))) self.cases["Look-ahead"] = ( diff --git a/test/acquisition/test_logei.py b/test/acquisition/test_logei.py index 667ee3e0c0..09bd67d253 100644 --- a/test/acquisition/test_logei.py +++ b/test/acquisition/test_logei.py @@ -24,6 +24,7 @@ NoisyExpectedImprovement, ) from botorch.acquisition.input_constructors import ACQF_INPUT_CONSTRUCTOR_REGISTRY +from botorch.acquisition.logei import qLogProbabilityOfFeasibility from botorch.acquisition.monte_carlo import ( qExpectedImprovement, qNoisyExpectedImprovement, @@ -45,6 +46,7 @@ from botorch.models import ModelListGP, SingleTaskGP from botorch.sampling.normal import IIDNormalSampler, SobolQMCNormalSampler from botorch.utils.low_rank import sample_cached_cholesky +from botorch.utils.objective import compute_smoothed_feasibility_indicator from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior from botorch.utils.transforms import standardize from torch import Tensor @@ -760,6 +762,87 @@ def test_cache_root(self): # TODO: Test different objectives (incl. constraints) +class TestQLogProbabilityOfFeasibility(BotorchTestCase): + def test_q_log_probability_of_feasibility(self): + self.assertIn( + qLogProbabilityOfFeasibility, ACQF_INPUT_CONSTRUCTOR_REGISTRY.keys() + ) + for dtype in (torch.float, torch.double): + with self.subTest(dtype=dtype): + self._test_q_log_probability_of_feasibility(dtype=dtype) + + def _test_q_log_probability_of_feasibility(self, dtype=torch.double): + tkwargs = {"device": self.device, "dtype": dtype} + sample_shape = torch.Size([2]) + samples = torch.zeros(1, 1, **tkwargs) + mm = MockModel(MockPosterior(samples=samples)) + # X is `q x d` = 1 x 1. X is a dummy and unused b/c of mocking + X = torch.zeros(1, 1, **tkwargs) + + # basic test + torch.manual_seed(1234) + sampler = IIDNormalSampler(sample_shape=sample_shape) + + def satisfied_constraint(Y: Tensor) -> Tensor: + return torch.full_like(Y[..., 0], 10.0) + + def barely_satisfied_constraint(Y: Tensor) -> Tensor: + return torch.full_like(Y[..., 0], 3 * eta) + + def uncertain_constraint(Y: Tensor) -> Tensor: + return torch.full_like(Y[..., 0], 0.0) + + def barely_unsatisfied_constraint(Y: Tensor) -> Tensor: + return torch.full_like(Y[..., 0], -torch.pi * eta) + + acqf = qLogProbabilityOfFeasibility( + model=mm, + constraints=[satisfied_constraint], + sampler=sampler, + ) + # test initialization + self.assertTrue(acqf._fat) # default behavior + self.assertIn("sampler", acqf._modules) + self.assertIn("objective", acqf._modules) + # objective is always set to IdentityMCObjective + self.assertIsInstance(acqf.objective, IdentityMCObjective) + self.assertTrue(acqf._log) + + constraints_list = [ + [barely_satisfied_constraint, uncertain_constraint], + [barely_unsatisfied_constraint, barely_satisfied_constraint], + [ + barely_unsatisfied_constraint, + uncertain_constraint, + barely_satisfied_constraint, + ], + [uncertain_constraint], + [satisfied_constraint], + ] + + eta = torch.pi * 1e-3 # not the default + fat = True + for constraints in constraints_list: + acqf = qLogProbabilityOfFeasibility( + model=mm, + constraints=constraints, + sampler=sampler, + eta=eta, + ) + res = acqf(X) + log_sample_feasibility = compute_smoothed_feasibility_indicator( + constraints=constraints, + samples=samples.expand(*sample_shape, *samples.shape), + eta=eta, + log=True, + fat=fat, + ) + log_feasibility = acqf._sample_reduction( + acqf._q_reduction(log_sample_feasibility) + ) + self.assertAllClose(res, log_feasibility) + + class TestIsLog(BotorchTestCase): def test_is_log(self): # the flag is False by default @@ -790,6 +873,9 @@ def test_is_log(self): acqf = acqf_class(model, X) self.assertFalse(acqf._log) + # probability of feasibility + self.assertTrue(qLogProbabilityOfFeasibility._log) + # multi-objective case model_list = ModelListGP(model, model) ref_point = [4, 2] # the meaning of life