[ENH/DOC] Unsupervised and semi-supervised usage of PyODAdapter #1932

Merged 3 commits on Aug 13, 2024
58 changes: 36 additions & 22 deletions aeon/anomaly_detection/_kmeans.py
@@ -110,46 +110,60 @@ def __init__(
self.estimator_: Optional[KMeans] = None

def _fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> "KMeansAD":
self._check_params(X)
_X, _ = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
self.estimator_ = KMeans(
n_clusters=self.n_clusters,
random_state=self.random_state,
init="k-means++",
n_init=10,
max_iter=300,
tol=1e-4,
verbose=0,
algorithm="lloyd",
)
self.estimator_.fit(_X)
self._inner_fit(_X)
return self

def _predict(self, X) -> np.ndarray:
_X, padding = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
clusters = self.estimator_.predict(_X)
window_scores = np.linalg.norm(
_X - self.estimator_.cluster_centers_[clusters], axis=1
)
point_anomaly_scores = reverse_windowing(
window_scores, self.window_size, np.nanmean, self.stride, padding
)
point_anomaly_scores = self._inner_predict(_X, padding)
return point_anomaly_scores

def _fit_predict(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> np.ndarray:
self._check_params(X)
_X, padding = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
self._inner_fit(_X)
point_anomaly_scores = self._inner_predict(_X, padding)
return point_anomaly_scores

def _check_params(self, X: np.ndarray) -> None:
if self.window_size < 1 or self.window_size > X.shape[0]:
raise ValueError(
"The window size must be at least 1 and at most the length of the "
"time series."
)

if self.stride < 1 or self.stride > self.window_size:
raise ValueError(
"The stride must be at least 1 and at most the window size."
)
if self.n_clusters < 1:
raise ValueError("The number of clusters must be at least 1.")

def _inner_fit(self, X: np.ndarray) -> None:
self.estimator_ = KMeans(
n_clusters=self.n_clusters, random_state=self.random_state
n_clusters=self.n_clusters,
random_state=self.random_state,
init="k-means++",
n_init=10,
max_iter=300,
tol=1e-4,
verbose=0,
algorithm="lloyd",
)
self.estimator_.fit(_X)
clusters = self.estimator_.predict(_X)
self.estimator_.fit(X)

def _inner_predict(self, X: np.ndarray, padding: int) -> np.ndarray:
clusters = self.estimator_.predict(X)
window_scores = np.linalg.norm(
_X - self.estimator_.cluster_centers_[clusters], axis=1
X - self.estimator_.cluster_centers_[clusters], axis=1
)
point_anomaly_scores = reverse_windowing(
window_scores, self.window_size, np.nanmean, self.stride, padding
62 changes: 51 additions & 11 deletions aeon/anomaly_detection/_pyodadapter.py
@@ -34,6 +34,16 @@ class PyODAdapter(BaseAnomalyDetector):
series the adapter concatenates the data points of each channel in the window to
a single univariate feature vector per window as input to the PyOD model.

The PyOD adapter supports unsupervised and semi-supervised learning. The adapter
can be fitted on a reference time series and used to detect anomalies in a different
target time series with the same number of dimensions. The reference (or training)
time series does not need to be clean for most PyOD models. However, knowledge
in the form of anomaly labels about potentially existing anomalies in the
reference time series is not used during the fitting process. Use `fit` to fit the model on the
reference time series and `predict` to detect anomalies in the target time series.
For unsupervised anomaly detection, use `fit_predict` directly on the target time
series.

.. list-table:: Capabilities
:stub-columns: 1

@@ -42,7 +52,7 @@ class PyODAdapter(BaseAnomalyDetector):
* - Output data format
- anomaly scores
* - Learning Type
- unsupervised
- unsupervised or semi-supervised


Parameters
@@ -70,6 +80,7 @@ class PyODAdapter(BaseAnomalyDetector):
"capability:multivariate": True,
"capability:univariate": True,
"capability:missing_values": False,
"fit_is_empty": False,
# Omit the version specification until PyOD has __version__
# (https://github.com/yzhao062/pyod/pull/584 in dev but not released yet)
# "python_dependencies": ["pyod>=1.1.3"]
@@ -83,7 +94,6 @@ def __init__(
self.window_size = window_size
self.stride = stride

self._padding_length = 0
super().__init__(axis=0)

@staticmethod
Expand All @@ -93,7 +103,37 @@ def _is_pyod_model(model: Any) -> bool:

return isinstance(model, BaseDetector)

def _predict(self, X) -> np.ndarray:
def _fit(self, X: np.ndarray, y: np.ndarray | None = None) -> None:
self._check_params(X)
_X, _ = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
self._inner_fit(_X)

def _predict(self, X: np.ndarray) -> np.ndarray:
_X, padding = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
window_anomaly_scores = self.pyod_model.decision_function(_X)
point_anomaly_scores = reverse_windowing(
window_anomaly_scores, self.window_size, np.nanmean, self.stride, padding
)
return point_anomaly_scores

def _fit_predict(self, X: np.ndarray, y: np.ndarray | None = None) -> np.ndarray:
self._check_params(X)
_X, padding = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
self._inner_fit(_X)

window_anomaly_scores = self.pyod_model.decision_scores_
point_anomaly_scores = reverse_windowing(
window_anomaly_scores, self.window_size, np.nanmean, self.stride, padding
)
return point_anomaly_scores

def _check_params(self, X: np.ndarray) -> None:
if not self._is_pyod_model(self.pyod_model):
raise ValueError("The provided model is not a compatible PyOD model.")

@@ -108,15 +148,15 @@ def _predict(self, X) -> np.ndarray:
"The stride must be at least 1 and at most the window size."
)

_X, self._padding_length = sliding_windows(
X, window_size=self.window_size, stride=self.stride, axis=0
)
self.pyod_model.fit(_X)
scores = self.pyod_model.decision_scores_
scores = reverse_windowing(
scores, self.window_size, np.nanmean, self.stride, self._padding_length
def _inner_fit(self, X: np.ndarray) -> None:
self.pyod_model.fit(X)

def _inner_predict(self, X: np.ndarray, padding: int) -> np.ndarray:
window_anomaly_scores = self.pyod_model.decision_function(X)
point_anomaly_scores = reverse_windowing(
window_anomaly_scores, self.window_size, np.nanmean, self.stride, padding
)
return scores
return point_anomaly_scores

@classmethod
def get_test_params(cls, parameter_set="default"):
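
As an aside, a toy sketch of the windowing described in the class docstring above: each sliding window of a (possibly multivariate) series is flattened into a single feature vector for the PyOD model. This is illustrative only; the exact flattening order used by aeon's sliding_windows helper is an assumption here, not taken from the diff.

import numpy as np

# Toy version of the windowing: a series of shape (n_timepoints, n_channels) is
# cut into overlapping windows, and each window becomes one flattened feature
# vector (the flattening order is assumed for illustration).
X = np.arange(12, dtype=float).reshape(6, 2)  # 6 time points, 2 channels
window_size, stride = 3, 1
windows = np.stack(
    [X[i : i + window_size].ravel() for i in range(0, len(X) - window_size + 1, stride)]
)
print(windows.shape)  # (4, 6): 4 windows, each with window_size * n_channels features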
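
And a minimal usage sketch of the two workflows this PR enables, modelled on the tests further below; the public import path and the random data are assumptions, not part of the diff:

import numpy as np
from pyod.models.lof import LOF

from aeon.anomaly_detection import PyODAdapter  # assumed public import path

rng = np.random.default_rng(0)
target = rng.normal(size=80)       # series to score
target[50:58] -= 2                 # injected anomaly, as in the tests
reference = rng.normal(size=100)   # anomaly-free reference (training) series

# Unsupervised: fit and score the same series in a single call.
ad = PyODAdapter(LOF(), window_size=10, stride=1)
scores = ad.fit_predict(target, axis=0)  # one anomaly score per time point, shape (80,)

# Semi-supervised: fit on the reference series, then score the target series.
ad = PyODAdapter(LOF(), window_size=10, stride=1)
ad.fit(reference, axis=0)
scores = ad.predict(target, axis=0)      # shape (80,)

In both cases the window scores produced by the PyOD model are mapped back to point-wise scores with reverse_windowing, as in the _predict and _fit_predict methods above.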
10 changes: 7 additions & 3 deletions aeon/anomaly_detection/base.py
@@ -46,19 +46,23 @@ class BaseAnomalyDetector(BaseSeriesEstimator, ABC):
Unsupervised (default):
Unsupervised detectors do not require any training data and can directly be
used on the target time series. Their tags are set to ``fit_is_empty=True``
and ``requires_y=False``.
and ``requires_y=False``. You would usually call the ``fit_predict`` method
on these detectors.
Semi-supervised:
Semi-supervised detectors require a training step on a time series without
anomalies (normally behaving time series). The target value ``y`` would
consist of only zeros. Thus, these algorithms have logic in the ``fit``
method, but do not require the target values. Their tags are set to
``fit_is_empty=False`` and ``requires_y=False``.
``fit_is_empty=False`` and ``requires_y=False``. You would usually first
call the ``fit`` method on the training data and then the ``predict``
method for your target time series.
Supervised:
Supervised detectors require a training step on a time series with known
anomalies (anomalies should be present and must be annotated). The detector
implements the ``fit`` method, and the target value ``y`` consists of zeros
and ones. Their tags are, thus, set to ``fit_is_empty=False`` and
``requires_y=True``.
``requires_y=True``. You would usually first call the ``fit`` method on the
training data and then the ``predict`` method for your target time series.

Parameters
----------
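
A hedged sketch of how the three learning types translate into call patterns (the get_tag calls and the supervised fit signature are assumptions based on the description above; the detector and data are placeholders):

def score_series(detector, train_series, y_train, target_series):
    # Unsupervised (fit_is_empty=True): no training step, score the target directly.
    if detector.get_tag("fit_is_empty"):
        return detector.fit_predict(target_series, axis=0)
    if detector.get_tag("requires_y"):
        # Supervised: fit on a training series with annotated 0/1 anomaly labels.
        detector.fit(train_series, y_train, axis=0)
    else:
        # Semi-supervised: fit on an anomaly-free (normally behaving) training series.
        detector.fit(train_series, axis=0)
    return detector.predict(target_series, axis=0)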
64 changes: 53 additions & 11 deletions aeon/anomaly_detection/tests/test_pyod_adapter.py
@@ -22,13 +22,12 @@ def test_pyod_adapter_default():
series[50:58] -= 2

ad = PyODAdapter(LOF(), window_size=10, stride=1)
pred = ad.predict(series, axis=0)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (71,)


@pytest.mark.skipif(
@@ -45,13 +44,12 @@ def test_pyod_adapter_multivariate():
series[50:58, 0] -= 2

ad = PyODAdapter(LOF(), window_size=10, stride=1)
pred = ad.predict(series, axis=0)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (71,)


@pytest.mark.skipif(
@@ -66,13 +64,12 @@ def test_pyod_adapter_no_window_univariate():
series[50:58] -= 2

ad = PyODAdapter(LOF(), window_size=1, stride=1)
pred = ad.predict(series, axis=0)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (80,)


@pytest.mark.skipif(
@@ -89,13 +86,12 @@ def test_pyod_adapter_no_window_multivariate():
series[50:58, 0] -= 2

ad = PyODAdapter(LOF(), window_size=1, stride=1)
pred = ad.predict(series, axis=0)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (80,)


@pytest.mark.skipif(
@@ -110,13 +106,12 @@ def test_pyod_adapter_stride_univariate():
series[50:58] -= 2

ad = PyODAdapter(LOF(), window_size=10, stride=5)
pred = ad.predict(series, axis=0)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (15,)


@pytest.mark.skipif(
@@ -133,10 +128,57 @@ def test_pyod_adapter_stride_multivariate():
series[50:58, 0] -= 2

ad = PyODAdapter(LOF(), window_size=10, stride=5)
pred = ad.fit_predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")


@pytest.mark.skipif(
not _check_soft_dependencies("pyod", severity="none"),
reason="required soft dependency PyOD not available",
)
def test_pyod_adapter_semi_supervised_univariate():
"""Test PyODAdapter in semi-supervised mode."""
from pyod.models.lof import LOF

series = make_series(n_timepoints=80, return_numpy=True, random_state=0)
series[50:58] -= 2
train_series = make_series(n_timepoints=100, return_numpy=True, random_state=1)

ad = PyODAdapter(LOF(), window_size=10)
ad.fit(train_series, axis=0)
pred = ad.predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")


@pytest.mark.skipif(
not _check_soft_dependencies("pyod", severity="none"),
reason="required soft dependency PyOD not available",
)
def test_pyod_adapter_semi_supervised_multivariate():
"""Test PyODAdapter in semi-supervised mode (multivariate)."""
from pyod.models.lof import LOF

series = make_series(
n_timepoints=80, n_columns=2, return_numpy=True, random_state=0
)
series[50:58, 0] -= 2
train_series = make_series(
n_timepoints=100, n_columns=2, return_numpy=True, random_state=1
)

ad = PyODAdapter(LOF(), window_size=10, stride=5)
ad.fit(train_series, axis=0)
pred = ad.predict(series, axis=0)

assert pred.shape == (80,)
assert pred.dtype == np.float_
assert 50 <= np.argmax(pred) <= 60
assert hasattr(ad, "pyod_model")
assert ad.pyod_model.decision_scores_.shape == (15,)
2 changes: 1 addition & 1 deletion aeon/datasets/dataset_collections.py
@@ -1,7 +1,7 @@
"""
List of datasets available for classification, regression and forecasting archives.

The data can also be used for clustering.
The classification and regression data can also be used for clustering.

Classification data can be downloaded directly from the timeseriesclassification.com
archive.