Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements separate storage of observable dictionaries #78

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions imagine/tests/test_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,12 @@ def test_pipeline_template():
assert (pipeline_copy.log_evidence,
pipeline_copy.log_evidence_err) == (42.0, 17.0)
assert pipeline_copy.posterior_summary['constant_B_By']['median']==0.5*muG

# Tests the separate loading and saving of observable dictionaries
assert np.all(pipeline.likelihood.measurement_dict[dset.key].data ==
pipeline_copy.likelihood.measurement_dict[dset.key].data )
# Also checks the construction of covariance matrix
assert np.all(pipeline.likelihood.covariance_dict[dset.key].data ==
pipeline_copy.likelihood.covariance_dict[dset.key].data )


76 changes: 57 additions & 19 deletions imagine/tools/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@
mpirank = comm.Get_rank()

# %% FUNCTION DEFINITIONS
def save_pipeline(pipeline, use_hickle=False):
def save_pipeline(pipeline, save_obs_data_separately=True):
"""
Saves the state of a Pipeline object

Parameters
----------
pipeline : imagine.pipelines.pipeline.Pipeline
The pipeline object one would like to save
use_hickle : bool
If `False` (default) the state is saved using the `cloudpickle` package.
Otherwise, experimental support to `hickle` is enabled.
save_obs_data_separately : bool
If `True` (default) observable dictionaries (the Measurements,
Covariances and Masks objects linked to the pipeline's Likelihood
object) are saved separately and compressed. Otherwise, they are
serialized together with the remainder of the Pipeline object.
"""
# Works on a (shallow) copy
pipeline = copy(pipeline)
Expand All @@ -42,18 +44,49 @@ def save_pipeline(pipeline, use_hickle=False):
pipeline._chains_directory = os.path.relpath(pipeline._chains_directory,
run_directory)

# Shallow copy of the likelihood object to allow later manipulation
pipeline.likelihood = copy(pipeline.likelihood)

# Adjusts observational data, if using distributed arrays
if rc['distributed_arrays']:
# Covariances need to be "undistributed"
# First, makes sure we are working on a copy
# (this is done as shallow as possible to save memory)
pipeline.likelihood = copy(pipeline.likelihood)
pipeline.likelihood.covariance_dict = deepcopy(pipeline.likelihood.covariance_dict)

# Gathers all distributed data -- i.e. turns global (distributed) into local
for k in pipeline.likelihood.covariance_dict.keys():
pipeline.likelihood.covariance_dict[k]._data = pipeline.likelihood.covariance_dict[k].global_data
# NB any process with mpirank!=0 will store None in the above operation
# Gathers distributed covariance data
# i.e. turns global (distributed) into local
for k in pipeline.likelihood.covariance_dict:
if pipeline.likelihood.covariance_dict[k].dtype == 'covariance':
pipeline.likelihood.covariance_dict[k]._data = pipeline.likelihood.covariance_dict[k].global_data
# NB any process with mpirank!=0 will store None in the above operation

# Stores measurements, covariances and masks separately
if save_obs_data_separately and mpirank==0:
for obs_dict_name in ('covariance', 'measurement','mask'):

obs_dict = getattr(pipeline.likelihood, obs_dict_name + '_dict')

if obs_dict is not None:
obs_dict = copy(obs_dict)
obs_dict._archive = copy(obs_dict._archive)

# Takes the actual data arrays out of the Observable_Dict
# for saving (hickle works better if supplied a python dict
# comprising only numpy arrays as values)
data_dict = {}
for k, obs in obs_dict._archive.items():
obs = copy(obs)
# Serializes the key (to avoid hickle hickups)
serial_key = cloudpickle.dumps(k)
data_dict[serial_key] = obs._data
obs._data = None
obs_dict._archive[k] = obs

# Finally, save the dictionary
hickle.dump(data_dict,
os.path.join(run_directory, obs_dict_name+'.hkl'),
mode='w', compression='gzip')

setattr(pipeline.likelihood, obs_dict_name + '_dict', obs_dict)

# Hammurabi-specific path adjustment
if hasattr(pipeline.simulator, 'hamx_path'):
Expand All @@ -65,11 +98,8 @@ def save_pipeline(pipeline, use_hickle=False):
pipeline.simulator._ham._exe_path = None

if mpirank == 0:
if not use_hickle:
with open(os.path.join(run_directory,'pipeline.pkl'), 'wb') as f:
cloudpickle.dump(pipeline, f)
else:
hickle.dump(pipeline, os.path.join(run_directory,'pipeline.hkl'))
with open(os.path.join(run_directory,'pipeline.pkl'), 'wb') as f:
cloudpickle.dump(pipeline, f)

return pipeline

Expand All @@ -92,6 +122,17 @@ def load_pipeline(directory_path='.'):
pipeline._run_directory = os.path.join(directory_path, pipeline._run_directory)
pipeline._chains_directory = os.path.join(directory_path, pipeline._chains_directory)

# Loads observable dictionaries, if available
for obs_dict_name in ('covariance', 'measurement','mask'):
filename = os.path.join(directory_path, obs_dict_name + '.hkl')
if os.path.isfile(filename):
data_dict = hickle.load(filename)
obs_dict = getattr(pipeline.likelihood, obs_dict_name +'_dict')
for skey, data in data_dict.items():
# Deserializes the key
key = cloudpickle.loads(eval(skey))
obs_dict[key]._data = data

# Adjust observational data, if using distributed arrays
if rc['distributed_arrays']:
# Distributes the covariance data
Expand All @@ -110,7 +151,4 @@ def load_pipeline(directory_path='.'):
if pipeline.simulator.hamx_path is None:
pipeline.xml_path = None

# Avoids synchronization problems after loading the pipeline when using MPI
comm.Barrier()

return pipeline
6 changes: 5 additions & 1 deletion tutorials/tutorial_one.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,11 @@
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"metadata": {
"tags": [
"nbval-ignore-output"
]
},
"outputs": [
{
"data": {
Expand Down