Skip to content


[ENH] ClaSP: Adds parallelization for distance computations and numba…
Browse files Browse the repository at this point in the history
…rize function calls (#1692)

A performance enhancement of the ClaSP implementation.
- Parallelizing ClaSP distance computations
- Numbarizing methods.
- exposing n_jobs to ClaSP-Constructor class
- convert ClaSPTransformer to BaseSeriesTransformer
  • Loading branch information
patrickzib authored Jun 21, 2024
1 parent 710293e commit 2b48e53
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 112 deletions.
15 changes: 10 additions & 5 deletions aeon/segmentation/
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pandas as pd

from aeon.segmentation.base import BaseSegmenter
from aeon.transformations.clasp import ClaSPTransformer
from aeon.transformations.series import ClaSPTransformer

def find_dominant_window_sizes(X, offset=0.05):
Expand Down Expand Up @@ -66,7 +66,7 @@ def _is_trivial_match(candidate, change_points, n_timepoints, exclusion_radius=0
List of change points chosen so far
n_timepoints : int
Total length
exclusion_radius : int
exclusion_radius : float
Exclusion Radius for change points to be non-trivial matches
Expand Down Expand Up @@ -97,7 +97,7 @@ def _segmentation(X, clasp, n_change_points=None, exclusion_radius=0.05):
the transformer
n_change_points : int
the number of change points to find
exclusion_radius :
exclusion_radius : float
the exclusion zone
Expand Down Expand Up @@ -183,6 +183,8 @@ class ClaSPSegmenter(BaseSegmenter):
The number of change points to search.
exclusion_radius : int
Exclusion Radius for change points to be non-trivial matches.
n_jobs : int, default=1
Number of jobs to be used.
Expand All @@ -206,10 +208,11 @@ class ClaSPSegmenter(BaseSegmenter):

_tags = {"fit_is_empty": True} # for unit test cases

def __init__(self, period_length=10, n_cps=1, exclusion_radius=0.05):
def __init__(self, period_length=10, n_cps=1, exclusion_radius=0.05, n_jobs=1):
self.period_length = int(period_length)
self.n_cps = n_cps
self.exclusion_radius = exclusion_radius
self.n_jobs = n_jobs
super().__init__(axis=1, n_segments=n_cps + 1)

def _predict(self, X: np.ndarray):
Expand Down Expand Up @@ -263,7 +266,9 @@ def get_fitted_params(self):

def _run_clasp(self, X):
clasp_transformer = ClaSPTransformer(
window_length=self.period_length, exclusion_radius=self.exclusion_radius

self.found_cps, self.profiles, self.scores = _segmentation(
Expand Down
2 changes: 2 additions & 0 deletions aeon/transformations/series/
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Expand All @@ -16,6 +17,7 @@
from aeon.transformations.series._clasp import ClaSPTransformer
from aeon.transformations.series._clear_sky import ClearSkyTransformer
from aeon.transformations.series._dobin import Dobin
from aeon.transformations.series._matrix_profile import MatrixProfileSeriesTransformer
Expand Down
178 changes: 101 additions & 77 deletions aeon/transformations/ → aeon/transformations/series/
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import warnings

import numpy as np
import numpy.fft as fft
import pandas as pd
from numba import njit
from numba import njit, objmode, prange

from aeon.transformations.base import BaseTransformer
from aeon.transformations.collection.matrix_profile import _sliding_dot_products
from aeon.transformations.series.base import BaseSeriesTransformer

def _sliding_window(X, m):
Expand All @@ -45,6 +45,34 @@ def _sliding_window(X, m):
return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides)

@njit(fastmath=True, cache=True)
def _sliding_dot_product(query, time_series):
m = len(query)
n = len(time_series)

time_series_add = 0
if n % 2 == 1:
time_series = np.concatenate((np.array([0]), time_series))
time_series_add = 1

q_add = 0
if m % 2 == 1:
query = np.concatenate((np.array([0]), query))
q_add = 1

query = query[::-1]

query = np.concatenate((query, np.zeros(n - m + time_series_add - q_add)))

trim = m - 1 + time_series_add

with objmode(dot_product="float64[:]"):
dot_product = fft.irfft(fft.rfft(time_series) * fft.rfft(query))

return dot_product[trim:]

@njit(fastmath=True, cache=True)
def _sliding_mean_std(X, m):
"""Return the sliding mean and std for a time series and a window size.
Expand All @@ -60,20 +88,20 @@ def _sliding_mean_std(X, m):
Tuple (float, float)
The moving mean and moving std
s = np.insert(np.cumsum(X), 0, 0)
sSq = np.insert(np.cumsum(X**2), 0, 0)
s = np.concatenate((np.zeros(1, dtype=np.float64), np.cumsum(X)))
sSq = np.concatenate((np.zeros(1, dtype=np.float64), np.cumsum(X**2)))
segSum = s[m:] - s[:-m]
segSumSq = sSq[m:] - sSq[:-m]
movmean = segSum / m
movstd = np.sqrt(np.clip(segSumSq / m - (segSum / m) ** 2, 0, None)) # at least 0

# avoid dividing by too small std, like 0
movstd = np.where(abs(movstd) < 0.001, 1, movstd)

movstd = np.sqrt(np.clip(segSumSq / m - (segSum / m) ** 2, 0, None))
movstd = np.where(np.abs(movstd) < 0.001, 1, movstd)
return [movmean, movstd]

def _compute_distances_iterative(X, m, k):
@njit(fastmath=True, cache=True, parallel=True)
def _compute_distances_iterative(X, m, k, n_jobs=1, slack=0.5):
"""Compute kNN indices with dot-product.
No-loops implementation for a time series, given
Expand All @@ -87,52 +115,61 @@ def _compute_distances_iterative(X, m, k):
The window size to generate sliding windows
k : int
The number of nearest neighbors
n_jobs : int, default=1
Number of jobs to be used.
slack: float
Defines an exclusion zone around each subsequence to avoid trivial matches.
Defined as percentage of m. E.g. 0.5 is equal to half the window length.
knns : array-like, shape = [n-m+1, k], dtype=int
The knns (offsets!) for each subsequence in X
length = len(X) - m + 1
knns = np.zeros(shape=(length, k), dtype=np.int64)
n = np.int32(X.shape[0] - m + 1)
halve_m = int(m * slack)

dot_prev = None
means, stds = _sliding_mean_std(X, m)
knns = np.zeros(shape=(n, k), dtype=np.int64)

for order in range(0, length):
# first iteration O(n log n)
if order == 0:
# dot_first = _sliding_dot_product(X[:m], X)
dot_first = _sliding_dot_products(X[:m], X, len(X[:m]), len(X))
dot_rolled = dot_first
# O(1) further operations
dot_rolled = (
np.roll(dot_prev, 1)
+ X[order + m - 1] * X[m - 1 : length + m]
- X[order - 1] * np.roll(X[:length], 1)
means, stds = _sliding_mean_std(X, m)
dot_first = _sliding_dot_product(X[:m], X)
bin_size = X.shape[0] // n_jobs

for idx in prange(n_jobs):
start = idx * bin_size
end = min((idx + 1) * bin_size, X.shape[0] - m + 1)

dot_prev = None
for order in np.arange(start, end):
if order == start:
# first iteration O(n log n)
dot_rolled = _sliding_dot_product(X[start : start + m], X)
# constant time O(1) operations
dot_rolled = (
np.roll(dot_prev, 1)
+ X[order + m - 1] * X[m - 1 : n + m]
- X[order - 1] * np.roll(X[:n], 1)
dot_rolled[0] = dot_first[order]

x_mean = means[order]
x_std = stds[order]

dist = 2 * m * (1 - (dot_rolled - m * means * x_mean) / (m * stds * x_std))

# self-join: exclusion zone
trivialMatchRange = (
int(max(0, order - halve_m)),
int(min(order + halve_m + 1, n)),
dot_rolled[0] = dot_first[order]
dist[trivialMatchRange[0] : trivialMatchRange[1]] = np.inf
dot_prev = dot_rolled

x_mean = means[order]
x_std = stds[order]

dist = 2 * m * (1 - (dot_rolled - m * means * x_mean) / (m * stds * x_std))

# self-join: exclusion zone
trivialMatchRange = (
int(max(0, order - np.round(m / 2, 0))),
int(min(order + np.round(m / 2 + 1, 0), length)),
dist[trivialMatchRange[0] : trivialMatchRange[1]] = np.inf

if len(dist) >= k:
idx = np.argpartition(dist, k)
idx = np.arange(len(dist))

knns[order, :] = idx[:k]
dot_prev = dot_rolled
if dist.shape[0] >= k:
knns[order] = np.argpartition(dist, k)[:k]
knns[order] = np.arange(dist.shape[0], dtype=np.int64)

return knns

Expand Down Expand Up @@ -316,6 +353,7 @@ def clasp(
"""Calculate ClaSP for a time series and a window size.
Expand All @@ -333,25 +371,27 @@ def clasp(
Interpolate the profile
exclusion_radius : int
Blind spot of the profile to the corners
n_jobs : int
Number of jobs to be used.
Tuple (array-like of shape [n], array-like of shape [k_neighbours, n])
The ClaSP and the knn_mask
knn_mask = _compute_distances_iterative(X, m, k_neighbours).T
knn_mask = _compute_distances_iterative(X, m, k_neighbours, n_jobs=n_jobs).T

n_timepoints = knn_mask.shape[1]
exclusion_zone = max(m, np.int64(n_timepoints * exclusion_radius))

profile = _calc_profile(m, knn_mask, score, exclusion_zone)

if interpolate is True:
if interpolate:
profile = pd.Series(profile).interpolate(limit_direction="both").to_numpy()
return profile, knn_mask

class ClaSPTransformer(BaseTransformer):
class ClaSPTransformer(BaseSeriesTransformer):
"""ClaSP (Classification Score Profile) Transformer.
Implementation of the Classification Score Profile of a time series.
Expand All @@ -368,6 +408,8 @@ class ClaSPTransformer(BaseTransformer):
the scoring metric to use in ClaSP - choose from ROC_AUC or F1
exclusion_radius : int
Exclusion Radius for change points to be non-trivial matches
n_jobs : int
Number of jobs to be used.
Expand All @@ -381,7 +423,7 @@ class ClaSPTransformer(BaseTransformer):
>>> from aeon.transformations.clasp import ClaSPTransformer
>>> from aeon.transformations.series import ClaSPTransformer
>>> from aeon.segmentation import find_dominant_window_sizes
>>> from aeon.datasets import load_electric_devices_segmentation
>>> X, true_period_size, true_cps = load_electric_devices_segmentation()
Expand All @@ -392,23 +434,25 @@ class ClaSPTransformer(BaseTransformer):

_tags = {
"input_data_type": "Series",
# what is the abstract type of X: Series, or Panel
"output_data_type": "Series",
# what abstract type is returned: Primitives, Series, Panel
"instancewise": True,
"X_inner_type": "np.ndarray",
"y_inner_type": "None",
"capability:multivariate": False,
"fit_is_empty": True,
"requires_y": False,
"capability:inverse_transform": False,

def __init__(
self, window_length=10, scoring_metric="ROC_AUC", exclusion_radius=0.05
self.window_length = int(window_length)
self.scoring_metric = scoring_metric
self.exclusion_radius = exclusion_radius
self.n_jobs = n_jobs

def _transform(self, X, y=None):
"""Compute ClaSP.
Expand Down Expand Up @@ -443,6 +487,7 @@ def _transform(self, X, y=None):

return Xt
Expand Down Expand Up @@ -470,24 +515,3 @@ def _check_scoring_metric(self, scoring_metric):
return _roc_auc_score
elif scoring_metric == "F1":
return _binary_f1_score

def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return `"default"` set.
params : dict or list of dict, default = {}
Parameters to create testing instances of the class
Each dict are parameters to construct an "interesting" test instance, i.e.,
`MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
`create_test_instance` uses the first (or only) dictionary in `params`
return {"window_length": 5}
2 changes: 1 addition & 1 deletion docs/api_reference/transformations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ Outlier detection, changepoint detection


.. currentmodule:: aeon.transformations.clasp
.. currentmodule:: aeon.transformations.series._clasp

.. autosummary::
:toctree: auto_generated/
Expand Down

0 comments on commit 2b48e53

Please sign in to comment.