diff --git a/src/kerndisc/_discover.py b/src/kerndisc/_discover.py index 1a36589..3ebd140 100644 --- a/src/kerndisc/_discover.py +++ b/src/kerndisc/_discover.py @@ -6,6 +6,7 @@ from ._preprocessing import preprocess from ._util import n_best_scored_kernel_expressions from .evaluation import evaluate +from .evaluation.scoring import get_current_metric from .expansion import expand_kernel_expressions @@ -63,7 +64,7 @@ def discover(x: np.ndarray, y: np.ndarray, search_depth: int=10, kernels_per_dep } for k_exp in expanded_exps if k_exp not in scored_kernel_expressions } - _LOGGER.info(f'Depth `{depth}`: Scoring unscored kernels.') + _LOGGER.info(f'Depth `{depth}`: Scoring unscored kernels using metric `{get_current_metric()}`.') if unscored_exps: for k_exp, score in evaluate(x, y, list(unscored_exps)): diff --git a/src/kerndisc/evaluation/_evaluate.py b/src/kerndisc/evaluation/_evaluate.py index 0e0b66a..a48922a 100644 --- a/src/kerndisc/evaluation/_evaluate.py +++ b/src/kerndisc/evaluation/_evaluate.py @@ -9,7 +9,7 @@ from ._util import add_jitter_to_model from .scoring import score_model -from ..expansion.grammars import get_parser, get_transformer +from ..expansion.grammars import get_builder _CORES = int(os.environ.get('CORES', 1)) @@ -85,8 +85,7 @@ def _make_evaluator(x: np.ndarray, y: np.ndarray, add_jitter: bool) -> Callable: Evaluates a kernel expression passed to it. """ - parser = get_parser() - transformer = get_transformer() + build = get_builder() optimizer = gpflow.train.ScipyOptimizer() def _evaluator(kernel_expression) -> Tuple[str, float]: @@ -101,8 +100,7 @@ def _evaluator(kernel_expression) -> Tuple[str, float]: Kernel expression to be evaluated. """ - kernel = transformer.transform(parser.parse(kernel_expression)) - model = gpflow.models.GPR(x, y, kern=kernel) + model = gpflow.models.GPR(x, y, kern=build(kernel_expression)) if add_jitter: add_jitter_to_model(model) diff --git a/src/kerndisc/expansion/grammars/__init__.py b/src/kerndisc/expansion/grammars/__init__.py index 31c0bb3..ad07b20 100644 --- a/src/kerndisc/expansion/grammars/__init__.py +++ b/src/kerndisc/expansion/grammars/__init__.py @@ -91,6 +91,28 @@ def get_transformer() -> Transformer: return _GRAMMARS[get_current_grammar()]['transformer']() +@lru_cache(maxsize=1) +def get_builder() -> Callable: + """Get a builder that parses and transforms a kernel expression. + + Mostly a utility function that combines `parser.parse` and `transformer.transform`. + + Returns + ------- + builder: Callable + Builder to parse and transform (build) a kernel in one step on call. + + + """ + parser = get_parser() + transformer = get_transformer() + + def _build(kernel_expression: str) -> gpflow.kernels.Kernel: + """Functions that parses and transforms a kernel expression in one step using the current grammar.""" + return transformer.transform(parser.parse(kernel_expression)) + return _build + + @lru_cache(maxsize=1) def get_extender() -> Callable: """Get extender of currently selected grammar. diff --git a/src/kerndisc/expansion/grammars/_grammar_duvenaud.py b/src/kerndisc/expansion/grammars/_grammar_duvenaud.py index 8c1b474..3955909 100644 --- a/src/kerndisc/expansion/grammars/_grammar_duvenaud.py +++ b/src/kerndisc/expansion/grammars/_grammar_duvenaud.py @@ -1,3 +1,4 @@ + import logging from typing import Dict, List @@ -5,11 +6,19 @@ from lark import Lark, ParseError, Transformer, UnexpectedInput from lark.lexer import Token -from ._kernels import BASE_KERNELS +from ._kernels import BASE_KERNELS, ChangePoint from ._util import find_closing_bracket -_IMPLEMENTED_KERNEL_EXPRESSIONS = ('cosine', 'linear', 'matern12', 'matern32', 'matern52', 'periodic', 'rbf', 'white', 'rationalquadratic') +_IMPLEMENTED_KERNEL_EXPRESSIONS = ('cosine', + 'linear', + 'matern12', + 'matern32', + 'matern52', + 'periodic', + 'rbf', + 'white', + 'rationalquadratic') _LOGGER = logging.getLogger(__package__) GRAMMAR = f"""// This kernel grammar is a close implementation of the grammar first defined by David Duvenaud et al. [2013], // in their paper: [Structure discovery in Nonparametric Regression through Compositional Kernel Search](https://arxiv.org/pdf/1302.4922.pdf) and @@ -132,7 +141,7 @@ def extender(kernel_expression: str) -> List[str]: kernel_alterations.extend([ f'{kernel_expression} + {kernel_exp}', # C1 f'({kernel_expression}) * {kernel_exp}', # C2 - # f'cp({kernel_expression}, {kernel_expression})', # C4 + f'cp({kernel_expression}, {kernel_expression})', # C4 # f'cw({kernel_expression}, {kernel_expression}', # C5 # f'cw({kernel_expression}, constant)', # C6 # f'cw(constant, {kernel_expression})', # C7 @@ -241,7 +250,7 @@ def lax_mul(self, kernels: List[gpflow.kernels.Kernel]): def cp(self, kernels: List[gpflow.kernels.Kernel]): # pragma: no cover """Changepoint from the first kernel to the second kernel in the above list.""" - raise NotImplementedError + return ChangePoint(kernels[0], kernels[1], offset=1600., variance=5.0) def cw(self, kernels: List[gpflow.kernels.Kernel]): # pragma: no cover """Changewindow from the first kernel to the second kernel in the above list.""" diff --git a/src/kerndisc/expansion/grammars/_kernels.py b/src/kerndisc/expansion/grammars/_kernels.py index 3b3ffed..7246fc6 100644 --- a/src/kerndisc/expansion/grammars/_kernels.py +++ b/src/kerndisc/expansion/grammars/_kernels.py @@ -1,8 +1,12 @@ """Module that maintains all kernels that are available for kernel construction.""" -from typing import Dict +import copy +from functools import reduce +from typing import Dict, Optional import gpflow +from gpflow import Param, transforms from gpflow.kernels import (ArcCosine, + Combination, Constant, Cosine, Exponential, @@ -14,6 +18,9 @@ RationalQuadratic, RBF, White) +import numpy as np +import tensorflow as tf + _CONSTANT: Dict[str, gpflow.kernels.Kernel] = { 'constant': Constant, @@ -36,3 +43,93 @@ 'white': White, **_CONSTANT, } + + +class ChangePoint(Combination): + """Changepoint (CP) Kernel that switches from one kernel to another at a certain offset. + + This kernels output is defined as: + ``` + CP(k1, k2, θ)(X, X') = σ(X, θ) * k1(X, X') * σ(X', θ) + σ(X, θ) * k2(X, X') * σ(X', θ) + ``` + Where `k1`, `k2` are kernel functions, `σ` is the sigmoid smooth step function. This kernel is useful + for quick transitions between two kernel functions. `θ = (offset, variance)` are parameters of the sigmoid function + that dictate at what (time-)point the transition occurs (`offset`) and what variance is added. `θ` is learned + during training and applied to `X`, `X2` before `σ` is calculated. + + """ + + def __init__(self, k1: gpflow.kernels.Kernel, k2: gpflow.kernels.Kernel, offset: float, variance: float=1.0): + """Initialize changepoint at location `offset`, with variance `variance`.""" + super(ChangePoint, self).__init__([k1, k2]) + + self.offset = Param(offset, transform=transforms.positive) + self.variance = Param(variance, transform=transforms.positive) + + def K(self, X: np.ndarray, X2: Optional[np.ndarray]=None): # noqa: N802, N803 + """Calculate covariance matrix.""" + sigmoid_at_X = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X, axis=1) @ tf.expand_dims(X, axis=2)) + self.offset) # noqa: N806 + + if X2 is None: + sigmoid_at_X2 = sigmoid_at_X # noqa: N806 + else: + sigmoid_at_X2 = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X2, axis=1) @ tf.expand_dims(X2, axis=2)) + self.offset) # noqa: N806 + + k1_multiplier = tf.expand_dims(sigmoid_at_X, axis=1) @ tf.expand_dims(sigmoid_at_X2, axis=0) + k2_multiplier = tf.expand_dims((1. - sigmoid_at_X), axis=1) @ tf.expand_dims((1. - sigmoid_at_X2), axis=0) + + summand_1 = reduce(tf.multiply, [k1_multiplier, self.kern_list[0].K(X, X2)]) + summand_2 = reduce(tf.multiply, [k2_multiplier, self.kern_list[1].K(X, X2)]) + + return reduce(tf.add, [summand_1, summand_2]) + + def Kdiag(self, X: np.ndarray): # noqa: N802, N803 + """Calculate diagonal of covariance matrix only, convenience method.""" + sigmoid_at_X = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X, axis=1) @ tf.expand_dims(X, axis=2)) + self.offset) # noqa: N806 + + summand_1 = reduce(tf.multiply, [sigmoid_at_X, self.kern_list[0].Kdiag(X), sigmoid_at_X]) + summand_2 = reduce(tf.multiply, [(1. - sigmoid_at_X), self.kern_list[1].Kdiag(X), 1. - sigmoid_at_X]) + + return reduce(tf.add, [summand_1, summand_2]) + + +class ChangeWindow(Combination): + """Changewindow (CW) Kernel that switches from one kernel to another in a certain range. + + This kernels output is defined as: + ``` + CW(k1, k2)(X, X') = CP(CP(k1, k2, θ_1), k1, θ_2)(X, X') + ``` + Where `CP` is the changepoint function from above, `θ_1 = (offset_cp_1, variance_cp_1)` dictates + at what point the transition from `k1` to `k2` occurs, `θ_2 = (offset_cp_2, variance_cp_2)` then + forces the second transition from `k2` to `k1` at point `offset_cp_2`. Thus `offset_cp_1 < offset_cp_2` + must hold. + + """ + + def __init__(self, k1: gpflow.kernels.Kernel, k2: gpflow.kernels.Kernel, offset_cp_1: float, offset_cp_2: float, + variance: float=1.0, variance_cp_1: float=1.0, variance_cp_2: float=1.0): + """Initialize changewindow.""" + if offset_cp_1 >= offset_cp_2: + raise RuntimeError(f'Changewindow kernel was initialized with `offset_cp_1 = {offset_cp_1} >= offset_cp_2 = {offset_cp_2}`.') + + super(ChangeWindow, self).__init__([k1, k2]) + + self.variance = Param(variance, transform=transforms.positive) + + self.cp_lower = ChangePoint(k1, k2, offset_cp_1, variance_cp_1) + self.cp_upper = ChangePoint(self.cp_lower, copy.copy(k1), offset_cp_2, variance_cp_2) + + self.remaining_margin = Param(self.offset_margin, transform=transforms.positive) + + @property + def offset_margin(self): + return self.cp_upper.offset.value - self.cp_lower.offset.value + + def K(self, X, X2=None): # noqa: N802, N803 + """Calculate covariance matrix.""" + return self.variance * self.cp_upper.K(X, X2) + + def Kdiag(self, X): # noqa: N802, N803 + """Calculate diagonal of covariance matrix only, convenience method.""" + return self.variance * self.cp_upper.Kdiag(X) diff --git a/tests/expansion/grammars/test_kernels.py b/tests/expansion/grammars/test_kernels.py new file mode 100644 index 0000000..7a039d8 --- /dev/null +++ b/tests/expansion/grammars/test_kernels.py @@ -0,0 +1,12 @@ +"""Tests here are only for the manually implemented kernels from `_kernels.py`.""" +import gpflow + +from kerndisc.expansion.grammars._kernels import ChangePoint # noqa: I202, I100 + + +def test_changepoint(): + offset = 5. + k1 = gpflow.kernels.Constant(1) + k2 = gpflow.kernels.Constant(1) + + changepoint_k = ChangePoint(k1, k2, offset=offset)