From 148cc13b22091b70c62f7dfea086d9baaf636b9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Thu, 25 Jan 2024 15:51:37 +0100 Subject: [PATCH] Remove row scaling --- src/ert/analysis/__init__.py | 2 - src/ert/analysis/_es_update.py | 644 +++++++----------- src/ert/analysis/configuration.py | 168 ----- src/ert/analysis/row_scaling.py | 70 -- src/ert/analysis/update.py | 19 - src/ert/enkf_main.py | 1 - .../workflows/disable_parameters.py | 1 - .../analysis/test_es_update.py | 14 - tests/performance_tests/test_memory_usage.py | 6 +- .../0/update_log | 10 +- .../test_update_report/0-False/update_log | 1 - .../test_update_report/0-True/update_log | 1 - tests/unit_tests/analysis/test_es_update.py | 224 +----- tests/unit_tests/analysis/test_row_scaling.py | 136 ---- .../test_row_scaling_increased_belief_case.py | 148 ---- .../analysis/test_update_configuration.py | 156 ----- .../scenarios/test_summary_response.py | 24 +- 17 files changed, 269 insertions(+), 1356 deletions(-) delete mode 100644 src/ert/analysis/configuration.py delete mode 100644 src/ert/analysis/row_scaling.py delete mode 100644 src/ert/analysis/update.py delete mode 100644 tests/unit_tests/analysis/test_row_scaling.py delete mode 100644 tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py delete mode 100644 tests/unit_tests/analysis/test_update_configuration.py diff --git a/src/ert/analysis/__init__.py b/src/ert/analysis/__init__.py index cdc0cd99619..41d311511af 100644 --- a/src/ert/analysis/__init__.py +++ b/src/ert/analysis/__init__.py @@ -4,7 +4,6 @@ iterative_smoother_update, smoother_update, ) -from .configuration import UpdateConfiguration from .event import AnalysisEvent, AnalysisStatusEvent, AnalysisTimeEvent __all__ = [ @@ -13,7 +12,6 @@ "AnalysisTimeEvent", "ErtAnalysisError", "SmootherSnapshot", - "UpdateConfiguration", "smoother_update", "iterative_smoother_update", ] diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index ddf48d31a3f..e19a6c7c376 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -24,7 +24,6 @@ import xarray as xr from iterative_ensemble_smoother.experimental import ( AdaptiveESMDA, - ensemble_smoother_update_step_row_scaling, ) from ert.config import Field, GenKwConfig, SurfaceConfig @@ -32,13 +31,10 @@ from ..config.analysis_module import ESSettings, IESSettings from . import misfit_preprocessor from .event import AnalysisEvent, AnalysisStatusEvent, AnalysisTimeEvent -from .row_scaling import RowScaling -from .update import RowScalingParameter if TYPE_CHECKING: import numpy.typing as npt - from ert.analysis.configuration import UpdateConfiguration, UpdateStep from ert.storage import EnsembleAccessor, EnsembleReader _logger = logging.getLogger(__name__) @@ -170,58 +166,6 @@ def _all_parameters( return np.vstack(matrices) if matrices else None -def _get_param_with_row_scaling( - temp_storage: TempStorage, - parameter: RowScalingParameter, -) -> List[Tuple[npt.NDArray[np.double], RowScaling]]: - """The row-scaling functionality is implemented in C++ and is made - accessible through the pybind11 library. - pybind11 requires that numpy arrays passed to it are in - Fortran-contiguous order (column-major), which is different from - numpy's default row-major (C-contiguous) order. - To ensure compatibility, numpy arrays are explicitly converted to Fortran order. - It's important to note that if an array originally in C-contiguous - order is passed to a function expecting Fortran order, - pybind11 will automatically create a Fortran-ordered copy of the array. - """ - matrices = [] - - if parameter.index_list is None: - matrices.append( - ( - np.asfortranarray(temp_storage[parameter.name].astype(np.double)), - parameter.row_scaling, - ) - ) - else: - matrices.append( - ( - np.asfortranarray( - temp_storage[parameter.name][parameter.index_list, :].astype( - np.double - ) - ), - parameter.row_scaling, - ), - ) - - return matrices - - -def _save_to_temp_storage( - temp_storage: TempStorage, - parameter: RowScalingParameter, - A: Optional[npt.NDArray[np.double]], -) -> None: - if A is None: - return - active_indices = parameter.index_list - if active_indices is None: - temp_storage[parameter.name] = A - else: - temp_storage[parameter.name][active_indices, :] = A[active_indices, :] - - def _save_temp_storage_to_disk( target_fs: EnsembleAccessor, temp_storage: TempStorage, @@ -289,7 +233,7 @@ def _create_temporary_parameter_storage( def _get_obs_and_measure_data( source_fs: EnsembleReader, - selected_observations: List[Tuple[str, Optional[List[int]]]], + selected_observations: List[str], iens_active_index: npt.NDArray[np.int_], ) -> Tuple[ npt.NDArray[np.float_], @@ -302,15 +246,9 @@ def _get_obs_and_measure_data( observation_values = [] observation_errors = [] observations = source_fs.experiment.observations - for obs_key, obs_active_list in selected_observations: + for obs_key in selected_observations: observation = observations[obs_key] group = observation.attrs["response"] - if obs_active_list: - index = observation.coords.to_index()[obs_active_list] - sub_selection = { - name: list(set(index.get_level_values(name))) for name in index.names - } - observation = observation.sel(sub_selection) response = source_fs.load_responses(group, tuple(iens_active_index)) if "time" in observation.coords: response = response.reindex( @@ -349,9 +287,8 @@ def _load_observations_and_responses( std_cutoff: float, global_std_scaling: float, iens_ative_index: npt.NDArray[np.int_], - selected_observations: List[Tuple[str, Optional[List[int]]]], + selected_observations: List[str], misfit_process: bool, - update_step_name: str, ) -> Tuple[ npt.NDArray[np.float_], Tuple[ @@ -421,11 +358,6 @@ def _load_observations_and_responses( for missing_obs in obs_keys[~obs_mask]: _logger.warning(f"Deactivating observation: {missing_obs}") - if len(observations[obs_mask]) == 0: - raise ErtAnalysisError( - f"No active observations for update step: {update_step_name}" - ) - return S[obs_mask], ( observations[obs_mask], scaled_errors[obs_mask], @@ -463,48 +395,6 @@ def _split_by_batchsize( return np.array_split(arr, sections) -def _update_with_row_scaling( - update_step: UpdateStep, - source_fs: EnsembleReader, - target_fs: EnsembleAccessor, - iens_active_index: npt.NDArray[np.int_], - S: npt.NDArray[np.float_], - observation_errors: npt.NDArray[np.float_], - observation_values: npt.NDArray[np.float_], - truncation: float, - inversion_type: str, - progress_callback: Callable[[AnalysisEvent], None], - rng: Optional[np.random.Generator] = None, -) -> None: - for param_group in update_step.row_scaling_parameters: - source: Union[EnsembleReader, EnsembleAccessor] - if target_fs.has_parameter_group(param_group.name): - source = target_fs - else: - source = source_fs - temp_storage = _create_temporary_parameter_storage( - source, iens_active_index, param_group.name - ) - - # Initialize and run a smoother with row scaling - params_with_row_scaling = ensemble_smoother_update_step_row_scaling( - covariance=observation_errors**2, - observations=observation_values, - X_with_row_scaling=_get_param_with_row_scaling(temp_storage, param_group), - Y=S, - inversion=inversion_type, - truncation=truncation, - seed=rng, - ) - - # Store result - _save_to_temp_storage(temp_storage, param_group, params_with_row_scaling[0][0]) - progress_callback( - AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") - ) - _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) - - def _copy_unupdated_parameters( all_parameter_groups: List[str], updated_parameter_groups: List[str], @@ -543,216 +433,186 @@ def _copy_unupdated_parameters( def analysis_ES( - updatestep: UpdateConfiguration, + parameters: List[str], + observations: List[str], rng: np.random.Generator, module: ESSettings, alpha: float, std_cutoff: float, global_scaling: float, - smoother_snapshot: SmootherSnapshot, ens_mask: npt.NDArray[np.bool_], source_fs: EnsembleReader, target_fs: EnsembleAccessor, + smoother_snapshot: SmootherSnapshot, progress_callback: Callable[[AnalysisEvent], None], misfit_process: bool, ) -> None: iens_active_index = np.flatnonzero(ens_mask) ensemble_size = ens_mask.sum() - updated_parameter_groups = [] - - for update_step in updatestep: - updated_parameter_groups.extend( - [param_group.name for param_group in update_step.parameters] - ) - progress_callback( - AnalysisStatusEvent(msg="Loading observations and responses..") - ) + progress_callback(AnalysisStatusEvent(msg="Loading observations and responses..")) + ( + S, ( - S, - ( - observation_values, - observation_errors, - update_snapshot, - ), - ) = _load_observations_and_responses( - source_fs, - alpha, - std_cutoff, - global_scaling, - iens_active_index, - update_step.observation_config(), - misfit_process, - update_step.name, - ) + observation_values, + observation_errors, + update_snapshot, + ), + ) = _load_observations_and_responses( + source_fs, + alpha, + std_cutoff, + global_scaling, + iens_active_index, + observations, + misfit_process, + ) + smoother_snapshot.update_step_snapshots = update_snapshot + num_obs = len(observation_values) - smoother_snapshot.update_step_snapshots[update_step.name] = update_snapshot + if num_obs == 0: + raise ErtAnalysisError("No active observations for update step") - num_obs = len(observation_values) + inversion_types = {0: "exact", 1: "subspace", 2: "subspace", 3: "subspace"} + try: + inversion_type = inversion_types[module.ies_inversion] + except KeyError as e: + raise ErtAnalysisError( + f"Mismatched inversion type for: " + f"Specified: {module.ies_inversion}, with possible: {inversion_types}" + ) from e - inversion_types = {0: "exact", 1: "subspace", 2: "subspace", 3: "subspace"} - try: - inversion_type = inversion_types[module.ies_inversion] - except KeyError as e: - raise ErtAnalysisError( - f"Mismatched inversion type for: " - f"Specified: {module.ies_inversion}, with possible: {inversion_types}" - ) from e + smoother_es = ies.ESMDA( + covariance=observation_errors**2, + observations=observation_values, + alpha=1, # The user is responsible for scaling observation covariance (esmda usage) + seed=rng, + inversion=inversion_type, + ) + truncation = module.enkf_truncation - smoother_es = ies.ESMDA( + if module.localization: + smoother_adaptive_es = AdaptiveESMDA( covariance=observation_errors**2, observations=observation_values, - alpha=1, # The user is responsible for scaling observation covariance (esmda usage) seed=rng, - inversion=inversion_type, ) - truncation = module.enkf_truncation - if module.localization: - smoother_adaptive_es = AdaptiveESMDA( - covariance=observation_errors**2, - observations=observation_values, - seed=rng, - ) + # Pre-calculate cov_YY + cov_YY = np.cov(S) - # Pre-calculate cov_YY - cov_YY = np.cov(S) + D = smoother_adaptive_es.perturb_observations( + ensemble_size=ensemble_size, alpha=1.0 + ) - D = smoother_adaptive_es.perturb_observations( - ensemble_size=ensemble_size, alpha=1.0 - ) + else: + # Compute transition matrix so that + # X_posterior = X_prior @ T + T = smoother_es.compute_transition_matrix(Y=S, alpha=1.0, truncation=truncation) + # Add identity in place for fast computation + np.fill_diagonal(T, T.diagonal() + 1) + for param_group in parameters: + source: Union[EnsembleReader, EnsembleAccessor] + if target_fs.has_parameter_group(param_group): + source = target_fs else: - # Compute transition matrix so that - # X_posterior = X_prior @ T - T = smoother_es.compute_transition_matrix( - Y=S, alpha=1.0, truncation=truncation - ) - # Add identity in place for fast computation - np.fill_diagonal(T, T.diagonal() + 1) - - for param_group in update_step.parameters: - source: Union[EnsembleReader, EnsembleAccessor] - if target_fs.has_parameter_group(param_group.name): - source = target_fs - else: - source = source_fs - temp_storage = _create_temporary_parameter_storage( - source, iens_active_index, param_group.name - ) - if module.localization: - num_params = temp_storage[param_group.name].shape[0] - - # Calculate adaptive batch size. - # Adaptive Localization calculates the cross-covariance between - # parameters and responses. - # Cross-covariance is a matrix with shape num_params x num_obs - # which may be larger than memory. - - # From `psutil` documentation: - # - available: - # the memory that can be given instantly to processes without the - # system going into swap. - # This is calculated by summing different memory values depending - # on the platform and it is supposed to be used to monitor actual - # memory usage in a cross platform fashion. - available_memory_bytes = psutil.virtual_memory().available - memory_safety_factor = 0.8 - bytes_in_float64 = 8 - batch_size = min( - int( - np.floor( - available_memory_bytes - * memory_safety_factor - / (num_obs * bytes_in_float64) - ) - ), - num_params, - ) - - batches = _split_by_batchsize(np.arange(0, num_params), batch_size) - - log_msg = f"Running localization on {num_params} parameters, {num_obs} responses, {ensemble_size} realizations and {len(batches)} batches" - _logger.info(log_msg) - progress_callback(AnalysisStatusEvent(msg=log_msg)) - - start = time.time() - for param_batch_idx in TimedIterator(batches, progress_callback): - X_local = temp_storage[param_group.name][param_batch_idx, :] - temp_storage[param_group.name][ - param_batch_idx, : - ] = smoother_adaptive_es.assimilate( - X=X_local, - Y=S, - D=D, - alpha=1.0, # The user is responsible for scaling observation covariance (esmda usage) - correlation_threshold=module.correlation_threshold, - cov_YY=cov_YY, - verbose=False, + source = source_fs + temp_storage = _create_temporary_parameter_storage( + source, iens_active_index, param_group + ) + if module.localization: + num_params = temp_storage[param_group].shape[0] + + # Calculate adaptive batch size. + # Adaptive Localization calculates the cross-covariance between + # parameters and responses. + # Cross-covariance is a matrix with shape num_params x num_obs + # which may be larger than memory. + + # From `psutil` documentation: + # - available: + # the memory that can be given instantly to processes without the + # system going into swap. + # This is calculated by summing different memory values depending + # on the platform and it is supposed to be used to monitor actual + # memory usage in a cross platform fashion. + available_memory_bytes = psutil.virtual_memory().available + memory_safety_factor = 0.8 + bytes_in_float64 = 8 + batch_size = min( + int( + np.floor( + available_memory_bytes + * memory_safety_factor + / (num_obs * bytes_in_float64) ) - _logger.info( - f"Adaptive Localization of {param_group} completed in {(time.time() - start) / 60} minutes" - ) - - else: - # Use low-level ies API to allow looping over parameters - if active_indices := param_group.index_list: - # The batch of parameters - X_local = temp_storage[param_group.name][active_indices, :] - - # Update manually using global transition matrix T - temp_storage[param_group.name][active_indices, :] = X_local @ T - - else: - # The batch of parameters - X_local = temp_storage[param_group.name] + ), + num_params, + ) - # Update manually using global transition matrix T - temp_storage[param_group.name] = X_local @ T + batches = _split_by_batchsize(np.arange(0, num_params), batch_size) - log_msg = f"Storing data for {param_group.name}.." + log_msg = f"Running localization on {num_params} parameters, {num_obs} responses, {ensemble_size} realizations and {len(batches)} batches" _logger.info(log_msg) progress_callback(AnalysisStatusEvent(msg=log_msg)) + start = time.time() - _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) + for param_batch_idx in TimedIterator(batches, progress_callback): + X_local = temp_storage[param_group][param_batch_idx, :] + temp_storage[param_group][ + param_batch_idx, : + ] = smoother_adaptive_es.assimilate( + X=X_local, + Y=S, + D=D, + alpha=1.0, # The user is responsible for scaling observation covariance (esmda usage) + correlation_threshold=module.correlation_threshold, + cov_YY=cov_YY, + verbose=False, + ) _logger.info( - f"Storing data for {param_group.name} completed in {(time.time() - start) / 60} minutes" + f"Adaptive Localization of {param_group} completed in {(time.time() - start) / 60} minutes" ) - _copy_unupdated_parameters( - list(source_fs.experiment.parameter_configuration.keys()), - updated_parameter_groups, - iens_active_index, - source_fs, - target_fs, - ) + else: + # Use low-level ies API to allow looping over parameters + # The batch of parameters + X_local = temp_storage[param_group] + + # Update manually using global transition matrix T + temp_storage[param_group] = X_local @ T - _update_with_row_scaling( - update_step=update_step, - source_fs=source_fs, - target_fs=target_fs, - iens_active_index=iens_active_index, - S=S, - observation_errors=observation_errors, - observation_values=observation_values, - truncation=truncation, - inversion_type=inversion_type, - progress_callback=progress_callback, - rng=rng, + log_msg = f"Storing data for {param_group}.." + _logger.info(log_msg) + progress_callback(AnalysisStatusEvent(msg=log_msg)) + start = time.time() + _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) + _logger.info( + f"Storing data for {param_group} completed in {(time.time() - start) / 60} minutes" ) + _copy_unupdated_parameters( + list(source_fs.experiment.parameter_configuration.keys()), + parameters, + iens_active_index, + source_fs, + target_fs, + ) + def analysis_IES( - update_config: UpdateConfiguration, + parameters: List[str], + observations: List[str], rng: np.random.Generator, analysis_config: IESSettings, alpha: float, std_cutoff: float, - smoother_snapshot: SmootherSnapshot, ens_mask: npt.NDArray[np.bool_], source_fs: EnsembleReader, target_fs: EnsembleAccessor, + smoother_snapshot: SmootherSnapshot, sies_smoother: Optional[ies.SIES], progress_callback: Callable[[AnalysisEvent], None], misfit_preprocessor: bool, @@ -760,7 +620,6 @@ def analysis_IES( initial_mask: npt.NDArray[np.bool_], ) -> ies.SIES: iens_active_index = np.flatnonzero(ens_mask) - updated_parameter_groups = [] # Pick out realizations that were among the initials that are still living # Example: initial_mask=[1,1,1,0,1], ens_mask=[0,1,1,0,1] # Then the result is [0,1,1,1] @@ -784,104 +643,83 @@ def analysis_IES( # It is not the iterations relating to IES or ESMDA. # It is related to functionality for turning on/off groups of parameters and observations. - for update_step in update_config: - updated_parameter_groups.extend( - [param_group.name for param_group in update_step.parameters] - ) - - progress_callback( - AnalysisStatusEvent(msg="Loading observations and responses..") - ) + progress_callback(AnalysisStatusEvent(msg="Loading observations and responses..")) + ( + S, ( - S, - ( - observation_values, - observation_errors, - update_snapshot, - ), - ) = _load_observations_and_responses( - source_fs, - alpha, - std_cutoff, - 1.0, - iens_active_index, - update_step.observation_config(), - misfit_preprocessor, - update_step.name, + observation_values, + observation_errors, + update_snapshot, + ), + ) = _load_observations_and_responses( + source_fs, + alpha, + std_cutoff, + 1.0, + iens_active_index, + observations, + misfit_preprocessor, + ) + smoother_snapshot.update_step_snapshots = update_snapshot + if len(observation_values) == 0: + raise ErtAnalysisError("No active observations for update step") + + # if the algorithm object is not passed, initialize it + if sies_smoother is None: + # The sies smoother must be initialized with the full parameter ensemble + # Get relevant active realizations + param_groups = list(source_fs.experiment.parameter_configuration.keys()) + parameter_ensemble_active = _all_parameters( + source_fs, iens_active_index, param_groups + ) + sies_smoother = ies.SIES( + parameters=parameter_ensemble_active, + covariance=observation_errors**2, + observations=observation_values, + seed=rng, + inversion=inversion_type, + truncation=analysis_config.enkf_truncation, ) - smoother_snapshot.update_step_snapshots[update_step.name] = update_snapshot - if len(observation_values) == 0: - raise ErtAnalysisError( - f"No active observations for update step: {update_step.name}." - ) + # Keep track of iterations to calculate step-lengths + sies_smoother.iteration = 1 - # if the algorithm object is not passed, initialize it - if sies_smoother is None: - # The sies smoother must be initialized with the full parameter ensemble - # Get relevant active realizations - param_groups = list(source_fs.experiment.parameter_configuration.keys()) - parameter_ensemble_active = _all_parameters( - source_fs, iens_active_index, param_groups - ) - sies_smoother = ies.SIES( - parameters=parameter_ensemble_active, - covariance=observation_errors**2, - observations=observation_values, - seed=rng, - inversion=inversion_type, - truncation=analysis_config.enkf_truncation, - ) + # Calculate step-lengths to scale SIES iteration + step_length = sies_step_length(sies_smoother.iteration) - # Keep track of iterations to calculate step-lengths - sies_smoother.iteration = 1 + # Propose a transition matrix using only active realizations + proposed_W = sies_smoother.propose_W_masked( + S, ensemble_mask=masking_of_initial_parameters, step_length=step_length + ) - # Calculate step-lengths to scale SIES iteration - step_length = sies_step_length(sies_smoother.iteration) + # Store transition matrix for later use on sies object + sies_smoother.W[:, masking_of_initial_parameters] = proposed_W - # Propose a transition matrix using only active realizations - proposed_W = sies_smoother.propose_W_masked( - S, ensemble_mask=masking_of_initial_parameters, step_length=step_length + for param_group in parameters: + source: Union[EnsembleReader, EnsembleAccessor] = target_fs + try: + target_fs.load_parameters(group=param_group, realizations=0)["values"] + except Exception: + source = source_fs + temp_storage = _create_temporary_parameter_storage( + source, iens_active_index, param_group + ) + X = temp_storage[param_group] + temp_storage[param_group] = X + X @ sies_smoother.W / np.sqrt( + len(iens_active_index) - 1 ) - # Store transition matrix for later use on sies object - sies_smoother.W[:, masking_of_initial_parameters] = proposed_W - - for param_group in update_step.parameters: - source: Union[EnsembleReader, EnsembleAccessor] = target_fs - try: - target_fs.load_parameters(group=param_group.name, realizations=0)[ - "values" - ] - except Exception: - source = source_fs - temp_storage = _create_temporary_parameter_storage( - source, iens_active_index, param_group.name - ) - if active_parameter_indices := param_group.index_list: - X = temp_storage[param_group.name][active_parameter_indices, :] - temp_storage[param_group.name][ - active_parameter_indices, : - ] = X + X @ sies_smoother.W / np.sqrt(len(iens_active_index) - 1) - else: - X = temp_storage[param_group.name] - temp_storage[param_group.name] = X + X @ sies_smoother.W / np.sqrt( - len(iens_active_index) - 1 - ) + progress_callback(AnalysisStatusEvent(msg=f"Storing data for {param_group}..")) + _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) - progress_callback( - AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") - ) - _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) - - _copy_unupdated_parameters( - list(source_fs.experiment.parameter_configuration.keys()), - updated_parameter_groups, - iens_active_index, - source_fs, - target_fs, - ) + _copy_unupdated_parameters( + list(source_fs.experiment.parameter_configuration.keys()), + parameters, + iens_active_index, + source_fs, + target_fs, + ) assert sies_smoother is not None, "sies_smoother should be initialized" @@ -895,40 +733,39 @@ def analysis_IES( def _write_update_report(path: Path, snapshot: SmootherSnapshot, run_id: str) -> None: fname = path / f"{run_id}.txt" fname.parent.mkdir(parents=True, exist_ok=True) - for update_step_name, update_step in snapshot.update_step_snapshots.items(): - with open(fname, "a", encoding="utf-8") as fout: - fout.write("=" * 150 + "\n") - timestamp = datetime.now().strftime("%Y.%m.%d %H:%M:%S") - fout.write(f"Time: {timestamp}\n") - fout.write(f"Parent ensemble: {snapshot.source_case}\n") - fout.write(f"Target ensemble: {snapshot.target_case}\n") - fout.write(f"Alpha: {snapshot.alpha}\n") - fout.write(f"Global scaling: {snapshot.global_scaling}\n") - fout.write(f"Standard cutoff: {snapshot.std_cutoff}\n") - fout.write(f"Run id: {run_id}\n") - fout.write(f"Update step: {update_step_name:<10}\n") - fout.write("-" * 150 + "\n") + update_step = snapshot.update_step_snapshots + with open(fname, "w", encoding="utf-8") as fout: + fout.write("=" * 150 + "\n") + timestamp = datetime.now().strftime("%Y.%m.%d %H:%M:%S") + fout.write(f"Time: {timestamp}\n") + fout.write(f"Parent ensemble: {snapshot.source_case}\n") + fout.write(f"Target ensemble: {snapshot.target_case}\n") + fout.write(f"Alpha: {snapshot.alpha}\n") + fout.write(f"Global scaling: {snapshot.global_scaling}\n") + fout.write(f"Standard cutoff: {snapshot.std_cutoff}\n") + fout.write(f"Run id: {run_id}\n") + fout.write("-" * 150 + "\n") + fout.write( + "Observed history".rjust(56) + + "|".rjust(17) + + "Simulated data".rjust(32) + + "|".rjust(13) + + "Status".rjust(12) + + "\n" + ) + fout.write("-" * 150 + "\n") + for nr, step in enumerate(update_step): + obs_std = ( + f"{step.obs_std:.3f}" + if step.obs_scaling == 1 + else f"{step.obs_std * step.obs_scaling:.3f} ({step.obs_std:<.3f} * {step.obs_scaling:.3f})" + ) fout.write( - "Observed history".rjust(56) - + "|".rjust(17) - + "Simulated data".rjust(32) - + "|".rjust(13) - + "Status".rjust(12) - + "\n" + f"{nr+1:^6}: {step.obs_name:20} {step.obs_val:>16.3f} +/- " + f"{obs_std:<21} | {step.response_mean:>21.3f} +/- " + f"{step.response_std:<16.3f} {'|':<6} " + f"{step.status.capitalize()}\n" ) - fout.write("-" * 150 + "\n") - for nr, step in enumerate(update_step): - obs_std = ( - f"{step.obs_std:.3f}" - if step.obs_scaling == 1 - else f"{step.obs_std * step.obs_scaling:.3f} ({step.obs_std:<.3f} * {step.obs_scaling:.3f})" - ) - fout.write( - f"{nr+1:^6}: {step.obs_name:20} {step.obs_val:>16.3f} +/- " - f"{obs_std:<21} | {step.response_mean:>21.3f} +/- " - f"{step.response_std:<16.3f} {'|':<6} " - f"{step.status.capitalize()}\n" - ) def _assert_has_enough_realizations( @@ -961,7 +798,8 @@ def smoother_update( prior_storage: EnsembleReader, posterior_storage: EnsembleAccessor, run_id: str, - updatestep: UpdateConfiguration, + observations: List[str], + parameters: List[str], analysis_config: Optional[UpdateSettings] = None, es_settings: Optional[ESSettings] = None, rng: Optional[np.random.Generator] = None, @@ -986,16 +824,17 @@ def smoother_update( ) analysis_ES( - updatestep, + parameters, + observations, rng, es_settings, analysis_config.alpha, analysis_config.std_cutoff, global_scaling, - smoother_snapshot, ens_mask, prior_storage, posterior_storage, + smoother_snapshot, progress_callback, analysis_config.misfit_preprocess, ) @@ -1015,7 +854,8 @@ def iterative_smoother_update( posterior_storage: EnsembleAccessor, sies_smoother: Optional[ies.SIES], run_id: str, - update_config: UpdateConfiguration, + parameters: List[str], + observations: List[str], update_settings: UpdateSettings, analysis_config: IESSettings, sies_step_length: Callable[[int], float], @@ -1030,11 +870,6 @@ def iterative_smoother_update( if rng is None: rng = np.random.default_rng() - if len(update_config) > 1: - raise ErtAnalysisError( - "Can not combine IES_ENKF modules with multi step updates" - ) - ens_mask = prior_storage.get_realization_mask_with_responses() _assert_has_enough_realizations(ens_mask, update_settings) @@ -1046,15 +881,16 @@ def iterative_smoother_update( ) sies_smoother = analysis_IES( - update_config=update_config, + parameters=parameters, + observations=observations, rng=rng, analysis_config=analysis_config, alpha=update_settings.alpha, std_cutoff=update_settings.std_cutoff, - smoother_snapshot=smoother_snapshot, ens_mask=ens_mask, source_fs=prior_storage, target_fs=posterior_storage, + smoother_snapshot=smoother_snapshot, sies_smoother=sies_smoother, progress_callback=progress_callback, misfit_preprocessor=update_settings.misfit_preprocess, diff --git a/src/ert/analysis/configuration.py b/src/ert/analysis/configuration.py deleted file mode 100644 index 470ac2276c4..00000000000 --- a/src/ert/analysis/configuration.py +++ /dev/null @@ -1,168 +0,0 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union - -from pydantic import ( - BaseModel, - conlist, - field_validator, - model_validator, -) -from typing_extensions import Self - -from .update import Parameter, RowScalingParameter - - -class Observation(BaseModel): - name: str - index_list: List[int] = [] - - -if TYPE_CHECKING: - ConstrainedObservationList = List[Observation] -else: - ConstrainedObservationList = conlist(Observation, min_length=1) - - -class UpdateStep(BaseModel): - name: str - observations: ConstrainedObservationList - parameters: List[Parameter] = [] - row_scaling_parameters: List[RowScalingParameter] = [] - - @field_validator("parameters", mode="before") - def transform_parameters(cls, parameters: Any) -> List[Parameter]: - if parameters is None: - return [] - if not isinstance(parameters, (list, tuple)): - raise ValueError("value is not a valid list") - values = [] - for parameter in parameters: - if isinstance(parameter, str): - values.append(Parameter(parameter)) - elif len(parameter) == 1: - values.append(Parameter(parameter[0])) - elif len(parameter) == 2: - values.append(Parameter(parameter[0], parameter[1])) - return values - - @field_validator("row_scaling_parameters", mode="before") - def transform_row_scaling_parameters( - cls, parameters: Any - ) -> List[RowScalingParameter]: - if parameters is None: - return [] - if not isinstance(parameters, (list, tuple)): - raise ValueError("value is not a valid list") - values = [] - for parameter in parameters: - if len(parameter) == 2: - values.append(RowScalingParameter(parameter[0], parameter[1])) - elif len(parameter) == 3: - values.append( - RowScalingParameter(parameter[0], parameter[1], parameter[2]) - ) - else: - raise ValueError("Misconfigured row scaling parameters") - return values - - @model_validator(mode="after") - def check_parameters(self) -> Self: - """ - Because the user only has to specify parameters or row_scaling_parameters, we - check that they have at least configured one parameter - """ - if not self.parameters and not self.row_scaling_parameters: - raise ValueError("Must provide at least one parameter") - return self - - @field_validator("observations", mode="before") - def check_arguments( - cls, - observations: Union[ - str, - Dict[str, Union[str, List[int]]], - List[Union[str, List[int]]], - Tuple[Union[str, List[int]]], - ], - ) -> List[Dict[str, Union[str, List[int]]]]: - """Because most of the time the users will configure observations as only a name - we convert positional arguments to named arguments""" - values: List[Dict[str, Union[str, List[int]]]] = [] - for observation in observations: - if isinstance(observation, str): - values.append({"name": observation}) - elif isinstance(observation, dict): - values.append(observation) - else: - assert isinstance(observation, (tuple, list)) - if len(observation) == 1: - name = observation[0] - assert isinstance(name, str) - values.append({"name": name}) - elif len(observation) == 2: - name, index_list = observation - assert isinstance(name, str) - assert isinstance(index_list, list) - values.append({"name": name, "index_list": index_list}) - else: - raise ValueError( - f"Unexpected observation length {len(observation)}" - ) - return values - - def observation_config(self) -> List[Tuple[str, Optional[List[int]]]]: - return [ - (observation.name, observation.index_list) - for observation in self.observations - ] - - -if TYPE_CHECKING: - ConstrainedUpdateStepList = List[UpdateStep] -else: - ConstrainedUpdateStepList = conlist(UpdateStep, min_length=1) - - -class UpdateConfiguration(BaseModel): - update_steps: ConstrainedUpdateStepList - - def __iter__(self) -> Iterator[UpdateStep]: # type: ignore - yield from self.update_steps - - def __getitem__(self, item: int) -> UpdateStep: - return self.update_steps[item] - - def __len__(self) -> int: - return len(self.update_steps) - - def context_validate( - self, valid_observations: List[str], valid_parameters: List[str] - ) -> None: - errors = [] - for update_step in self.update_steps: - for observation in update_step.observations: - if observation.name not in valid_observations: - errors.append( - f"Observation: {observation} not in valid observations" - ) - for parameter in ( - update_step.parameters + update_step.row_scaling_parameters - ): - if parameter.name not in valid_parameters: - errors.append(f"Parameter: {parameter} not in valid parameters") - if errors: - raise ValueError( - f"Update configuration not valid, " - f"valid observations: {valid_observations}, " - f"valid parameters: {valid_parameters}, errors: {errors}" - ) - - @classmethod - def global_update_step(cls, observations: List[str], parameters: List[str]) -> Self: - global_update_step = [ - UpdateStep( - name="ALL_ACTIVE", - observations=observations, - parameters=parameters, - ) - ] - return cls(update_steps=global_update_step) diff --git a/src/ert/analysis/row_scaling.py b/src/ert/analysis/row_scaling.py deleted file mode 100644 index e1807f25cd2..00000000000 --- a/src/ert/analysis/row_scaling.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import no_type_check - -from ert._clib.local.row_scaling import RowScaling - - -@no_type_check -def assign(self, target_size, func): - """Assign tapering value for all elements. - - The assign() method is the main function used to assign a row scaling - value to be used as tapering in the update. The first argument is the - number of elements in the target parameter, and the second argument is - a callable which will be called with element index as argument. - - In the example below we will assume that tapering is for a field - variable and we will scale the update with the function exp( -r/r0 ) - where r is the distance from some point and r0 is length scale. - - >>> def sqr(x): - ... return x*x - - >>> def exp_decay(grid, pos, r0, data_index): - ... x,y,z = grid.get_xyz( active_index = data_index) - ... r = math.sqrt( sqr(pos[0] - x) + sqr(pos[1] - y) + sqr(pos[2] - z)) - ... return math.exp( -r/r0 ) - - - >>> ens_config = ert.ensembleConfig() - >>> config_node = ens_config["PORO"] - >>> field_config = config.node.getFieldModelConfig() - >>> grid = ert.ens_config.grid - >>> pos = grid.get_xyz(ijk=(10,10,1)) - >>> r0 = 10 - - >>> if grid.get_num_active() != field_config.get_data_size(): - ... raise ValuError( - ... "Fatal error - inconsistent field size for: {}".format( - ... config_node.observation_key - ... ) - ... ) - - >>> # Some local configuration boilerplate has been skipped here. - >>> update_configuration = main.update_configuration - >>> local_data = update_configuration.createDataset("LOCAL") - >>> row_scaling = local_data.row_scaling("PORO") - >>> row_scaling.assign( - ... field_config.get_data_size(), - ... functools.partial(exp_decay, grid, pos, r0) - ... ) - - - In the example below functools.partial() is used to create a callable - which has access to the necessary context, another alternative would be - to use a class instance which implements the __call__() method. - - It is an important point that the assign() method does not have any - context for error checking, i.e. if you call it with an incorrect value - for the size argument things will silently pass initially but might - blow up in the subsequent update step. - - """ - - for index in range(target_size): - self[index] = func(index) - - -RowScaling.assign = assign -del assign - -__all__ = ["RowScaling"] diff --git a/src/ert/analysis/update.py b/src/ert/analysis/update.py deleted file mode 100644 index 56a6e5ccc4a..00000000000 --- a/src/ert/analysis/update.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import List, Optional - -from pydantic import ConfigDict -from pydantic.dataclasses import dataclass - -from .row_scaling import RowScaling - - -@dataclass -class Parameter: - name: str - index_list: Optional[List[int]] = None - - -@dataclass(config=ConfigDict(arbitrary_types_allowed=True)) -class RowScalingParameter: - name: str - row_scaling: RowScaling - index_list: Optional[List[int]] = None diff --git a/src/ert/enkf_main.py b/src/ert/enkf_main.py index 4a6cbf03d98..e90b3330bf1 100644 --- a/src/ert/enkf_main.py +++ b/src/ert/enkf_main.py @@ -12,7 +12,6 @@ import numpy as np from numpy.random import SeedSequence -from .analysis.configuration import UpdateConfiguration, UpdateStep from .config import ParameterConfig from .job_queue import WorkflowRunner from .run_context import RunContext diff --git a/src/ert/shared/hook_implementations/workflows/disable_parameters.py b/src/ert/shared/hook_implementations/workflows/disable_parameters.py index bc1c014b704..f9f6cc507d2 100644 --- a/src/ert/shared/hook_implementations/workflows/disable_parameters.py +++ b/src/ert/shared/hook_implementations/workflows/disable_parameters.py @@ -1,5 +1,4 @@ from ert import ErtScript -from ert.analysis.configuration import UpdateStep class DisableParametersUpdate(ErtScript): diff --git a/tests/integration_tests/analysis/test_es_update.py b/tests/integration_tests/analysis/test_es_update.py index 6dee7b8cf8d..faa62088740 100644 --- a/tests/integration_tests/analysis/test_es_update.py +++ b/tests/integration_tests/analysis/test_es_update.py @@ -17,7 +17,6 @@ UpdateSettings, _create_temporary_parameter_storage, ) -from ert.analysis.configuration import UpdateStep from ert.cli import ENSEMBLE_SMOOTHER_MODE from ert.cli.main import run_cli from ert.config import AnalysisConfig, ErtConfig, GenDataConfig, GenKwConfig @@ -25,19 +24,6 @@ from ert.storage import open_storage -@pytest.fixture -def update_config(): - return UpdateConfiguration( - update_steps=[ - UpdateStep( - name="ALL_ACTIVE", - observations=["OBSERVATION"], - parameters=["PARAMETER"], - ) - ] - ) - - @pytest.fixture def uniform_parameter(): return GenKwConfig( diff --git a/tests/performance_tests/test_memory_usage.py b/tests/performance_tests/test_memory_usage.py index 9ebdc5a67f6..05b7cdf4ccb 100644 --- a/tests/performance_tests/test_memory_usage.py +++ b/tests/performance_tests/test_memory_usage.py @@ -59,10 +59,8 @@ def test_memory_smoothing(poly_template): prior_ens, posterior_ens, str(uuid.uuid4()), - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - list(ert_config.ensemble_config.parameters), - ), + list(ert_config.observations.keys()), + list(ert_config.ensemble_config.parameters), ) diff --git a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_only_using_subset_observations/0/update_log b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_only_using_subset_observations/0/update_log index c39a3f8e30e..dad79c1ba43 100644 --- a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_only_using_subset_observations/0/update_log +++ b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_only_using_subset_observations/0/update_log @@ -6,12 +6,10 @@ Alpha: 3.0 Global scaling: 1.0 Standard cutoff: 1e-06 Run id: id -Update step: DISABLED_OBSERVATIONS ------------------------------------------------------------------------------------------------------------------------------------------------------ Observed history | Simulated data | Status ------------------------------------------------------------------------------------------------------------------------------------------------------ - 1 : FOPR 0.008 +/- 0.100 | 0.079 +/- 0.107 | Active - 2 : WPR_DIFF_1 0.000 +/- 0.100 | -0.011 +/- 0.060 | Active - 3 : WPR_DIFF_1 0.100 +/- 0.200 | 0.081 +/- 0.126 | Active - 4 : WPR_DIFF_1 0.200 +/- 0.150 | 0.073 +/- 0.130 | Active - 5 : WPR_DIFF_1 0.000 +/- 0.050 | 0.127 +/- 0.125 | Active + 1 : WPR_DIFF_1 0.000 +/- 0.100 | -0.011 +/- 0.060 | Active + 2 : WPR_DIFF_1 0.100 +/- 0.200 | 0.081 +/- 0.126 | Active + 3 : WPR_DIFF_1 0.200 +/- 0.150 | 0.073 +/- 0.130 | Active + 4 : WPR_DIFF_1 0.000 +/- 0.050 | 0.127 +/- 0.125 | Active diff --git a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-False/update_log b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-False/update_log index ac823345200..cd303a12b7b 100644 --- a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-False/update_log +++ b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-False/update_log @@ -6,7 +6,6 @@ Alpha: 3.0 Global scaling: 1.0 Standard cutoff: 1e-06 Run id: id -Update step: ALL_ACTIVE ------------------------------------------------------------------------------------------------------------------------------------------------------ Observed history | Simulated data | Status ------------------------------------------------------------------------------------------------------------------------------------------------------ diff --git a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-True/update_log b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-True/update_log index ce88046d969..25d90aec601 100644 --- a/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-True/update_log +++ b/tests/unit_tests/analysis/snapshots/test_es_update/test_update_report/0-True/update_log @@ -6,7 +6,6 @@ Alpha: 3.0 Global scaling: 1.0 Standard cutoff: 1e-06 Run id: id -Update step: ALL_ACTIVE ------------------------------------------------------------------------------------------------------------------------------------------------------ Observed history | Simulated data | Status ------------------------------------------------------------------------------------------------------------------------------------------------------ diff --git a/tests/unit_tests/analysis/test_es_update.py b/tests/unit_tests/analysis/test_es_update.py index dbc4182e8b7..940185c0f4e 100644 --- a/tests/unit_tests/analysis/test_es_update.py +++ b/tests/unit_tests/analysis/test_es_update.py @@ -13,31 +13,13 @@ from ert.analysis import ( ErtAnalysisError, - UpdateConfiguration, iterative_smoother_update, smoother_update, ) from ert.analysis._es_update import UpdateSettings -from ert.analysis.configuration import UpdateStep -from ert.analysis.row_scaling import RowScaling from ert.config import Field, GenDataConfig, GenKwConfig from ert.config.analysis_module import ESSettings, IESSettings from ert.field_utils import Shape -from ert.storage import open_storage -from ert.storage.realization_storage_state import RealizationStorageState - - -@pytest.fixture -def update_config(): - return UpdateConfiguration( - update_steps=[ - UpdateStep( - name="ALL_ACTIVE", - observations=["OBSERVATION"], - parameters=["PARAMETER"], - ) - ] - ) @pytest.fixture @@ -96,10 +78,8 @@ def test_update_report( prior_ens, posterior_ens, "id", - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - ert_config.ensemble_config.parameters, - ), + list(ert_config.observations.keys()), + ert_config.ensemble_config.parameters, UpdateSettings(misfit_preprocess=misfit_preprocess), ESSettings(ies_inversion=1), log_path=Path("update_log"), @@ -124,7 +104,7 @@ def test_update_report( @pytest.mark.parametrize( - "module, expected_gen_kw, row_scaling", + "module, expected_gen_kw", [ ( "IES_ENKF", @@ -140,7 +120,6 @@ def test_update_report( -0.7171489000139469, 0.7287252249699406, ], - False, ), ( "STD_ENKF", @@ -156,23 +135,6 @@ def test_update_report( -1.2603717378211836, 1.2014197463741136, ], - False, - ), - ( - "STD_ENKF", - [ - 0.7194682979730067, - -0.5643616537018902, - -1.341635690332394, - -1.6888363123882548, - -0.9922000342169071, - 0.6511460884255119, - -2.5957226375270688, - 1.6899446147608206, - -0.8679310950640513, - 1.2136685857887182, - ], - True, ), ], ) @@ -181,7 +143,6 @@ def test_update_snapshot( snake_oil_storage, module, expected_gen_kw, - row_scaling, ): """ Note that this is now a snapshot test, so there is no guarantee that the @@ -189,24 +150,6 @@ def test_update_snapshot( """ ert_config = snake_oil_case_storage - # Making sure that row scaling with a row scaling factor of 1.0 - # results in the same update as with ES. - # Note: seed must be the same! - if row_scaling: - row_scaling = RowScaling() - row_scaling.assign(10, lambda x: 1.0) - update_step = UpdateStep( - name="Row scaling only", - observations=list(ert_config.observations.keys()), - row_scaling_parameters=[("SNAKE_OIL_PARAM", row_scaling)], - ) - update_configuration = UpdateConfiguration(update_steps=[update_step]) - else: - update_configuration = UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - list(ert_config.ensemble_config.parameters), - ) - prior_ens = snake_oil_storage.get_ensemble_by_name("default_0") posterior_ens = snake_oil_storage.create_ensemble( prior_ens.experiment_id, @@ -235,7 +178,8 @@ def test_update_snapshot( posterior_storage=posterior_ens, sies_smoother=sies_smoother, run_id="id", - update_config=update_configuration, + parameters=list(ert_config.ensemble_config.parameters), + observations=list(ert_config.observations.keys()), update_settings=UpdateSettings(), analysis_config=IESSettings(ies_inversion=1), sies_step_length=sies_step_length, @@ -247,7 +191,8 @@ def test_update_snapshot( prior_ens, posterior_ens, "id", - update_configuration, + list(ert_config.observations.keys()), + list(ert_config.ensemble_config.parameters), UpdateSettings(), ESSettings(ies_inversion=1), rng=rng, @@ -268,125 +213,6 @@ def test_update_snapshot( assert target_gen_kw == pytest.approx(expected_gen_kw) -@pytest.mark.parametrize( - "expected_target_gen_kw, update_step", - [ - ( - [ - 0.5895781800838542, - -0.4369791317440733, - -1.370782409107295, - 0.7564469588868706, - 0.21572672272162152, - -0.24082711750101563, - -1.1445220433012324, - -1.03467093177391, - -0.17607955213742074, - 0.02826184434039854, - ], - [ - { - "name": "update_step_LOCA", - "observations": ["WOPR_OP1_72"], - "parameters": [("SNAKE_OIL_PARAM", [1, 2])], - } - ], - ), - ( - [ - -4.47905516481858, - -0.4369791317440733, - 1.1932696713609265, - 0.7564469588868706, - 0.21572672272162152, - -0.24082711750101563, - -1.1445220433012324, - -1.03467093177391, - -0.17607955213742074, - 0.02826184434039854, - ], - [ - { - "name": "update_step_LOCA1", - "observations": ["WOPR_OP1_72"], - "parameters": [("SNAKE_OIL_PARAM", [1, 2])], - }, - { - "name": "update_step_LOCA2", - "observations": ["WOPR_OP1_108"], - "parameters": [("SNAKE_OIL_PARAM", [0, 2])], - }, - ], - ), - ], -) -def test_localization( - snake_oil_case_storage, - snake_oil_storage, - expected_target_gen_kw, - update_step, -): - """ - Note that this is now a snapshot test, so there is no guarantee that the - snapshots are correct, they are just documenting the current behavior. - """ - ert_config = snake_oil_case_storage - - # Row scaling with a scaling factor of 0.0 should result in no update, - # which means that applying row scaling with a scaling factor of 0.0 - # should not change the snapshot. - row_scaling = RowScaling() - row_scaling.assign(10, lambda x: 0.0) - for us in update_step: - us["row_scaling_parameters"] = [("SNAKE_OIL_PARAM", row_scaling)] - - update_config = UpdateConfiguration(update_steps=update_step) - - prior_ens = snake_oil_storage.get_ensemble_by_name("default_0") - - posterior_ens = snake_oil_storage.create_ensemble( - prior_ens.experiment_id, - ensemble_size=ert_config.model_config.num_realizations, - iteration=1, - name="posterior", - prior_ensemble=prior_ens, - ) - smoother_update( - prior_ens, - posterior_ens, - "id", - update_config, - UpdateSettings(), - ESSettings(ies_inversion=1), - rng=np.random.default_rng(42), - log_path=Path("update_log"), - ) - - sim_gen_kw = list( - prior_ens.load_parameters("SNAKE_OIL_PARAM", 0)["values"].values.flatten() - ) - - target_gen_kw = list( - posterior_ens.load_parameters("SNAKE_OIL_PARAM", 0)["values"].values.flatten() - ) - - # Test that the localized values has been updated - assert sim_gen_kw[1:3] != target_gen_kw[1:3] - - # test that all the other values are left unchanged - assert sim_gen_kw[3:] == target_gen_kw[3:] - - assert target_gen_kw == pytest.approx(expected_target_gen_kw) - - # This is a regression test making sure that the update log - # contains all observations used in the update. - log_file = Path(ert_config.analysis_config.log_path) / "id.txt" - update_log = log_file.read_text("utf-8") - observations = [us["observations"][0] for us in update_step] - for obs in observations: - assert obs in update_log - - @pytest.mark.usefixtures("use_tmpdir") @pytest.mark.parametrize( "alpha, expected", @@ -403,7 +229,7 @@ def test_localization( ], ) def test_snapshot_alpha( - alpha, expected, storage, uniform_parameter, update_config, obs + alpha, expected, storage, uniform_parameter, obs ): """ Note that this is now a snapshot test, so there is no guarantee that the @@ -469,16 +295,15 @@ def test_snapshot_alpha( posterior_storage=posterior_storage, sies_smoother=sies_smoother, run_id="id", - update_config=update_config, + observations=experiment.observations.keys(), + parameters=experiment.parameter_configuration.keys(), update_settings=UpdateSettings(alpha=alpha), analysis_config=IESSettings(), sies_step_length=sies_step_length, initial_mask=initial_mask, ) assert result_snapshot.alpha == alpha - assert [ - obs.status for obs in result_snapshot.update_step_snapshots["ALL_ACTIVE"] - ] == expected + assert [obs.status for obs in result_snapshot.update_step_snapshots] == expected def test_and_benchmark_adaptive_localization_with_fields( @@ -551,15 +376,6 @@ def g(X): ) param_group = "PARAM_FIELD" - update_config = UpdateConfiguration( - update_steps=[ - UpdateStep( - name="ALL_ACTIVE", - observations=["OBSERVATION"], - parameters=[param_group], - ) - ] - ) config = Field.from_config_list( "MY_EGRID.EGRID", @@ -619,7 +435,8 @@ def g(X): prior, posterior_ens, "id", - update_config, + ["OBSERVATION"], + [param_group], UpdateSettings(), ESSettings(localization=True), ) @@ -643,18 +460,6 @@ def test_update_only_using_subset_observations( snapshots are correct, they are just documenting the current behavior. """ ert_config = snake_oil_case_storage - update_config = UpdateConfiguration( - update_steps=[ - { - "name": "DISABLED_OBSERVATIONS", - "observations": [ - {"name": "FOPR", "index_list": [1]}, - {"name": "WPR_DIFF_1"}, - ], - "parameters": ert_config.ensemble_config.parameters, - } - ] - ) prior_ens = snake_oil_storage.get_ensemble_by_name("default_0") posterior_ens = snake_oil_storage.create_ensemble( @@ -668,7 +473,8 @@ def test_update_only_using_subset_observations( prior_ens, posterior_ens, "id", - update_config, + ["WPR_DIFF_1"], + ert_config.ensemble_config.parameters, UpdateSettings(), ESSettings(), log_path=Path(ert_config.analysis_config.log_path), diff --git a/tests/unit_tests/analysis/test_row_scaling.py b/tests/unit_tests/analysis/test_row_scaling.py deleted file mode 100644 index fb39ff3b165..00000000000 --- a/tests/unit_tests/analysis/test_row_scaling.py +++ /dev/null @@ -1,136 +0,0 @@ -import random -from functools import partial - -import numpy as np -import pytest -from iterative_ensemble_smoother import ESMDA -from iterative_ensemble_smoother.experimental import ( - ensemble_smoother_update_step_row_scaling, -) - -from ert.analysis.row_scaling import RowScaling - - -def row_scaling_coloumb(nx, ny, data_index): - j = data_index / nx - i = data_index - j * nx - - dx = 0.5 + i - dy = 0.5 + j - - r2 = dx * dx + dy * dy - return 0.50 / r2 - - -def test_length_of_rowscaling_is_initially_zero(): - row_scaling = RowScaling() - assert len(row_scaling) == 0 - - -def test_row_scaling_automatically_grows_on_set(): - row_scaling = RowScaling() - row_scaling[9] = 0.25 - assert len(row_scaling) == 10 - assert row_scaling[0] == 0 - assert row_scaling[9] == 0.25 - - -def test_row_scaling_throws_index_error_on_get(): - with pytest.raises(IndexError): - _ = RowScaling()[10] - - -def test_assigning_to_index_in_row_scaling_clamps_the_value(): - row_scaling = RowScaling() - for index, _ in enumerate(row_scaling): - r = random.random() - row_scaling[index] = r - assert row_scaling[index] == row_scaling.clamp(r) - - -def test_assigning_with_function_applies_function_to_each_index(): - row_scaling = RowScaling() - row_scaling.assign(100, lambda _: 1) - assert len(row_scaling) == 100 - for i in range(100): - row_scaling[i] = 1 - - row_scaling.assign(100, lambda x: x / 100) - for i in range(100): - assert row_scaling[i] == row_scaling.clamp(i / 100) - - ny = 10 - nx = 10 - coloumb = partial(row_scaling_coloumb, nx, ny) - row_scaling.assign(nx * ny, coloumb) - for j in range(ny): - for i in range(nx): - g = j * nx + i - assert row_scaling[g] == row_scaling.clamp(row_scaling_coloumb(nx, ny, g)) - - -def test_assigning_a_non_vector_value_with_assign_vector_raises_value_error(): - row_scaling = RowScaling() - with pytest.raises(ValueError): - row_scaling.assign_vector(123.0) - - -def test_row_scaling_factor_0_for_all_parameters(): - row_scaling = RowScaling() - A = np.asfortranarray(np.array([[1.0, 2.0], [4.0, 5.0]])) - observations = np.array([5.0, 6.0]) - S = np.array([[1.0, 2.0], [3.0, 4.0]]) - - for i in range(A.shape[0]): - row_scaling[i] = 0.0 - - A_copy = A.copy() - ((A, row_scaling),) = ensemble_smoother_update_step_row_scaling( - Y=S, - X_with_row_scaling=[(A, row_scaling)], - covariance=np.full(observations.shape, 0.5), - observations=observations, - ) - assert np.allclose(A_copy, A) - - -def test_row_scaling_factor_1_for_either_parameter(): - row_scaling = RowScaling() - A = np.asfortranarray(np.array([[1.0, 2.0], [4.0, 5.0]])) - observations = np.array([5.0, 6.0]) - covariance = np.full(observations.shape, 0.5) - S = np.array([[1.0, 2.0], [3.0, 4.0]]) - - # Define row-scaling factors for the two parameters - row_scaling[0] = 0.0 - row_scaling[1] = 1.0 - - # Copy the prior, in case it gets overwritten - A_prior = A.copy() - A_no_row_scaling = A.copy() - - # Update with row-scaling - ((A, row_scaling),) = ensemble_smoother_update_step_row_scaling( - Y=S, - X_with_row_scaling=[(A, row_scaling)], - covariance=covariance, - observations=observations, - seed=42, - ) - - # Update without row-scaling, for comparisons - smoother = ESMDA( - covariance=covariance, - observations=observations, - alpha=np.array([1]), - seed=42, - ) - A_no_row_scaling = smoother.assimilate( - X=A_no_row_scaling, - Y=S, - ) - - # A[0] should not be updated because row_scaling[0] = 0.0 - # A[1] should get a normal update, because row_scaling[1] = 1.0 - assert np.allclose(A[0], A_prior[0]) - assert np.allclose(A[1], A_no_row_scaling[1]) diff --git a/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py b/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py deleted file mode 100644 index c4c0318948e..00000000000 --- a/tests/unit_tests/analysis/test_row_scaling_increased_belief_case.py +++ /dev/null @@ -1,148 +0,0 @@ -import numpy as np -import pytest -from iterative_ensemble_smoother.experimental import ( - ensemble_smoother_update_step_row_scaling, -) - -from ert.analysis.row_scaling import RowScaling - - -# We fix the random seed in the tests to ensure no flakiness -@pytest.fixture(autouse=True) -def fix_seed(seed=321): - np.random.seed(seed) - - -# The following tests follow the -# posterior properties described in -# https://ert.readthedocs.io/en/latest/theory/ensemble_based_methods.html -a_true = 1.0 -b_true = 5.0 -number_of_parameters = 2 -number_of_observations = 45 - - -class LinearModel: - def __init__(self, a, b): - self.a = a - self.b = b - self.size = 2 - - @classmethod - def random(cls): - a_std = 2.0 - b_std = 2.0 - # Priors with bias - a_bias = 0.5 * a_std - b_bias = -0.5 * b_std - - return cls( - np.random.normal(a_true + a_bias, a_std), - np.random.normal(b_true + b_bias, b_std), - ) - - def eval(self, x): - return self.a * x + self.b - - -def distance(point1, point2): - """euclidean distance - :param point1: n-dimensional point (array of floats). - :param point2: n-dimensional point (array of floats). - """ - point1 = np.asarray(point1) - point2 = np.asarray(point2) - return np.sqrt(np.sum(np.square(point1 - point2))) - - -@pytest.mark.parametrize("number_of_realizations", [100, 200]) -def test_that_update_for_a_linear_model_works_with_rowscaling(number_of_realizations): - true_model = LinearModel(a_true, b_true) - - ensemble = [LinearModel.random() for _ in range(number_of_realizations)] - - A = np.array( - [ - [realization.a for realization in ensemble], - [realization.b for realization in ensemble], - ] - ) - mean_prior = np.mean(A, axis=1) - - # We use time as the x-axis and observations are at - # t=0,1,2...number_of_observations - times = np.arange(number_of_observations) - - S = np.array([[realization.eval(t) for realization in ensemble] for t in times]) - - # When observations != true model, then ml estimates != true parameters. - # This gives both a more advanced and realistic test. Standard normal - # N(0,1) noise is added to obtain this. The randomness ensures we are not - # gaming the test. But the difference could in principle be any non-zero - # scalar. - observations = np.array( - [true_model.eval(t) + np.random.normal(0.0, 1.0) for t in times] - ) - - # Leading to fixed Maximum likelihood estimate. - # It will equal true values when observations are sampled without noise. - # It will also stay the same over beliefs. - mean_observations = np.mean(observations) - times_mean = np.mean(times) - times_square_sum = sum(np.square(times)) - a_maximum_likelihood = sum( - t * (observations[t] - mean_observations) for t in times - ) / (times_square_sum - times_mean * sum(times)) - b_maximum_likelihood = mean_observations - a_maximum_likelihood * times_mean - maximum_likelihood = np.array([a_maximum_likelihood, b_maximum_likelihood]) - - previous_mean_posterior = mean_prior - - # numerical precision tolerance - epsilon = 1e-2 - - # We iterate with an increased belief in the observations - for error in [10000.0, 100.0, 10.0, 1.0, 0.1]: - # An important point here is that we do not iteratively - # update A, but instead, observations stay the same and - # we increase our belief in the observations - # As A is update inplace, we have to reset it. - A = np.asfortranarray( - [ - [realization.a for realization in ensemble], - [realization.b for realization in ensemble], - ] - ) - row_scaling = RowScaling() - row_scaling[0] = 1.0 - row_scaling[1] = 0.7 - ((A_posterior, _),) = ensemble_smoother_update_step_row_scaling( - Y=S, - X_with_row_scaling=[(A, row_scaling)], - covariance=np.full(observations.shape, error), - observations=observations, - seed=42, - ) - - mean_posterior = np.mean(A_posterior, axis=1) - - # All posterior estimates lie between prior and maximum likelihood estimate - assert np.all( - distance(mean_posterior, maximum_likelihood) - - distance(mean_prior, maximum_likelihood) - < epsilon - ) - assert np.all( - distance(mean_prior, mean_posterior) - - distance(mean_prior, maximum_likelihood) - < epsilon - ) - - # Posterior parameter estimates improve with increased trust in observations - assert np.all( - distance(mean_posterior, maximum_likelihood) - - distance(previous_mean_posterior, maximum_likelihood) - < epsilon - ) - - previous_mean_posterior = mean_posterior diff --git a/tests/unit_tests/analysis/test_update_configuration.py b/tests/unit_tests/analysis/test_update_configuration.py deleted file mode 100644 index b4d2a675bee..00000000000 --- a/tests/unit_tests/analysis/test_update_configuration.py +++ /dev/null @@ -1,156 +0,0 @@ -import pytest -from pydantic import ValidationError - -from ert.analysis import UpdateConfiguration -from ert.analysis.row_scaling import RowScaling - - -def test_configuration(): - config = UpdateConfiguration( - update_steps=[ - { - "name": "update_step_name", - "observations": ["MY_OBS"], - "parameters": ["MY_PARAMETER"], - "row_scaling_parameters": [("MY_ROW_SCALING", RowScaling())], - } - ] - ) - - config.context_validate(["MY_OBS"], ["MY_PARAMETER", "MY_ROW_SCALING"]) - - -@pytest.mark.parametrize( - "config, expectation", - [ - [ - {"name": "not_relevant", "observations": ["not_relevant"]}, - pytest.raises(ValidationError, match="Must provide at least one parameter"), - ], - [ - {"name": "not_relevant", "parameters": ["not_relevant"]}, - pytest.raises(ValidationError, match="update_steps.0.observations"), - ], - [ - { - "name": "not_relevant", - "observations": ["not_relevant"], - "parameters": "relevant", - }, - pytest.raises(ValidationError, match="value is not a valid list"), - ], - [ - { - "name": "not_relevant", - "observations": ["relevant"], - "parameters": [], - }, - pytest.raises(ValidationError, match="Must provide at least one parameter"), - ], - [ - { - "name": "not_relevant", - "observations": [], - "parameters": ["not_relevant"], - }, - pytest.raises(ValidationError, match="List should have at least 1 item"), - ], - [ - { - "observations": ["not_relevant"], - "parameters": ["not_relevant"], - }, - pytest.raises(ValidationError, match="update_steps.0.name"), - ], - ], -) -def test_missing(config, expectation): - with expectation: - UpdateConfiguration(update_steps=[config]) - - -@pytest.mark.parametrize( - "config, expectation", - [ - [ - [ - { - "name": "not_relevant", - "observations": ["not_relevant"], - "parameters": "not_list", - }, - ], - pytest.raises( - ValidationError, match="1 validation error for UpdateConfiguration" - ), - ], - [ - [ - { - "name": "not_relevant", - "observations": ["not_relevant"], - "parameters": [], - }, - {"name": "not_relevant", "parameters": ["not_relevant"]}, - ], - pytest.raises( - ValidationError, match="2 validation errors for UpdateConfiguration" - ), - ], - [ - [ - {}, - {"name": "not_relevant", "observations": ["not_relevant"]}, - ], - pytest.raises( - ValidationError, match="3 validation errors for UpdateConfiguration" - ), - ], - ], -) -def test_missing_multiple(config, expectation): - with expectation: - UpdateConfiguration(update_steps=config) - - -@pytest.mark.parametrize( - "input_obs", - [ - ["OBS"], - [{"name": "OBS"}], - [{"name": "OBS", "index_list": [1, 2, 3]}], - [["OBS", [1, 2, 3]]], - [("OBS", [1, 2, 3])], - ["OBS_1", ["OBS", [1, 2, 3]]], - ], -) -def test_configuration_valid_obs_input(input_obs): - config = UpdateConfiguration( - update_steps=[ - { - "name": "not_relevant", - "observations": input_obs, - "parameters": ["not_relevant"], - } - ] - ) - config.context_validate(["OBS", "OBS_1"], ["not_relevant"]) - - -def test_user_setup(): - test_input = [ - { - "name": "update_step_NAME", - "observations": [ - "WOPR_OP1_72", - ("MY_INDEX_OBS", [1, 2, 3]), - ("JUST OBS"), - ], - "parameters": ["SNAKE_OIL_PARAM", ("INDEX_PARAMETER", [1, 2, 3])], - "row_scaling_parameters": [ - ("ROW_SCALE", RowScaling()), - ("ROW_SCALE", RowScaling(), [1, 2, 3]), - ], - } - ] - UpdateConfiguration(update_steps=test_input) diff --git a/tests/unit_tests/scenarios/test_summary_response.py b/tests/unit_tests/scenarios/test_summary_response.py index e2ee21cb5fd..3363562a38e 100644 --- a/tests/unit_tests/scenarios/test_summary_response.py +++ b/tests/unit_tests/scenarios/test_summary_response.py @@ -102,10 +102,8 @@ def test_that_reading_matching_time_is_ok(ert_config, storage, prior_ensemble): prior_ensemble, target_ensemble, "an id", - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - ert_config.ensemble_config.parameters, - ), + list(ert_config.observations.keys()), + ert_config.ensemble_config.parameters, ) @@ -132,10 +130,8 @@ def test_that_mismatched_responses_give_error(ert_config, storage, prior_ensembl prior_ensemble, target_ensemble, "an id", - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - ert_config.ensemble_config.parameters, - ), + list(ert_config.observations.keys()), + ert_config.ensemble_config.parameters, ) @@ -166,10 +162,8 @@ def test_that_different_length_is_ok_as_long_as_observation_time_exists( prior_ensemble, target_ensemble, "an id", - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - ert_config.ensemble_config.parameters, - ), + list(ert_config.observations.keys()), + ert_config.ensemble_config.parameters, ) @@ -215,10 +209,8 @@ def test_that_duplicate_summary_time_steps_does_not_fail( prior_ensemble, target_ensemble, "an id", - UpdateConfiguration.global_update_step( - list(ert_config.observations.keys()), - ert_config.ensemble_config.parameters, - ), + list(ert_config.observations.keys()), + ert_config.ensemble_config.parameters, )