Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 180 additions & 90 deletions botorch/acquisition/analytic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import math

from abc import ABC
from abc import ABC, abstractmethod
from contextlib import nullcontext
from copy import deepcopy

Expand Down Expand Up @@ -415,7 +415,112 @@ def forward(self, X: Tensor) -> Tensor:
return _log_ei_helper(u) + sigma.log()


class LogConstrainedExpectedImprovement(AnalyticAcquisitionFunction):
class ConstrainedAnalyticAcquisitionFunctionMixin(ABC):
r"""Base class for constrained analytic acquisition functions."""

def __init__(
self,
constraints: dict[int, tuple[float | None, float | None]],
) -> None:
r"""Analytic Log Probability of Feasibility.

Args:
model: A fitted multi-output model.
constraints: A dictionary of the form `{i: [lower, upper]}`, where
`i` is the output index, and `lower` and `upper` are lower and upper
bounds on that output (resp. interpreted as -Inf / Inf if None).
"""
self.constraints = constraints
self._preprocess_constraint_bounds(constraints=constraints)

@abstractmethod
def register_buffer(self, name: str, value: Tensor) -> None:
"""Add a buffer that can be accessed by `self.name` and stores the Tensor
`value`, usually provided by derivatives that also inherit from `nn.Module`.
"""

def _preprocess_constraint_bounds(
self,
constraints: dict[int, tuple[float | None, float | None]],
) -> None:
r"""Set up constraint bounds.

Args:
constraints: A dictionary of the form `{i: [lower, upper]}`, where
`i` is the output index, and `lower` and `upper` are lower and upper
bounds on that output (resp. interpreted as -Inf / Inf if None)
"""
con_lower, con_lower_inds = [], []
con_upper, con_upper_inds = [], []
con_both, con_both_inds = [], []
con_indices = list(constraints.keys())
if len(con_indices) == 0:
raise ValueError("There must be at least one constraint.")
# CEI, LogCEI have an objective index, but LogPOF does not.
if hasattr(self, "objective_index") and self.objective_index in con_indices:
raise ValueError(
"Output corresponding to objective should not be a constraint."
)
for k in con_indices:
if constraints[k][0] is not None and constraints[k][1] is not None:
if constraints[k][1] <= constraints[k][0]:
raise ValueError("Upper bound is less than the lower bound.")
con_both_inds.append(k)
con_both.append([constraints[k][0], constraints[k][1]])
elif constraints[k][0] is not None:
con_lower_inds.append(k)
con_lower.append(constraints[k][0])
elif constraints[k][1] is not None:
con_upper_inds.append(k)
con_upper.append(constraints[k][1])

for name, value in [
("con_lower_inds", con_lower_inds),
("con_upper_inds", con_upper_inds),
("con_both_inds", con_both_inds),
("con_both", con_both),
("con_lower", con_lower),
("con_upper", con_upper),
]:
# tensor-based indexing is much faster than list-based advanced indexing
self.register_buffer(name, tensor=torch.as_tensor(value))

def _compute_log_prob_feas(
self,
means: Tensor,
sigmas: Tensor,
) -> Tensor:
r"""Compute logarithm of the feasibility probability for each batch of X.

Args:
X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
points each.
means: A `(b) x m`-dim Tensor of means.
sigmas: A `(b) x m`-dim Tensor of standard deviations.

Returns:
A `b`-dim tensor of log feasibility probabilities

Note: This function does case-work for upper bound, lower bound, and both-sided
bounds. Another way to do it would be to use 'inf' and -'inf' for the
one-sided bounds and use the logic for the both-sided case. But this
causes an issue with autograd since we get 0 * inf.
"""
return compute_log_prob_feas_from_bounds(
con_lower_inds=self.con_lower_inds,
con_upper_inds=self.con_upper_inds,
con_both_inds=self.con_both_inds,
con_lower=self.con_lower,
con_upper=self.con_upper,
con_both=self.con_both,
means=means,
sigmas=sigmas,
)


class LogConstrainedExpectedImprovement(
AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin
):
r"""Log Constrained Expected Improvement (feasibility-weighted).

Computes the logarithm of the analytic expected improvement for a Normal posterior
Expand Down Expand Up @@ -464,13 +569,12 @@ def __init__(
maximize: If True, consider the problem a maximization problem.
"""
# Use AcquisitionFunction constructor to avoid check for posterior transform.
super(AnalyticAcquisitionFunction, self).__init__(model=model)
AcquisitionFunction.__init__(self, model=model)
self.posterior_transform = None
self.maximize = maximize
self.objective_index = objective_index
self.constraints = constraints
self.register_buffer("best_f", torch.as_tensor(best_f))
_preprocess_constraint_bounds(self, constraints=constraints)
ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints)
self.register_forward_pre_hook(convert_to_target_pre_hook)

@t_batch_mode_transform(expected_q=1)
Expand All @@ -490,11 +594,77 @@ def forward(self, X: Tensor) -> Tensor:
mean_obj, sigma_obj = means[..., ind], sigmas[..., ind]
u = _scaled_improvement(mean_obj, sigma_obj, self.best_f, self.maximize)
log_ei = _log_ei_helper(u) + sigma_obj.log()
log_prob_feas = _compute_log_prob_feas(self, means=means, sigmas=sigmas)
log_prob_feas = self._compute_log_prob_feas(means=means, sigmas=sigmas)
return log_ei + log_prob_feas


class ConstrainedExpectedImprovement(AnalyticAcquisitionFunction):
class LogProbabilityOfFeasibility(
AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin
):
r"""Log Probability of Feasbility.

Computes the logarithm of the analytic probability of feasibility for a Normal
posterior distribution weighted by a probability of feasibility. The objective and
constraints are assumed to be independent and have Gaussian posterior
distributions. Only supports non-batch mode (i.e. `q=1`). The model should be
multi-outcome, with the index of the objective and constraints passed to
the constructor.

See [Ament2023logei]_ for details. Formally,

`LogPF(x) = Sum_i log(P(y_i \in [lower_i, upper_i]))`,

where `y_i ~ constraint_i(x)` and `lower_i`, `upper_i` are the lower and
upper bounds for the i-th constraint, respectively.

Example:
# example where the 0th output has a non-negativity constraint
>>> model = SingleTaskGP(train_X, train_Y)
>>> constraints = {0: (0.0, None)}
>>> LogPOF = LogProbabilityOfFeasibility(model, constraints)
>>> cei = LogPF(test_X)
"""

_log: bool = True

def __init__(
self,
model: Model,
constraints: dict[int, tuple[float | None, float | None]],
) -> None:
r"""Analytic Log Probability of Feasibility.

Args:
model: A fitted multi-output model.
constraints: A dictionary of the form `{i: [lower, upper]}`, where
`i` is the output index, and `lower` and `upper` are lower and upper
bounds on that output (resp. interpreted as -Inf / Inf if None)
"""
# Use AcquisitionFunction constructor to avoid check for posterior transform.
AcquisitionFunction.__init__(self, model=model)
self.posterior_transform = None
ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints)
self.register_forward_pre_hook(convert_to_target_pre_hook)

@t_batch_mode_transform(expected_q=1)
def forward(self, X: Tensor) -> Tensor:
r"""Evaluate Constrained Log Probability of Feasibility on the candidate set X.

Args:
X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
points each.

Returns:
A `(b)`-dim Tensor of Log Probability of Feasibility values at the given
design points `X`.
"""
means, sigmas = self._mean_and_sigma(X) # (b) x 1 + (m = num constraints)
return self._compute_log_prob_feas(means=means, sigmas=sigmas)


class ConstrainedExpectedImprovement(
AnalyticAcquisitionFunction, ConstrainedAnalyticAcquisitionFunctionMixin
):
r"""Constrained Expected Improvement (feasibility-weighted).

Computes the analytic expected improvement for a Normal posterior
Expand Down Expand Up @@ -543,13 +713,12 @@ def __init__(
"""
legacy_ei_numerics_warning(legacy_name=type(self).__name__)
# Use AcquisitionFunction constructor to avoid check for posterior transform.
super(AnalyticAcquisitionFunction, self).__init__(model=model)
AcquisitionFunction.__init__(self, model=model)
self.posterior_transform = None
self.maximize = maximize
self.objective_index = objective_index
self.constraints = constraints
self.register_buffer("best_f", torch.as_tensor(best_f))
_preprocess_constraint_bounds(self, constraints=constraints)
ConstrainedAnalyticAcquisitionFunctionMixin.__init__(self, constraints)
self.register_forward_pre_hook(convert_to_target_pre_hook)

@t_batch_mode_transform(expected_q=1)
Expand All @@ -569,7 +738,7 @@ def forward(self, X: Tensor) -> Tensor:
mean_obj, sigma_obj = means[..., ind], sigmas[..., ind]
u = _scaled_improvement(mean_obj, sigma_obj, self.best_f, self.maximize)
ei = sigma_obj * _ei_helper(u)
log_prob_feas = _compute_log_prob_feas(self, means=means, sigmas=sigmas)
log_prob_feas = self._compute_log_prob_feas(means=means, sigmas=sigmas)
return ei.mul(log_prob_feas.exp())


Expand Down Expand Up @@ -1131,82 +1300,3 @@ def _get_noiseless_fantasy_model(
fantasy_model.likelihood.noise_covar.noise = Yvar

return fantasy_model


def _preprocess_constraint_bounds(
acqf: LogConstrainedExpectedImprovement | ConstrainedExpectedImprovement,
constraints: dict[int, tuple[float | None, float | None]],
) -> None:
r"""Set up constraint bounds.

Args:
constraints: A dictionary of the form `{i: [lower, upper]}`, where
`i` is the output index, and `lower` and `upper` are lower and upper
bounds on that output (resp. interpreted as -Inf / Inf if None)
"""
con_lower, con_lower_inds = [], []
con_upper, con_upper_inds = [], []
con_both, con_both_inds = [], []
con_indices = list(constraints.keys())
if len(con_indices) == 0:
raise ValueError("There must be at least one constraint.")
if acqf.objective_index in con_indices:
raise ValueError(
"Output corresponding to objective should not be a constraint."
)
for k in con_indices:
if constraints[k][0] is not None and constraints[k][1] is not None:
if constraints[k][1] <= constraints[k][0]:
raise ValueError("Upper bound is less than the lower bound.")
con_both_inds.append(k)
con_both.append([constraints[k][0], constraints[k][1]])
elif constraints[k][0] is not None:
con_lower_inds.append(k)
con_lower.append(constraints[k][0])
elif constraints[k][1] is not None:
con_upper_inds.append(k)
con_upper.append(constraints[k][1])
# tensor-based indexing is much faster than list-based advanced indexing
for name, indices in [
("con_lower_inds", con_lower_inds),
("con_upper_inds", con_upper_inds),
("con_both_inds", con_both_inds),
("con_both", con_both),
("con_lower", con_lower),
("con_upper", con_upper),
]:
acqf.register_buffer(name, tensor=torch.as_tensor(indices))


def _compute_log_prob_feas(
acqf: LogConstrainedExpectedImprovement | ConstrainedExpectedImprovement,
means: Tensor,
sigmas: Tensor,
) -> Tensor:
r"""Compute logarithm of the feasibility probability for each batch of X.

Args:
X: A `(b) x 1 x d`-dim Tensor of `(b)` t-batches of `d`-dim design
points each.
means: A `(b) x m`-dim Tensor of means.
sigmas: A `(b) x m`-dim Tensor of standard deviations.
Returns:
A `b`-dim tensor of log feasibility probabilities

Note: This function does case-work for upper bound, lower bound, and both-sided
bounds. Another way to do it would be to use 'inf' and -'inf' for the
one-sided bounds and use the logic for the both-sided case. But this
causes an issue with autograd since we get 0 * inf.
TODO: Investigate further.
"""
acqf.to(device=means.device)
return compute_log_prob_feas_from_bounds(
acqf.con_lower_inds,
acqf.con_upper_inds,
acqf.con_both_inds,
acqf.con_lower,
acqf.con_upper,
acqf.con_both,
means,
sigmas,
)
10 changes: 10 additions & 0 deletions botorch/acquisition/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ def get_acquisition_function(
constraints=constraints,
eta=eta,
)
elif acquisition_function_name == "qLogPF":
return logei.qLogProbabilityOfFeasibility(
model=model,
constraints=constraints,
sampler=sampler,
objective=objective,
posterior_transform=posterior_transform,
X_pending=X_pending,
eta=eta,
)
elif acquisition_function_name in ["qNEI", "qLogNEI"]:
acqf_class = (
monte_carlo.qNoisyExpectedImprovement
Expand Down
Loading