Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
run: |
pipx install poetry==2.1.0
pipx install poetry==2.1.3

- name: Install dependencies
run: |
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ scipy = "^1.14.0"
matplotlib = "^3.9.1"
scikit-learn = "^1.5.2"
PyQt5 = "^5.15.11"
statsmodels = "^0.14.5"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.2"
Expand Down Expand Up @@ -60,6 +61,7 @@ ignore = ["PLR0913"]
files = "pysatl_cpd"
mypy_path = "pysatl_cpd"
strict = true
ignore_missing_imports = true


[build-system]
Expand Down
Empty file.
Empty file.
84 changes: 84 additions & 0 deletions pysatl_cpd/core/algorithms/arima/models/arima.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
Module for the implementation of an ARIMA prediction model for the Predict&Compare CPD algorithm.
"""

__author__ = "Aleksandra Ri"
__copyright__ = "Copyright (c) 2025 PySATL project"
__license__ = "SPDX-License-Identifier: MIT"


import warnings
from typing import Any, Optional

import numpy as np
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from statsmodels.tsa.arima.model import ARIMA, ARIMAResults


class ArimaModel:
"""
A wrapper for the statsmodels ARIMA model to simplify its use in online
change point detection tasks.
"""

def __init__(self) -> None:
"""
Initializes the ArimaModel instance, without any concrete values.
"""
self.__training_data: list[np.float64] = []
self.__results: Optional[ARIMAResults] = None
self.__order: Optional[tuple[int, int, int]] = None

def clear(self) -> None:
"""
Resets the model's state by clearing training data and results.
:return:
"""
self.__training_data = []
self.__results = None

def fit(self, training_data: list[np.float64], order: Optional[tuple[int, int, int]] = None) -> Any:
"""
Fits the ARIMA model on the provided training data.

If the data variance is very low, it fits the model as a simple mean model.
Otherwise, the standard ARIMA(0,0,0) model is used, which is suitable for stationary data.
Convergence warnings are suppressed to avoid cluttering output.

:param training_data: the time series data to train the model on.
:param order: the (p,d,q) order of the model for the autoregressive, differences, and moving average components.
:return: the residuals of the model after fitting.
"""
self.__training_data = training_data
if order:
self.__order = order

assert self.__order is not None, "Model order was not specified."

with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, message=".*Non-(stationary|invertible) starting.*")
warnings.filterwarnings("ignore", category=ConvergenceWarning)
model = ARIMA(self.__training_data, order=self.__order)
self.__results = model.fit()

return self.__results.resid

def predict(self, steps: int) -> Any:
"""
Forecasts future values of the time series.
:param steps: the number of steps to forecast ahead.
:return: a list of forecasted values.
"""
assert self.__results is not None, "Model must be fitted before prediction."

return self.__results.forecast(steps=steps)

def update(self, observation: list[np.float64]) -> Any:
"""
Appends a new observation to the fitted ARIMA model.
:param observation: the new observation to add to the model.
:return:
"""
assert self.__results is not None, "Model must be fitted before prediction."

self.__results = self.__results.append(observation)
Empty file.
100 changes: 100 additions & 0 deletions pysatl_cpd/core/algorithms/arima/stats_tests/cusum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Module for implementation of the CUSUM statistical test for Predict&Compare CPD algorithm.
"""

__author__ = "Aleksandra Ri"
__copyright__ = "Copyright (c) 2025 PySATL project"
__license__ = "SPDX-License-Identifier: MIT"


import numpy as np

ZERO = np.float64(0.0)


class CuSum:
"""
Implements the cumulative sum (CUSUM) algorithm to detect shifts in the value of a process.

This class tracks two cumulative sums: one for detecting an increase (positive CUSUM)
and one for detecting a decrease (negative CUSUM). An alarm is triggered if either
sum exceeds a predefined threshold.
"""

def __init__(self) -> None:
self.__k_threshold: np.float64 = ZERO
self.__h_threshold: np.float64 = ZERO

self.__positive_cusum: np.float64 = ZERO
self.__negative_cusum: np.float64 = ZERO

# History tracks when the CUSUM was at 0.
# Start with True to ensure a valid index is found on the first detection
self.__positive_reset_history: list[bool] = [True]
self.__negative_reset_history: list[bool] = [True]

def update(self, residual: np.float64) -> None:
"""
Updates the CUSUM statistics with a new residual value.
:param residual: the difference between the observed and expected value.
"""
# Update the positive CUSUM, resetting to 0 if it goes negative.
self.__positive_cusum = np.maximum(ZERO, self.__positive_cusum + residual - self.__k_threshold)

# Update the negative CUSUM, resetting to 0 if it goes positive.
self.__negative_cusum = np.minimum(ZERO, self.__negative_cusum + residual + self.__k_threshold)

self.__positive_reset_history.append(self.__positive_cusum == ZERO)
self.__negative_reset_history.append(self.__negative_cusum == ZERO)

def is_change_detected(self) -> np.bool:
"""
Checks if either CUSUM statistic has exceeded its threshold.
:return: True if a change is detected, False otherwise.
"""
return self.__positive_cusum > self.__h_threshold or self.__negative_cusum < -self.__h_threshold

def get_last_reset_index(self) -> int:
"""
Finds the number of time steps since the last CUSUM reset.
After a change is detected (i.e., `is_change_detected()` is True), this
method calculates how many steps have passed since the cumulative sum value
was last at zero. This is essential for localizing the starting point of the
detected anomaly.
:return: the number of steps back from the current time to the last reset.
"""
assert self.is_change_detected(), "This method should only be called when a change is detected."

last_reset_index: int
if self.__positive_cusum > self.__h_threshold:
last_reset_index = list(reversed(self.__positive_reset_history)).index(True)
elif self.__negative_cusum < -self.__h_threshold:
last_reset_index = list(reversed(self.__negative_reset_history)).index(True)
else: # This branch is unreachable if the assertion holds, but NUMPY
last_reset_index = len(self.__positive_reset_history)

return last_reset_index

def update_thresholds(self, k: np.float64, h: np.float64) -> None:
"""
Updates the 'k' and 'h' thresholds for the CUSUM detector.
:param k: the new value for the allowable slack.
:param h: the new value for the decision threshold.
:return:
"""
self.__k_threshold = k
self.__h_threshold = h

def clear(self) -> None:
"""
Resets the CUSUM detector to its initial state.
:return:
"""
self.__k_threshold = ZERO
self.__h_threshold = ZERO

self.__positive_cusum = ZERO
self.__negative_cusum = ZERO

self.__positive_reset_history = [True]
self.__negative_reset_history = [True]
Loading