From 3a1dde6e9c989943fcde2f9eb6164e8819201fc6 Mon Sep 17 00:00:00 2001
From: "Luiz Felippe S. Rodrigues"
Date: Wed, 13 Jan 2021 21:55:57 +0100
Subject: [PATCH 1/2] Implements separate storage of observable dictionaries

When saving the pipeline to disk, observable dictionaries are saved
separately using hickle, with compression enabled.
---
 imagine/tests/test_templates.py |  9 ++++
 imagine/tools/io.py             | 74 +++++++++++++++++++++++++--------
 tutorials/tutorial_one.ipynb    |  6 ++-
 3 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/imagine/tests/test_templates.py b/imagine/tests/test_templates.py
index deb19395..27e9b758 100644
--- a/imagine/tests/test_templates.py
+++ b/imagine/tests/test_templates.py
@@ -226,3 +226,12 @@ def test_pipeline_template():
     assert (pipeline_copy.log_evidence,
             pipeline_copy.log_evidence_err) == (42.0, 17.0)
     assert pipeline_copy.posterior_summary['constant_B_By']['median']==0.5*muG
+
+    # Tests the separate loading and saving of observable dictionaries
+    assert np.all(pipeline.likelihood.measurement_dict[dset.key].data ==
+                  pipeline_copy.likelihood.measurement_dict[dset.key].data)
+    # Also checks the construction of the covariance matrix
+    assert np.all(pipeline.likelihood.covariance_dict[dset.key].data ==
+                  pipeline_copy.likelihood.covariance_dict[dset.key].data)
+
+
diff --git a/imagine/tools/io.py b/imagine/tools/io.py
index a4a92a12..34918738 100644
--- a/imagine/tools/io.py
+++ b/imagine/tools/io.py
@@ -21,16 +21,16 @@
 mpirank = comm.Get_rank()
 
 # %% FUNCTION DEFINITIONS
-def save_pipeline(pipeline, use_hickle=False):
+def save_pipeline(pipeline, save_obs_data_separately=False):
     """
     Saves the state of a Pipeline object
-    
+
     Parameters
     ----------
     pipeline : imagine.pipelines.pipeline.Pipeline
         The pipeline object one would like to save
     use_hickle : bool
-        If `False` (default) the state is saved using the `cloudpickle` package. 
+        If `False` (default) the state is saved using the `cloudpickle` package.
         Otherwise, experimental support to `hickle` is enabled.
     """
     # Works on a (shallow) copy
@@ -42,41 +42,68 @@
     pipeline._chains_directory = os.path.relpath(pipeline._chains_directory,
                                                  run_directory)
 
+    # Shallow copy of the likelihood object to allow later manipulation
+    pipeline.likelihood = copy(pipeline.likelihood)
+
     # Adjusts observational data, if using distributed arrays
     if rc['distributed_arrays']:
         # Covariances need to be "undistributed"
-        # First, makes sure we are working on a copy
-        # (this is done as shallow as possible to save memory)
-        pipeline.likelihood = copy(pipeline.likelihood)
         pipeline.likelihood.covariance_dict = deepcopy(pipeline.likelihood.covariance_dict)
 
         # Gathers all distributed data -- i.e. turns global (distributed) into local
-        for k in pipeline.likelihood.covariance_dict.keys():
+        for k in pipeline.likelihood.covariance_dict:
             pipeline.likelihood.covariance_dict[k]._data = pipeline.likelihood.covariance_dict[k].global_data
         # NB any process with mpirank!=0 will store None in the above operation
 
+    # Stores measurements, covariances and masks separately
+    if save_obs_data_separately:
+        for obs_dict_name in ('covariance', 'measurement', 'mask'):
+            print('Saving', obs_dict_name)
+            obs_dict = getattr(pipeline.likelihood, obs_dict_name + '_dict')
+
+            if obs_dict is not None:
+                obs_dict = copy(obs_dict)
+                obs_dict._archive = copy(obs_dict._archive)
+
+                # Takes the actual data arrays out of the ObservableDict
+                # for saving (hickle works better if supplied a Python dict
+                # containing only numpy arrays as values)
+                data_dict = {}
+                for k, obs in obs_dict._archive.items():
+                    obs = copy(obs)
+                    # Serializes the key (to avoid hickle hiccups)
+                    serial_key = cloudpickle.dumps(k)
+                    data_dict[serial_key] = obs._data
+                    obs._data = None
+                    obs_dict._archive[k] = obs
+
+                # Finally, save the dictionary
+                hickle.dump(data_dict,
+                            os.path.join(run_directory, obs_dict_name + '.hkl'),
+                            mode='w', compression='gzip')
+
+                setattr(pipeline.likelihood, obs_dict_name + '_dict', obs_dict)
+
     # Hammurabi-specific path adjustment
     if hasattr(pipeline.simulator, 'hamx_path'):
-        # In the case hamx path is the system default, it will use the 
+        # In case the hamx path is the system default, it will use the
         # system default the next time it is loaded.
         pipeline.simulator = copy(pipeline.simulator)
         if pipeline.simulator.hamx_path == rc['hammurabi_hamx_path']:
             pipeline.simulator._hamx_path = None
             pipeline.simulator._ham._exe_path = None
-    
+
     if mpirank == 0:
-        if not use_hickle:
-            with open(os.path.join(run_directory,'pipeline.pkl'), 'wb') as f:
-                cloudpickle.dump(pipeline, f)
-        else:
-            hickle.dump(pipeline, os.path.join(run_directory,'pipeline.hkl'))
+        with open(os.path.join(run_directory,'pipeline.pkl'), 'wb') as f:
+            cloudpickle.dump(pipeline, f)
+
     return pipeline
 
 def load_pipeline(directory_path='.'):
     """
     Loads the state of a Pipeline object
-    
+
     Parameters
     ----------
     directory_path : str
@@ -87,11 +114,22 @@
     else:
         with open(os.path.join(directory_path,'pipeline.pkl'), 'rb') as f:
             pipeline = cloudpickle.load(f)
-    
+
     # Adjusts paths (hidden variables are used to avoid checks)
     pipeline._run_directory = os.path.join(directory_path, pipeline._run_directory)
     pipeline._chains_directory = os.path.join(directory_path, pipeline._chains_directory)
 
+    # Loads observable dictionaries, if available
+    for obs_dict_name in ('covariance', 'measurement', 'mask'):
+        filename = os.path.join(directory_path, obs_dict_name + '.hkl')
+        if os.path.isfile(filename):
+            data_dict = hickle.load(filename)
+            obs_dict = getattr(pipeline.likelihood, obs_dict_name + '_dict')
+            for skey, data in data_dict.items():
+                # Deserializes the key (eval undoes the string repr of the bytes)
+                key = cloudpickle.loads(eval(skey))
+                obs_dict[key]._data = data
+
     # Adjust observational data, if using distributed arrays
     if rc['distributed_arrays']:
         # Distributes the covariance data
@@ -101,7 +139,7 @@
 
     # Hammurabi-specific path adjustment
     if hasattr(pipeline.simulator, 'hamx_path'):
-        # In the case hamx path is the system default, it will use the 
+        # In case the hamx path is the system default, it will use the
         # system default the next time it is loaded.
         if pipeline.simulator.hamx_path is None:
             pipeline.simulator.hamx_path = rc['hammurabi_hamx_path']
@@ -109,5 +147,5 @@
         # using the xml_path property setter
         if pipeline.simulator.hamx_path is None:
             pipeline.xml_path = None
-    
+
     return pipeline
diff --git a/tutorials/tutorial_one.ipynb b/tutorials/tutorial_one.ipynb
index e9ba0046..075e8935 100644
--- a/tutorials/tutorial_one.ipynb
+++ b/tutorials/tutorial_one.ipynb
@@ -702,7 +702,11 @@
   {
    "cell_type": "code",
    "execution_count": 21,
-   "metadata": {},
+   "metadata": {
+    "tags": [
+     "nbval-ignore-output"
+    ]
+   },
    "outputs": [
     {
      "data": {

From 9f32a71c7b31edd23a88448aa7020010b4524ed5 Mon Sep 17 00:00:00 2001
From: "Luiz Felippe S. Rodrigues"
Date: Thu, 14 Jan 2021 07:49:11 +0100
Subject: [PATCH 2/2] Adjustments to save_pipeline function

* Saves measurements separately by default
* Corrects behaviour when dealing with distributed data: only gathers
  covariances
* Adjustments to docstring
---
 imagine/tools/io.py | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/imagine/tools/io.py b/imagine/tools/io.py
index 34918738..8324072f 100644
--- a/imagine/tools/io.py
+++ b/imagine/tools/io.py
@@ -21,7 +21,7 @@
 mpirank = comm.Get_rank()
 
 # %% FUNCTION DEFINITIONS
-def save_pipeline(pipeline, save_obs_data_separately=False):
+def save_pipeline(pipeline, save_obs_data_separately=True):
     """
     Saves the state of a Pipeline object
 
@@ -29,9 +29,11 @@
     ----------
     pipeline : imagine.pipelines.pipeline.Pipeline
         The pipeline object one would like to save
-    use_hickle : bool
-        If `False` (default) the state is saved using the `cloudpickle` package.
-        Otherwise, experimental support to `hickle` is enabled.
+    save_obs_data_separately : bool
+        If `True` (default), observable dictionaries (the Measurements,
+        Covariances and Masks objects linked to the pipeline's Likelihood
+        object) are saved separately and compressed. Otherwise, they are
+        serialized together with the remainder of the Pipeline object.
     """
     # Works on a (shallow) copy
     pipeline = copy(pipeline)
@@ -50,15 +52,17 @@
         # Covariances need to be "undistributed"
         pipeline.likelihood.covariance_dict = deepcopy(pipeline.likelihood.covariance_dict)
 
-        # Gathers all distributed data -- i.e. turns global (distributed) into local
+        # Gathers distributed covariance data
+        # i.e. turns global (distributed) into local
         for k in pipeline.likelihood.covariance_dict:
-            pipeline.likelihood.covariance_dict[k]._data = pipeline.likelihood.covariance_dict[k].global_data
-        # NB any process with mpirank!=0 will store None in the above operation
+            if pipeline.likelihood.covariance_dict[k].dtype == 'covariance':
+                pipeline.likelihood.covariance_dict[k]._data = pipeline.likelihood.covariance_dict[k].global_data
+        # NB any process with mpirank!=0 will store None in the above operation
 
     # Stores measurements, covariances and masks separately
-    if save_obs_data_separately:
+    if save_obs_data_separately and mpirank == 0:
         for obs_dict_name in ('covariance', 'measurement', 'mask'):
-            print('Saving', obs_dict_name)
+
             obs_dict = getattr(pipeline.likelihood, obs_dict_name + '_dict')
 
             if obs_dict is not None:
@@ -97,7 +101,6 @@
 
         with open(os.path.join(run_directory,'pipeline.pkl'), 'wb') as f:
             cloudpickle.dump(pipeline, f)
-
     return pipeline
 
 def load_pipeline(directory_path='.'):
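
For reference, the round trip that these two patches implement can be exercised as below. This is a minimal sketch rather than part of the patch series: it assumes an already-configured imagine Pipeline whose run directory exists (reached here through a `run_directory` attribute), and the helper `roundtrip_check` and its `key` argument are illustrative names (cf. `dset.key` in the test added by PATCH 1/2).

    import numpy as np

    from imagine.tools.io import load_pipeline, save_pipeline


    def roundtrip_check(pipeline, key):
        """Saves `pipeline`, reloads it and checks that one measurement survives."""
        # Writes <run_directory>/pipeline.pkl; with save_obs_data_separately
        # left at its new default of True, the observable data additionally
        # go to gzip-compressed covariance.hkl, measurement.hkl and mask.hkl
        save_pipeline(pipeline)

        # Re-reads pipeline.pkl and re-attaches the arrays found in the
        # *.hkl files to the observable dictionaries of the loaded Likelihood
        pipeline_copy = load_pipeline(pipeline.run_directory)

        # Mirrors the check in the test added by PATCH 1/2
        assert np.all(pipeline.likelihood.measurement_dict[key].data ==
                      pipeline_copy.likelihood.measurement_dict[key].data)
        return pipeline_copy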