From 11c24d96c84e7fa5d37fafbf3c8492f002b2d1d0 Mon Sep 17 00:00:00 2001 From: nils Date: Fri, 8 Jun 2018 07:47:05 +0200 Subject: [PATCH 1/5] first buggy draft of changepoints --- src/kerndisc/expansion/grammars/_kernels.py | 44 +++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/kerndisc/expansion/grammars/_kernels.py b/src/kerndisc/expansion/grammars/_kernels.py index 3b3ffed..8ba1a78 100644 --- a/src/kerndisc/expansion/grammars/_kernels.py +++ b/src/kerndisc/expansion/grammars/_kernels.py @@ -1,8 +1,10 @@ """Module that maintains all kernels that are available for kernel construction.""" +from functools import reduce from typing import Dict import gpflow from gpflow.kernels import (ArcCosine, + Combination, Constant, Cosine, Exponential, @@ -14,6 +16,9 @@ RationalQuadratic, RBF, White) +from gpflow.params import Param +import tensorflow as tf + _CONSTANT: Dict[str, gpflow.kernels.Kernel] = { 'constant': Constant, @@ -36,3 +41,42 @@ 'white': White, **_CONSTANT, } + + +class ChangePoint(Combination): + """Changepoing Kernel that switches from one kernel to another at a certain offset.""" + + def __init__(self, kern_list, offset, variance): + """Initialize Changepoing at location `offset`, with variance `variance`.""" + super(ChangePoint, self).__init__(kern_list) + assert len(self.kern_list) == 2 # Has to transition between exactly 2 kernels. + self.offset = Param(offset) + self.variance = Param(variance) + + def K(self, X, X2=None, presliced=False): + """Calculate covariance matrix.""" + if X2 is None: + X2 = X + XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset + XXT2 = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X2, axis=1), tf.expand_dims(X2, axis=2))) + self.offset + sigm = tf.sigmoid(XXT) + sigm2 = tf.sigmoid(XXT2) + sig1 = tf.matmul(tf.expand_dims(sigm, axis=1), + tf.expand_dims(sigm2, axis=0)) + sig2 = tf.matmul(tf.expand_dims((1. - sigm), axis=1), + tf.expand_dims((1. - sigm2), axis=0)) + a1 = reduce(tf.mul, + [sig1, self.kern_list[0].K(X, X2)]) + a2 = reduce(tf.mul, + [sig2, self.kern_list[1].K(X, X2)]) + return reduce(tf.add, [a1, a2]) + + def Kdiag(self, X, presliced=False): + """Calculate diagonal of covariance matrix only, convenience method.""" + XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset + sigm = tf.sigmoid(XXT) + a1 = reduce(tf.mul, [sigm, self.kern_list[0].Kdiag(X), + sigm]) + a2 = reduce(tf.mul, [(1. - sigm), self.kern_list[1].Kdiag(X), + 1. - sigm]) + return reduce(tf.add, [a1, a2]) From 39ce1cbe9d5d70f9db069db522f5fd294559fa18 Mon Sep 17 00:00:00 2001 From: nils Date: Fri, 8 Jun 2018 07:48:18 +0200 Subject: [PATCH 2/5] small bug fix in kernel file --- src/kerndisc/expansion/grammars/_kernels.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kerndisc/expansion/grammars/_kernels.py b/src/kerndisc/expansion/grammars/_kernels.py index 8ba1a78..1bd243a 100644 --- a/src/kerndisc/expansion/grammars/_kernels.py +++ b/src/kerndisc/expansion/grammars/_kernels.py @@ -3,6 +3,7 @@ from typing import Dict import gpflow +from gpflow import Param from gpflow.kernels import (ArcCosine, Combination, Constant, @@ -16,7 +17,6 @@ RationalQuadratic, RBF, White) -from gpflow.params import Param import tensorflow as tf From 0b298f05c8a31849f81ea611307b2e51dac587b7 Mon Sep 17 00:00:00 2001 From: nils Date: Tue, 12 Jun 2018 12:30:34 +0200 Subject: [PATCH 3/5] stub for cw, rough implementation of cp --- src/kerndisc/expansion/grammars/_kernels.py | 65 ++++++++++++++++----- 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/src/kerndisc/expansion/grammars/_kernels.py b/src/kerndisc/expansion/grammars/_kernels.py index 1bd243a..5b323d6 100644 --- a/src/kerndisc/expansion/grammars/_kernels.py +++ b/src/kerndisc/expansion/grammars/_kernels.py @@ -17,6 +17,7 @@ RationalQuadratic, RBF, White) +import numpy as np import tensorflow as tf @@ -44,16 +45,15 @@ class ChangePoint(Combination): - """Changepoing Kernel that switches from one kernel to another at a certain offset.""" + """Changepoint Kernel that switches from one kernel to another at a certain offset.""" def __init__(self, kern_list, offset, variance): - """Initialize Changepoing at location `offset`, with variance `variance`.""" + """Initialize changepoint at location `offset`, with variance `variance`.""" super(ChangePoint, self).__init__(kern_list) - assert len(self.kern_list) == 2 # Has to transition between exactly 2 kernels. - self.offset = Param(offset) - self.variance = Param(variance) + self.offset = Param(np.array(offset).astype(np.float64)) + self.variance = Param(np.array(variance).astype(np.float64)) - def K(self, X, X2=None, presliced=False): + def K(self, X, X2=None): """Calculate covariance matrix.""" if X2 is None: X2 = X @@ -65,18 +65,57 @@ def K(self, X, X2=None, presliced=False): tf.expand_dims(sigm2, axis=0)) sig2 = tf.matmul(tf.expand_dims((1. - sigm), axis=1), tf.expand_dims((1. - sigm2), axis=0)) - a1 = reduce(tf.mul, + a1 = reduce(tf.multiply, [sig1, self.kern_list[0].K(X, X2)]) - a2 = reduce(tf.mul, + a2 = reduce(tf.multiply, [sig2, self.kern_list[1].K(X, X2)]) return reduce(tf.add, [a1, a2]) - def Kdiag(self, X, presliced=False): + def Kdiag(self, X): """Calculate diagonal of covariance matrix only, convenience method.""" XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset sigm = tf.sigmoid(XXT) - a1 = reduce(tf.mul, [sigm, self.kern_list[0].Kdiag(X), - sigm]) - a2 = reduce(tf.mul, [(1. - sigm), self.kern_list[1].Kdiag(X), - 1. - sigm]) + a1 = reduce(tf.multiply, [sigm, self.kern_list[0].Kdiag(X), + sigm]) + a2 = reduce(tf.multiply, [(1. - sigm), self.kern_list[1].Kdiag(X), + 1. - sigm]) + return reduce(tf.add, [a1, a2]) + + +class ChangeWindow(Combination): + """Changewindow kernel that applies a kernel only in a range from some startingpoint `t_s` to endpoint `t_e`.""" + + def __init__(self, kern_list, kernel_start, kernel_end, variance): + """Initialize changewindow with some kernel which only applies in a certain window, then the other kernel applies again.""" + super(ChangePoint, self).__init__(kern_list) + self.start = Param(np.array(kernel_start).astype(np.float64)) + self.end = Param(np.array(kernel_end).astype(np.float64)) + self.variance = Param(np.array(variance).astype(np.float64)) + + def K(self, X, X2=None): + """Calculate covariance matrix.""" + if X2 is None: + X2 = X + XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset + XXT2 = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X2, axis=1), tf.expand_dims(X2, axis=2))) + self.offset + sigm = tf.sigmoid(XXT) + sigm2 = tf.sigmoid(XXT2) + sig1 = tf.matmul(tf.expand_dims(sigm, axis=1), + tf.expand_dims(sigm2, axis=0)) + sig2 = tf.matmul(tf.expand_dims((1. - sigm), axis=1), + tf.expand_dims((1. - sigm2), axis=0)) + a1 = reduce(tf.multiply, + [sig1, self.kern_list[0].K(X, X2)]) + a2 = reduce(tf.multiply, + [sig2, self.kern_list[1].K(X, X2)]) + return reduce(tf.add, [a1, a2]) + + def Kdiag(self, X): + """Calculate diagonal of covariance matrix only, convenience method.""" + XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset + sigm = tf.sigmoid(XXT) + a1 = reduce(tf.multiply, [sigm, self.kern_list[0].Kdiag(X), + sigm]) + a2 = reduce(tf.multiply, [(1. - sigm), self.kern_list[1].Kdiag(X), + 1. - sigm]) return reduce(tf.add, [a1, a2]) From 2f1d61f0ce646dda249cc0de73b11257e3d21856 Mon Sep 17 00:00:00 2001 From: nils Date: Tue, 12 Jun 2018 19:28:43 +0200 Subject: [PATCH 4/5] continued with cp, cw --- src/kerndisc/expansion/grammars/_kernels.py | 140 +++++++++++--------- 1 file changed, 77 insertions(+), 63 deletions(-) diff --git a/src/kerndisc/expansion/grammars/_kernels.py b/src/kerndisc/expansion/grammars/_kernels.py index 5b323d6..7246fc6 100644 --- a/src/kerndisc/expansion/grammars/_kernels.py +++ b/src/kerndisc/expansion/grammars/_kernels.py @@ -1,9 +1,10 @@ """Module that maintains all kernels that are available for kernel construction.""" +import copy from functools import reduce -from typing import Dict +from typing import Dict, Optional import gpflow -from gpflow import Param +from gpflow import Param, transforms from gpflow.kernels import (ArcCosine, Combination, Constant, @@ -45,77 +46,90 @@ class ChangePoint(Combination): - """Changepoint Kernel that switches from one kernel to another at a certain offset.""" + """Changepoint (CP) Kernel that switches from one kernel to another at a certain offset. - def __init__(self, kern_list, offset, variance): + This kernels output is defined as: + ``` + CP(k1, k2, θ)(X, X') = σ(X, θ) * k1(X, X') * σ(X', θ) + σ(X, θ) * k2(X, X') * σ(X', θ) + ``` + Where `k1`, `k2` are kernel functions, `σ` is the sigmoid smooth step function. This kernel is useful + for quick transitions between two kernel functions. `θ = (offset, variance)` are parameters of the sigmoid function + that dictate at what (time-)point the transition occurs (`offset`) and what variance is added. `θ` is learned + during training and applied to `X`, `X2` before `σ` is calculated. + + """ + + def __init__(self, k1: gpflow.kernels.Kernel, k2: gpflow.kernels.Kernel, offset: float, variance: float=1.0): """Initialize changepoint at location `offset`, with variance `variance`.""" - super(ChangePoint, self).__init__(kern_list) - self.offset = Param(np.array(offset).astype(np.float64)) - self.variance = Param(np.array(variance).astype(np.float64)) + super(ChangePoint, self).__init__([k1, k2]) + + self.offset = Param(offset, transform=transforms.positive) + self.variance = Param(variance, transform=transforms.positive) - def K(self, X, X2=None): + def K(self, X: np.ndarray, X2: Optional[np.ndarray]=None): # noqa: N802, N803 """Calculate covariance matrix.""" + sigmoid_at_X = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X, axis=1) @ tf.expand_dims(X, axis=2)) + self.offset) # noqa: N806 + if X2 is None: - X2 = X - XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset - XXT2 = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X2, axis=1), tf.expand_dims(X2, axis=2))) + self.offset - sigm = tf.sigmoid(XXT) - sigm2 = tf.sigmoid(XXT2) - sig1 = tf.matmul(tf.expand_dims(sigm, axis=1), - tf.expand_dims(sigm2, axis=0)) - sig2 = tf.matmul(tf.expand_dims((1. - sigm), axis=1), - tf.expand_dims((1. - sigm2), axis=0)) - a1 = reduce(tf.multiply, - [sig1, self.kern_list[0].K(X, X2)]) - a2 = reduce(tf.multiply, - [sig2, self.kern_list[1].K(X, X2)]) - return reduce(tf.add, [a1, a2]) - - def Kdiag(self, X): + sigmoid_at_X2 = sigmoid_at_X # noqa: N806 + else: + sigmoid_at_X2 = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X2, axis=1) @ tf.expand_dims(X2, axis=2)) + self.offset) # noqa: N806 + + k1_multiplier = tf.expand_dims(sigmoid_at_X, axis=1) @ tf.expand_dims(sigmoid_at_X2, axis=0) + k2_multiplier = tf.expand_dims((1. - sigmoid_at_X), axis=1) @ tf.expand_dims((1. - sigmoid_at_X2), axis=0) + + summand_1 = reduce(tf.multiply, [k1_multiplier, self.kern_list[0].K(X, X2)]) + summand_2 = reduce(tf.multiply, [k2_multiplier, self.kern_list[1].K(X, X2)]) + + return reduce(tf.add, [summand_1, summand_2]) + + def Kdiag(self, X: np.ndarray): # noqa: N802, N803 """Calculate diagonal of covariance matrix only, convenience method.""" - XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset - sigm = tf.sigmoid(XXT) - a1 = reduce(tf.multiply, [sigm, self.kern_list[0].Kdiag(X), - sigm]) - a2 = reduce(tf.multiply, [(1. - sigm), self.kern_list[1].Kdiag(X), - 1. - sigm]) - return reduce(tf.add, [a1, a2]) + sigmoid_at_X = tf.sigmoid(self.variance * tf.squeeze(tf.expand_dims(X, axis=1) @ tf.expand_dims(X, axis=2)) + self.offset) # noqa: N806 + + summand_1 = reduce(tf.multiply, [sigmoid_at_X, self.kern_list[0].Kdiag(X), sigmoid_at_X]) + summand_2 = reduce(tf.multiply, [(1. - sigmoid_at_X), self.kern_list[1].Kdiag(X), 1. - sigmoid_at_X]) + + return reduce(tf.add, [summand_1, summand_2]) class ChangeWindow(Combination): - """Changewindow kernel that applies a kernel only in a range from some startingpoint `t_s` to endpoint `t_e`.""" + """Changewindow (CW) Kernel that switches from one kernel to another in a certain range. - def __init__(self, kern_list, kernel_start, kernel_end, variance): - """Initialize changewindow with some kernel which only applies in a certain window, then the other kernel applies again.""" - super(ChangePoint, self).__init__(kern_list) - self.start = Param(np.array(kernel_start).astype(np.float64)) - self.end = Param(np.array(kernel_end).astype(np.float64)) - self.variance = Param(np.array(variance).astype(np.float64)) + This kernels output is defined as: + ``` + CW(k1, k2)(X, X') = CP(CP(k1, k2, θ_1), k1, θ_2)(X, X') + ``` + Where `CP` is the changepoint function from above, `θ_1 = (offset_cp_1, variance_cp_1)` dictates + at what point the transition from `k1` to `k2` occurs, `θ_2 = (offset_cp_2, variance_cp_2)` then + forces the second transition from `k2` to `k1` at point `offset_cp_2`. Thus `offset_cp_1 < offset_cp_2` + must hold. - def K(self, X, X2=None): + """ + + def __init__(self, k1: gpflow.kernels.Kernel, k2: gpflow.kernels.Kernel, offset_cp_1: float, offset_cp_2: float, + variance: float=1.0, variance_cp_1: float=1.0, variance_cp_2: float=1.0): + """Initialize changewindow.""" + if offset_cp_1 >= offset_cp_2: + raise RuntimeError(f'Changewindow kernel was initialized with `offset_cp_1 = {offset_cp_1} >= offset_cp_2 = {offset_cp_2}`.') + + super(ChangeWindow, self).__init__([k1, k2]) + + self.variance = Param(variance, transform=transforms.positive) + + self.cp_lower = ChangePoint(k1, k2, offset_cp_1, variance_cp_1) + self.cp_upper = ChangePoint(self.cp_lower, copy.copy(k1), offset_cp_2, variance_cp_2) + + self.remaining_margin = Param(self.offset_margin, transform=transforms.positive) + + @property + def offset_margin(self): + return self.cp_upper.offset.value - self.cp_lower.offset.value + + def K(self, X, X2=None): # noqa: N802, N803 """Calculate covariance matrix.""" - if X2 is None: - X2 = X - XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset - XXT2 = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X2, axis=1), tf.expand_dims(X2, axis=2))) + self.offset - sigm = tf.sigmoid(XXT) - sigm2 = tf.sigmoid(XXT2) - sig1 = tf.matmul(tf.expand_dims(sigm, axis=1), - tf.expand_dims(sigm2, axis=0)) - sig2 = tf.matmul(tf.expand_dims((1. - sigm), axis=1), - tf.expand_dims((1. - sigm2), axis=0)) - a1 = reduce(tf.multiply, - [sig1, self.kern_list[0].K(X, X2)]) - a2 = reduce(tf.multiply, - [sig2, self.kern_list[1].K(X, X2)]) - return reduce(tf.add, [a1, a2]) - - def Kdiag(self, X): + return self.variance * self.cp_upper.K(X, X2) + + def Kdiag(self, X): # noqa: N802, N803 """Calculate diagonal of covariance matrix only, convenience method.""" - XXT = self.variance * tf.squeeze(tf.matmul(tf.expand_dims(X, axis=1), tf.expand_dims(X, axis=2))) + self.offset - sigm = tf.sigmoid(XXT) - a1 = reduce(tf.multiply, [sigm, self.kern_list[0].Kdiag(X), - sigm]) - a2 = reduce(tf.multiply, [(1. - sigm), self.kern_list[1].Kdiag(X), - 1. - sigm]) - return reduce(tf.add, [a1, a2]) + return self.variance * self.cp_upper.Kdiag(X) From 3509897cc0fa200883119f42411e21485888b82d Mon Sep 17 00:00:00 2001 From: nils Date: Fri, 15 Jun 2018 13:26:53 +0200 Subject: [PATCH 5/5] reworked cp, cw, readded all kernels to search, added new builder --- src/kerndisc/_discover.py | 3 ++- src/kerndisc/evaluation/_evaluate.py | 8 +++---- src/kerndisc/expansion/grammars/__init__.py | 22 +++++++++++++++++++ .../expansion/grammars/_grammar_duvenaud.py | 17 ++++++++++---- tests/expansion/grammars/test_kernels.py | 12 ++++++++++ 5 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 tests/expansion/grammars/test_kernels.py diff --git a/src/kerndisc/_discover.py b/src/kerndisc/_discover.py index 1a36589..3ebd140 100644 --- a/src/kerndisc/_discover.py +++ b/src/kerndisc/_discover.py @@ -6,6 +6,7 @@ from ._preprocessing import preprocess from ._util import n_best_scored_kernel_expressions from .evaluation import evaluate +from .evaluation.scoring import get_current_metric from .expansion import expand_kernel_expressions @@ -63,7 +64,7 @@ def discover(x: np.ndarray, y: np.ndarray, search_depth: int=10, kernels_per_dep } for k_exp in expanded_exps if k_exp not in scored_kernel_expressions } - _LOGGER.info(f'Depth `{depth}`: Scoring unscored kernels.') + _LOGGER.info(f'Depth `{depth}`: Scoring unscored kernels using metric `{get_current_metric()}`.') if unscored_exps: for k_exp, score in evaluate(x, y, list(unscored_exps)): diff --git a/src/kerndisc/evaluation/_evaluate.py b/src/kerndisc/evaluation/_evaluate.py index 0e0b66a..a48922a 100644 --- a/src/kerndisc/evaluation/_evaluate.py +++ b/src/kerndisc/evaluation/_evaluate.py @@ -9,7 +9,7 @@ from ._util import add_jitter_to_model from .scoring import score_model -from ..expansion.grammars import get_parser, get_transformer +from ..expansion.grammars import get_builder _CORES = int(os.environ.get('CORES', 1)) @@ -85,8 +85,7 @@ def _make_evaluator(x: np.ndarray, y: np.ndarray, add_jitter: bool) -> Callable: Evaluates a kernel expression passed to it. """ - parser = get_parser() - transformer = get_transformer() + build = get_builder() optimizer = gpflow.train.ScipyOptimizer() def _evaluator(kernel_expression) -> Tuple[str, float]: @@ -101,8 +100,7 @@ def _evaluator(kernel_expression) -> Tuple[str, float]: Kernel expression to be evaluated. """ - kernel = transformer.transform(parser.parse(kernel_expression)) - model = gpflow.models.GPR(x, y, kern=kernel) + model = gpflow.models.GPR(x, y, kern=build(kernel_expression)) if add_jitter: add_jitter_to_model(model) diff --git a/src/kerndisc/expansion/grammars/__init__.py b/src/kerndisc/expansion/grammars/__init__.py index 31c0bb3..ad07b20 100644 --- a/src/kerndisc/expansion/grammars/__init__.py +++ b/src/kerndisc/expansion/grammars/__init__.py @@ -91,6 +91,28 @@ def get_transformer() -> Transformer: return _GRAMMARS[get_current_grammar()]['transformer']() +@lru_cache(maxsize=1) +def get_builder() -> Callable: + """Get a builder that parses and transforms a kernel expression. + + Mostly a utility function that combines `parser.parse` and `transformer.transform`. + + Returns + ------- + builder: Callable + Builder to parse and transform (build) a kernel in one step on call. + + + """ + parser = get_parser() + transformer = get_transformer() + + def _build(kernel_expression: str) -> gpflow.kernels.Kernel: + """Functions that parses and transforms a kernel expression in one step using the current grammar.""" + return transformer.transform(parser.parse(kernel_expression)) + return _build + + @lru_cache(maxsize=1) def get_extender() -> Callable: """Get extender of currently selected grammar. diff --git a/src/kerndisc/expansion/grammars/_grammar_duvenaud.py b/src/kerndisc/expansion/grammars/_grammar_duvenaud.py index 8c1b474..3955909 100644 --- a/src/kerndisc/expansion/grammars/_grammar_duvenaud.py +++ b/src/kerndisc/expansion/grammars/_grammar_duvenaud.py @@ -1,3 +1,4 @@ + import logging from typing import Dict, List @@ -5,11 +6,19 @@ from lark import Lark, ParseError, Transformer, UnexpectedInput from lark.lexer import Token -from ._kernels import BASE_KERNELS +from ._kernels import BASE_KERNELS, ChangePoint from ._util import find_closing_bracket -_IMPLEMENTED_KERNEL_EXPRESSIONS = ('cosine', 'linear', 'matern12', 'matern32', 'matern52', 'periodic', 'rbf', 'white', 'rationalquadratic') +_IMPLEMENTED_KERNEL_EXPRESSIONS = ('cosine', + 'linear', + 'matern12', + 'matern32', + 'matern52', + 'periodic', + 'rbf', + 'white', + 'rationalquadratic') _LOGGER = logging.getLogger(__package__) GRAMMAR = f"""// This kernel grammar is a close implementation of the grammar first defined by David Duvenaud et al. [2013], // in their paper: [Structure discovery in Nonparametric Regression through Compositional Kernel Search](https://arxiv.org/pdf/1302.4922.pdf) and @@ -132,7 +141,7 @@ def extender(kernel_expression: str) -> List[str]: kernel_alterations.extend([ f'{kernel_expression} + {kernel_exp}', # C1 f'({kernel_expression}) * {kernel_exp}', # C2 - # f'cp({kernel_expression}, {kernel_expression})', # C4 + f'cp({kernel_expression}, {kernel_expression})', # C4 # f'cw({kernel_expression}, {kernel_expression}', # C5 # f'cw({kernel_expression}, constant)', # C6 # f'cw(constant, {kernel_expression})', # C7 @@ -241,7 +250,7 @@ def lax_mul(self, kernels: List[gpflow.kernels.Kernel]): def cp(self, kernels: List[gpflow.kernels.Kernel]): # pragma: no cover """Changepoint from the first kernel to the second kernel in the above list.""" - raise NotImplementedError + return ChangePoint(kernels[0], kernels[1], offset=1600., variance=5.0) def cw(self, kernels: List[gpflow.kernels.Kernel]): # pragma: no cover """Changewindow from the first kernel to the second kernel in the above list.""" diff --git a/tests/expansion/grammars/test_kernels.py b/tests/expansion/grammars/test_kernels.py new file mode 100644 index 0000000..7a039d8 --- /dev/null +++ b/tests/expansion/grammars/test_kernels.py @@ -0,0 +1,12 @@ +"""Tests here are only for the manually implemented kernels from `_kernels.py`.""" +import gpflow + +from kerndisc.expansion.grammars._kernels import ChangePoint # noqa: I202, I100 + + +def test_changepoint(): + offset = 5. + k1 = gpflow.kernels.Constant(1) + k2 = gpflow.kernels.Constant(1) + + changepoint_k = ChangePoint(k1, k2, offset=offset)