|
"""Implements the STOMP matrix-profile anomaly detector for the Aeon framework."""
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +__maintainer__ = ["CodeLionX"] |
| 6 | +__all__ = ["STOMP"] |
| 7 | + |
| 8 | +import numpy as np |
| 9 | + |
| 10 | +from aeon.anomaly_detection.base import BaseAnomalyDetector |
| 11 | +from aeon.utils.validation._dependencies import _check_soft_dependencies |
| 12 | +from aeon.utils.windowing import reverse_windowing |
| 13 | + |
| 14 | + |
class STOMP(BaseAnomalyDetector):
    """STOMP anomaly detector.

    STOMP calculates the matrix profile of a time series which is the distance to the
    nearest neighbor of each subsequence in the time series. The matrix profile is then
    used to calculate the anomaly score for each time point. The larger the distance to
    the nearest neighbor, the more anomalous the time point is.

    STOMP supports univariate time series only.

    .. list-table:: Capabilities
       :stub-columns: 1

       * - Input data format
         - univariate
       * - Output data format
         - anomaly scores
       * - Learning Type
         - unsupervised


    Parameters
    ----------
    window_size : int, default=10
        Size of the sliding window.
    ignore_trivial : bool, default=True
        Whether to ignore trivial matches in the matrix profile.
    normalize : bool, default=True
        Whether to normalize the windows before computing the distance.
    p : float, default=2.0
        The p-norm to use for the distance calculation.
    k : int, default=1
        The number of top distances to return.

    Examples
    --------
    >>> import numpy as np
    >>> from aeon.anomaly_detection import STOMP  # doctest: +SKIP
    >>> X = np.random.default_rng(42).random((10, 2), dtype=np.float64)
    >>> detector = STOMP(window_size=2)  # doctest: +SKIP
    >>> detector.fit_predict(X, axis=0)  # doctest: +SKIP
    array([1.02352234 1.00193038 0.98584441 0.99630753 1.00656619 1.00682081 1.00781515
           0.99709741 0.98878895 0.99723947])

    References
    ----------
    .. [1] Zhu, Yan and Zimmerman, Zachary and Senobari, Nader Shakibay and Yeh,
           Chin-Chia Michael and Funning, Gareth and Mueen, Abdullah and Brisk,
           Philip and Keogh, Eamonn. "Matrix Profile II: Exploiting a Novel
           Algorithm and GPUs to Break the One Hundred Million Barrier for Time
           Series Motifs and Joins." In Proceedings of the 16th International
           Conference on Data Mining (ICDM), 2016.
    """

    _tags = {
        "capability:univariate": True,
        "capability:multivariate": False,
        "capability:missing_values": False,
        "fit_is_empty": True,
        "python_dependencies": ["stumpy"],
    }

    def __init__(
        self,
        window_size: int = 10,
        ignore_trivial: bool = True,
        normalize: bool = True,
        p: float = 2.0,
        k: int = 1,
    ):
        # Matrix profile computed during the last call to ``_predict``;
        # ``None`` until then.
        self.mp: np.ndarray | None = None
        self.window_size = window_size
        self.ignore_trivial = ignore_trivial
        self.normalize = normalize
        self.p = p
        self.k = k

        # Detector operates on (n_timepoints, n_channels)-shaped input.
        super().__init__(axis=0)

    def _predict(self, X: np.ndarray) -> np.ndarray:
        """Compute point-wise anomaly scores for ``X`` via the matrix profile.

        Parameters
        ----------
        X : np.ndarray
            Time series of shape ``(n_timepoints, n_channels)``; only the
            first channel is used (STOMP is univariate).

        Returns
        -------
        np.ndarray
            Anomaly score for each time point (length ``n_timepoints``).
        """
        _check_soft_dependencies("stumpy", severity="error")
        import stumpy

        self._check_params(X)
        self.mp = stumpy.stump(
            X[:, 0],
            m=self.window_size,
            ignore_trivial=self.ignore_trivial,
            normalize=self.normalize,
            p=self.p,
            k=self.k,
        )
        # stumpy.stump stores the j-th smallest nearest-neighbor distance in
        # column j-1, so column ``k - 1`` is the distance to the k-th nearest
        # neighbor. Using column 0 unconditionally (as before) silently
        # ignored ``k``; for the default ``k=1`` this is unchanged.
        point_anomaly_scores = reverse_windowing(
            self.mp[:, self.k - 1], self.window_size
        )
        return point_anomaly_scores

    def _check_params(self, X: np.ndarray) -> None:
        """Validate ``window_size`` and ``k`` against the series length.

        Raises
        ------
        ValueError
            If ``window_size`` or ``k`` is outside its valid range for ``X``.
        """
        if self.window_size < 1 or self.window_size > X.shape[0]:
            raise ValueError(
                "The window size must be at least 1 and at most the length of the "
                "time series."
            )

        if self.k < 1 or self.k > X.shape[0] - self.window_size:
            raise ValueError(
                "The top `k` distances must be at least 1 and at most the length of "
                "the time series minus the window size."
            )

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return `"default"` set.

        Returns
        -------
        params : dict or list of dict, default={}
            Parameters to create testing instances of the class.
            Each dict are parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`.
        """
        # Raise early if stumpy is unavailable so the test suite skips cleanly.
        _check_soft_dependencies(*cls._tags["python_dependencies"])

        return {
            "window_size": 10,
            "ignore_trivial": True,
            "normalize": True,
            "p": 2.0,
            "k": 1,
        }
0 commit comments