From 8321a6b05bb9247e1731e90c396743511777f706 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Sun, 25 Aug 2024 12:20:18 +0200 Subject: [PATCH 1/3] refactor(whitener): reuse utility functions --- xeofs/preprocessing/whitener.py | 203 +++++++++++++++++++++++++++----- 1 file changed, 173 insertions(+), 30 deletions(-) diff --git a/xeofs/preprocessing/whitener.py b/xeofs/preprocessing/whitener.py index e983a3e..c81c60e 100644 --- a/xeofs/preprocessing/whitener.py +++ b/xeofs/preprocessing/whitener.py @@ -1,5 +1,7 @@ +import warnings from typing import Dict, Optional +import numpy as np import xarray as xr from typing_extensions import Self @@ -9,21 +11,28 @@ Dims, DimsList, ) +from ..utils.linalg import fractional_matrix_power from ..utils.sanity_checks import assert_single_dataarray from .transformer import Transformer class Whitener(Transformer): - """Whiten a 2D DataArray matrix using PCA. + """Fractional whitening of a 2D DataArray. + + For a large number of features, use PCA to reduce the dimensionality of the data before whitening. + + Note that for ``alpha=1`` (no whitening) and ``use_pca=False``, the Whitener reduces to the identity transformation. Parameters ---------- - n_modes: int | float - If int, number of components to keep. If float, fraction of variance to keep. + alpha: float, default=0.0 + Power parameter to perform fractional whitening, where 0 corresponds to full whitening (standardized and decorrelated) and 1 to no whitening. + use_pca: bool, default=False + If True, perform PCA before whitening to speed up the computation. This is the recommended setting for a large number of features. Specify the number of components to keep in `n_modes`. + n_modes: int | float | str, default="all" + If int, number of components to keep. If float, fraction of variance to keep. If `n_modes="all"`, keep all components. init_rank_reduction: float, default=0.3 Used only when `n_modes` is given as a float. Specify the initial PCA rank reduction before truncating the solution to the desired fraction of explained variance. Must be in the half-open interval ]0, 1]. Lower values will speed up the computation. - alpha: float, default=0.0 - Power parameter to perform fractional whitening, where 0 corresponds to full PCA whitening and 1 to PCA without whitening. sample_name: str, default="sample" Name of the sample dimension.
feature_name: str, default="feature" @@ -35,9 +44,10 @@ class Whitener(Transformer): def __init__( self, - n_modes: int | float, - init_rank_reduction: float = 0.3, alpha: float = 0.0, + use_pca: bool = False, + n_modes: int | float | str = "all", + init_rank_reduction: float = 0.3, sample_name: str = "sample", feature_name: str = "feature", solver_kwargs: Dict = {}, @@ -48,11 +58,25 @@ def __init__( if alpha < 0: raise ValueError("`alpha` must be greater than or equal to 0") + alpha = float(alpha) + + self.alpha = alpha + self.use_pca = use_pca self.n_modes = n_modes self.init_rank_reduction = init_rank_reduction - self.alpha = alpha self.solver_kwargs = solver_kwargs + # Check whether the Whitener is the identity transformation + self.is_identity = self._check_identity_transform() + + def _check_identity_transform(self) -> bool: + eps = np.finfo(self.alpha).eps + # Use abs() so that alpha > 1 is not mistaken for the identity case + alpha_is_one = abs(1.0 - self.alpha) < eps + if not self.use_pca and alpha_is_one: + return True + else: + return False + def _sanity_check_input(self, X) -> None: assert_single_dataarray(X) @@ -66,8 +90,27 @@ def _sanity_check_input(self, X) -> None: ) ) + def _get_n_modes(self, X: xr.DataArray) -> int | float: + if isinstance(self.n_modes, str): + if self.n_modes == "all": + return min(X.shape) + else: + raise ValueError("`n_modes` must be an integer, float or 'all'") + else: + return self.n_modes + def get_serialization_attrs(self) -> Dict: - return dict(n_modes=self.n_modes, alpha=self.alpha) + return dict( + alpha=self.alpha, + n_modes=self.n_modes, + use_pca=self.use_pca, + is_identity=self.is_identity, + s=self.s, + T=self.T, + Tinv=self.Tinv, + feature_name=self.feature_name, + n_samples=self.n_samples, + ) def fit( self, @@ -76,27 +119,113 @@ def fit( feature_dims: Optional[DimsList] = None, ) -> Self: self._sanity_check_input(X) + n_samples, n_features = X.shape + self.n_samples = n_samples - decomposer = Decomposer( - n_modes=self.n_modes, - init_rank_reduction=self.init_rank_reduction, - **self.solver_kwargs, - ) - decomposer.fit(X, dims=(self.sample_name, self.feature_name)) + if self.is_identity: + self.T = xr.DataArray(1, name="identity") + self.Tinv = xr.DataArray(1, name="identity") + self.s = xr.DataArray(1, name="identity") - self.U = decomposer.U_ - self.s = decomposer.s_ - self.V = decomposer.V_ + else: + if self.use_pca: + # In case of "all" modes, set n_modes to the rank of the input data + self.n_modes = self._get_n_modes(X) + + decomposer = Decomposer( + n_modes=self.n_modes, + init_rank_reduction=self.init_rank_reduction, + **self.solver_kwargs, ) + decomposer.fit(X, dims=(self.sample_name, self.feature_name)) + s = decomposer.s_ + V = decomposer.V_ + + n_c = np.sqrt(n_samples - 1) + self.T: DataArray = V * (s / n_c) ** (self.alpha - 1) + self.Tinv = (s / n_c) ** (1 - self.alpha) * V.conj().T + self.s = s + + # Without PCA, compute the fractional whitening transformation directly from the covariance matrix + else: + if n_samples < n_features: + warnings.warn( + f"The number of samples ({n_samples}) is smaller than the number of features ({n_features}), leading to an ill-conditioned problem. This may cause unstable results. Consider using PCA to reduce dimensionality and stabilize the problem by setting `use_pca=True`."
+ ) + + self.T, self.Tinv = self._compute_whitener_transform(X) + self.s = xr.DataArray(1, name="identity") return self + def _compute_whitener_transform( + self, X: xr.DataArray + ) -> tuple[DataArray, DataArray]: + T, Tinv = xr.apply_ufunc( + self._compute_whitener_transform_numpy, + X, + input_core_dims=[[self.sample_name, self.feature_name]], + output_core_dims=[[self.feature_name, "mode"], ["mode", self.feature_name]], + dask="allowed", + ) + T = T.assign_coords(mode=X.coords[self.feature_name].data) + Tinv = Tinv.assign_coords(mode=X.coords[self.feature_name].data) + return T, Tinv + + def _compute_whitener_transform_numpy(self, X): + nc = X.shape[0] - 1 + C = X.conj().T @ X / nc + power = (self.alpha - 1) / 2 + T = fractional_matrix_power(C, power) + Tinv = np.linalg.inv(T) + return T, Tinv + + def _fractional_matrix_power(self, C, power): + V, s, _ = np.linalg.svd(C) + + # cut off small singular values + is_above_zero = s > np.finfo(s.dtype).eps + V = V[:, is_above_zero] + s = s[is_above_zero] + + # TODO: use hermitian=True for numpy>=2.0 + # V, s, _ = np.linalg.svd(C, hermitian=True) + C_scaled = V @ np.diag(s**power) @ V.conj().T + if np.iscomplexobj(C): + return C_scaled + else: + return C_scaled.real + + def get_Tinv(self, unwhiten_only=False) -> xr.DataArray: + """Get the inverse transformation to unwhiten the data without undoing the PC transformation. + + In contrast to `inverse_transform_data()`, this method returns the inverse transformation matrix without the PC back-transformation. That is, for PC-transformed data this transformation only unwhitens the data without transforming back into the input space. For non-PC-transformed data, this transformation is equivalent to the full inverse transformation. + """ + if self.use_pca and unwhiten_only: + n_c = np.sqrt(self.n_samples - 1) + Tinv = (self.s / n_c) ** (1 - self.alpha) + Tinv = xr.apply_ufunc( + np.diag, + Tinv, + input_core_dims=[["mode"]], + output_core_dims=[[self.feature_name, "mode"]], + dask="allowed", + ) + Tinv = Tinv.assign_coords({self.feature_name: self.s.coords["mode"].data}) + return Tinv + else: + return self.Tinv + def transform(self, X: xr.DataArray) -> DataArray: """Transform new data into the fractionally whitened PC space.""" self._sanity_check_input(X) - - scores = xr.dot(X, self.V, dims=self.feature_name) * self.s ** (self.alpha - 1) - return scores.rename({"mode": self.feature_name}) + if self.is_identity: + return X + else: + transformed = xr.dot(X, self.T, dims=self.feature_name) + transformed.name = X.name + return transformed.rename({"mode": self.feature_name}) def fit_transform( self, @@ -106,20 +235,34 @@ ) -> DataArray: return self.fit(X, sample_dims, feature_dims).transform(X) - def inverse_transform_data(self, X: DataArray) -> DataArray: - """Transform 2D data (sample x feature) from whitened PC space back into original space.""" + def inverse_transform_data(self, X: DataArray, unwhiten_only=False) -> DataArray: + """Transform 2D data (sample x feature) from whitened PC space back into original space. - X = X.rename({self.feature_name: "mode"}) - X_unwhitened = X * self.s ** (1 - self.alpha) - return xr.dot(X_unwhitened, self.V.conj().T, dims="mode") + Parameters + ---------- + X: xr.DataArray + Data to transform back into original space. + unwhiten_only: bool, default=False + If True, only unwhiten the data without transforming back into the input space. This is useful when the data was transformed with PCA before whitening and you need the unwhitened data in the PC space.
+ """ + T_inv = self.get_Tinv(unwhiten_only=unwhiten_only) + if self.is_identity: + return X + else: + X = X.rename({self.feature_name: "mode"}) + return xr.dot(X, T_inv, dims="mode") def inverse_transform_components(self, X: DataArray) -> DataArray: """Transform 2D components (feature x mode) from whitened PC space back into original space.""" - dummy_dim = "dummy_dim" - comps_pc_space = X.rename({self.feature_name: dummy_dim}) - V = self.V.rename({"mode": dummy_dim}) - return xr.dot(comps_pc_space, V.conj().T, dims=dummy_dim) + if self.is_identity: + return X + else: + dummy_dim = "dummy_dim" + comps_pc_space = X.rename({self.feature_name: dummy_dim}) + VS = self.Tinv.conj().T + VS = VS.rename({"mode": dummy_dim}) + return xr.dot(VS, comps_pc_space, dims=dummy_dim) def inverse_transform_scores(self, X: DataArray) -> DataArray: """Transform 2D scores (sample x mode) from whitened PC space back into original space.""" From 5b9b9a2ed135897d06fb4c552be04785d00a3920 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Sun, 25 Aug 2024 12:38:03 +0200 Subject: [PATCH 2/3] refactor(whitener): add fractional matrix power utility function --- xeofs/utils/linalg.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 xeofs/utils/linalg.py diff --git a/xeofs/utils/linalg.py b/xeofs/utils/linalg.py new file mode 100644 index 0000000..dba3495 --- /dev/null +++ b/xeofs/utils/linalg.py @@ -0,0 +1,30 @@ +import numpy as np + + +def fractional_matrix_power(C, power): + """Compute the fractional matrix power of a symmetric matrix using SVD. + + Note: This function is a simplified version of the fractional_matrix_power + function from the scipy library. However, the scipy function does not + support dask arrays due to the use of np.asarray. + """ + if C.shape[0] != C.shape[1]: + raise ValueError("Matrix must be square.") + + V, s, _ = np.linalg.svd(C) + + # cut off small singular values + is_above_zero = s > np.finfo(s.dtype).eps + V = V[:, is_above_zero] + s = s[is_above_zero] + + # TODO: use hermitian=True for numpy>=2.0 + # V, s, _ = np.linalg.svd(C, hermitian=True) + C_scaled = V @ np.diag(s**power) @ V.conj().T + + # Even if the input matrix is real and symmetric, the output matrix might + # be complex due to numerical errors. 
In this case, we return the real part. + if np.iscomplexobj(C): + return C_scaled + else: + return C_scaled.real From 80fb9a7970caa92f8640ddf0b862abf32e7dab80 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Sun, 25 Aug 2024 12:41:59 +0200 Subject: [PATCH 3/3] test(whitener): add additional tests for Whitener class --- tests/preprocessing/test_whitener.py | 243 +++++++++++++++++++++++---- 1 file changed, 209 insertions(+), 34 deletions(-) diff --git a/tests/preprocessing/test_whitener.py b/tests/preprocessing/test_whitener.py index 1180806..a1511b9 100644 --- a/tests/preprocessing/test_whitener.py +++ b/tests/preprocessing/test_whitener.py @@ -1,5 +1,6 @@ import math +import numpy as np import pytest import xarray as xr @@ -32,27 +33,43 @@ ] +def generate_well_conditioned_data(lazy=False): + t = np.linspace(0, 50, 200) + std = 0.1 + X = np.sin(t)[:, None] + np.random.normal(0, std, size=(200, 3)) + X[:, 1] = X[:, 1] ** 3 + X[:, 2] = abs(X[:, 2]) ** (0.5) + X = xr.DataArray( + X, + dims=["sample", "feature"], + coords={"sample": np.arange(200), "feature": np.arange(3)}, + name="X", + ) + X = X - X.mean("sample") + if lazy: + X = X.chunk({"sample": 5, "feature": -1}) + return X + + # TESTS # ============================================================================= @pytest.mark.parametrize( - "synthetic_dataarray", - VALID_TEST_DATA, - indirect=["synthetic_dataarray"], + "lazy", + [False, True], ) -def test_fit(synthetic_dataarray): - data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"}) +def test_fit(lazy): + data = generate_well_conditioned_data(lazy) whitener = Whitener(n_modes=2) whitener.fit(data) @pytest.mark.parametrize( - "synthetic_dataarray", - VALID_TEST_DATA, - indirect=["synthetic_dataarray"], + "lazy", + [False, True], ) -def test_transform(synthetic_dataarray): - data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"}) +def test_transform(lazy): + data = generate_well_conditioned_data(lazy) whitener = Whitener(n_modes=2) whitener.fit(data) @@ -73,12 +90,11 @@ def test_transform(synthetic_dataarray): @pytest.mark.parametrize( - "synthetic_dataarray", - VALID_TEST_DATA, - indirect=["synthetic_dataarray"], + "lazy", + [False, True], ) -def test_fit_transform(synthetic_dataarray): - data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"}) +def test_fit_transform(lazy): + data = generate_well_conditioned_data(lazy) whitener = Whitener(n_modes=2) @@ -98,12 +114,11 @@ @pytest.mark.parametrize( - "synthetic_dataarray", - VALID_TEST_DATA, - indirect=["synthetic_dataarray"], + "lazy", + [False, True], ) -def test_invserse_transform_data(synthetic_dataarray): - data = synthetic_dataarray.rename({"sample0": "sample", "feature0": "feature"}) +def test_inverse_transform_data(lazy): + data = generate_well_conditioned_data(lazy) whitener = Whitener(n_modes=2) whitener.fit(data) @@ -123,17 +138,26 @@ @pytest.mark.parametrize( - "alpha", - [0.0, 0.5, 1.0, 1.5], + "alpha,use_pca", + [ + (0.0, False), + (0.5, False), + (1.0, False), + (1.5, False), + (0.0, True), + (0.5, True), + (1.0, True), + (1.5, True), + ], ) -def test_transform_alpha(alpha): - data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask") - data = data.rename({"sample0": "sample", "feature0": "feature"}) +def test_transform_alpha(alpha, use_pca): + data = generate_well_conditioned_data() - whitener = Whitener(n_modes=2, alpha=alpha) +
whitener = Whitener(alpha=alpha, use_pca=use_pca) data_whitened = whitener.fit_transform(data) - norm = (data_whitened**2).sum("sample") + nc = data.shape[0] - 1 + norm = (data_whitened**2).sum("sample") / nc ones = norm / norm # Check that for alpha=0 full whitening is performed if math.isclose(alpha, 0.0, abs_tol=1e-6): @@ -141,14 +165,22 @@ @pytest.mark.parametrize( - "alpha", - [0.0, 0.5, 1.0, 1.5], + "alpha,use_pca", + [ + (0.0, False), + (0.5, False), + (1.0, False), + (1.5, False), + (0.0, True), + (0.5, True), + (1.0, True), + (1.5, True), + ], ) -def test_invserse_transform_alpha(alpha): - data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask") - data = data.rename({"sample0": "sample", "feature0": "feature"}) - - whitener = Whitener(n_modes=6, alpha=alpha) +def test_inverse_transform_alpha(alpha, use_pca): + # Use data with more samples than features and high signal-to-noise ratio + data = generate_well_conditioned_data() + whitener = Whitener(alpha=alpha, use_pca=use_pca) data_whitened = whitener.fit_transform(data) data_unwhitened = whitener.inverse_transform_data(data_whitened) @@ -162,3 +194,146 @@ def test_invalid_alpha(): err_msg = "`alpha` must be greater than or equal to 0" with pytest.raises(ValueError, match=err_msg): Whitener(n_modes=2, alpha=-1.0) + + +def test_raise_warning_ill_conditioned(): + # Synthetic dataset has 6 samples and 7 features + data = generate_synthetic_dataarray(1, 1, "index", "no_nan", "no_dask") + data = data.rename({"sample0": "sample", "feature0": "feature"}) + + whitener = Whitener() + warn_msg = "The number of samples (.*) is smaller than the number of features (.*), leading to an ill-conditioned problem.*" + with pytest.warns(match=warn_msg): + _ = whitener.fit_transform(data) + + +@pytest.mark.parametrize( + "n_modes", + [1, 2, 3], +) +def test_transform_pca_n_modes(n_modes): + data = generate_well_conditioned_data() + + whitener = Whitener(use_pca=True, n_modes=n_modes) + transformed = whitener.fit_transform(data) + + # PCA-Whitener reduces dimensionality + assert transformed.shape[1] == n_modes + + +def test_whitener_identity_transformation(): + data = generate_well_conditioned_data() + + whitener = Whitener(alpha=1.0, use_pca=False) + transformed = whitener.fit_transform(data) + reconstructed = whitener.inverse_transform_data(transformed) + + # No transformation takes place + xr.testing.assert_identical(data, transformed) + xr.testing.assert_identical(data, reconstructed) + + +@pytest.mark.parametrize( + "alpha,use_pca", + [ + (0.0, True), + (0.5, True), + (1.0, True), + (1.5, True), + (0.0, False), + (0.5, False), + (1.0, False), + (1.5, False), + ], +) +def test_unwhiten_cross_covariance_matrix(alpha, use_pca): + """Test that we can uncover the original total amount of covariance between two datasets after whitening.""" + + def unwhiten_cov_mat(Cw, S1_inv, S2_inv): + n_dims_s1 = len(S1_inv.shape) + n_dims_s2 = len(S2_inv.shape) + + match n_dims_s1: + case 0: + C_unwhitened = Cw + case 1: + C_unwhitened = S1_inv[:, None] * Cw + case 2: + C_unwhitened = S1_inv @ Cw + case _: + raise ValueError("Invalid number of dimensions for S1_inv") + + match n_dims_s2: + case 0: + pass + case 1: + C_unwhitened = C_unwhitened * S2_inv[None, :] + case 2: + C_unwhitened = C_unwhitened @ S2_inv + case _: + raise ValueError("Invalid number of dimensions for S2_inv") + + return C_unwhitened + + data1 = generate_well_conditioned_data() + data2 = generate_well_conditioned_data() ** 2 + + whitener1 =
Whitener(alpha=0.5, use_pca=True, n_modes="all") + whitener2 = Whitener(alpha=alpha, use_pca=use_pca, n_modes="all") + + transformed1 = whitener1.fit_transform(data1) + transformed2 = whitener2.fit_transform(data2) + + S1_inv = whitener1.get_Tinv(unwhiten_only=True).values + S2_inv = whitener2.get_Tinv(unwhiten_only=True).values + + C = data1.values.T @ data2.values + Cw = transformed1.values.T @ transformed2.values + + C_rec = unwhiten_cov_mat(Cw, S1_inv, S2_inv) + + total_covariance = np.linalg.norm(C) + total_covariance_rec = np.linalg.norm(C_rec) + np.testing.assert_almost_equal(total_covariance, total_covariance_rec) + + +@pytest.mark.parametrize( + "use_pca", + [True, False], +) +def test_standardize_and_decorrelate(use_pca): + X = generate_well_conditioned_data() + n_features = X.shape[1] + + whitener = Whitener(alpha=0.0, use_pca=use_pca, n_modes="all") + transformed = whitener.fit_transform(X) + + # Check that transformed data is standardized + assert np.allclose(transformed.mean("sample").values, np.zeros(n_features)) + assert np.allclose(transformed.std("sample").values, np.ones(n_features), rtol=1e-2) + + # Check that transformed data is decorrelated + C = np.corrcoef(transformed.values, rowvar=False) + target = np.identity(n_features) + np.testing.assert_allclose(C, target, atol=1e-6) + + +@pytest.mark.parametrize( + "alpha,use_pca", + [ + (0.0, True), + (0.5, True), + (1.0, True), + (1.5, True), + (0.0, False), + (0.5, False), + (1.0, False), + (1.5, False), + ], +) +def test_transform_keep_coordinates(alpha, use_pca): + X = generate_well_conditioned_data() + + whitener = Whitener(alpha=alpha, use_pca=use_pca, n_modes="all") + transformed = whitener.fit_transform(X) + + assert len(transformed.coords) == len(X.coords)
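For reference, all three patches revolve around the same identity: the whitening transformation is T = C ** ((alpha - 1) / 2) for the feature covariance matrix C, applied either directly (`use_pca=False`) or, up to a rotation into mode space, through the SVD of the data matrix itself (`use_pca=True`). The standalone numpy sketch below (independent of xeofs; the toy data, seed, and tolerances are illustrative assumptions, not taken from the patches) checks the two properties the test suite above relies on: the whitened covariance equals C ** alpha, so alpha=0 yields standardized, decorrelated data, and the PCA route reproduces the same spectrum in mode space.

import numpy as np

rng = np.random.default_rng(42)
n_samples, n_features = 200, 3

# Correlated, centered toy data (well conditioned: n_samples >> n_features)
X = rng.normal(size=(n_samples, n_features))
X[:, 1] += 0.5 * X[:, 0]
X -= X.mean(axis=0)

n_c = n_samples - 1
C = X.T @ X / n_c  # sample covariance (feature x feature)
V, s, _ = np.linalg.svd(C)  # eigenvectors/eigenvalues of the symmetric PSD matrix C

for alpha in (0.0, 0.5, 1.0, 1.5):
    # Covariance route (use_pca=False): T = C ** ((alpha - 1) / 2)
    T = V @ np.diag(s ** ((alpha - 1) / 2)) @ V.T
    Cw = (X @ T).T @ (X @ T) / n_c
    # The whitened covariance equals C ** alpha; at alpha=0 this is the identity matrix
    assert np.allclose(Cw, V @ np.diag(s**alpha) @ V.T, atol=1e-10)

    # PCA route (use_pca=True): T_pca = V_x * (sigma / sqrt(n_c)) ** (alpha - 1),
    # with sigma and V_x taken from the SVD of the data matrix X itself
    _, sigma, Vt_x = np.linalg.svd(X, full_matrices=False)
    T_pca = Vt_x.T * (sigma / np.sqrt(n_c)) ** (alpha - 1)
    Cw_pca = (X @ T_pca).T @ (X @ T_pca) / n_c
    # Same spectrum as the covariance route, but diagonal in mode space
    assert np.allclose(Cw_pca, np.diag(s**alpha), atol=1e-8)

At alpha=0 the first assertion reduces to Cw == I, which is the standardize-and-decorrelate property checked in test_standardize_and_decorrelate; at alpha=1 the covariance route reduces to the identity, matching test_whitener_identity_transformation.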