diff --git a/ci/requirements/environment.yml b/ci/requirements/environment.yml index 85605aa..66d906d 100644 --- a/ci/requirements/environment.yml +++ b/ci/requirements/environment.yml @@ -20,6 +20,7 @@ dependencies: - python-cdo - cmor - pint-xarray + - flox # for testing - pytest - pytest-cov diff --git a/cordex/cmor/__init__.py b/cordex/cmor/__init__.py index c7c5048..1397142 100644 --- a/cordex/cmor/__init__.py +++ b/cordex/cmor/__init__.py @@ -1,5 +1,5 @@ from .cmor import cmorize_variable, prepare_variable -from .config import set_options +from .config import options, set_options from .utils import ( mid_of_month, mid_of_season, @@ -30,4 +30,5 @@ "season_bounds", "to_cftime", "set_options", + "options", ] diff --git a/cordex/cmor/cmor.py b/cordex/cmor/cmor.py index 4d6c8b1..7a2d00c 100644 --- a/cordex/cmor/cmor.py +++ b/cordex/cmor/cmor.py @@ -1,5 +1,5 @@ -import json import os +from os import path as op from warnings import warn import cf_xarray as cfxr @@ -31,7 +31,9 @@ _get_cfvarinfo, _get_cordex_pole, _get_pole, + _read_table, _strip_time_cell_method, + _tmp_table, mid_of_month, month_bounds, time_bounds_name, @@ -39,6 +41,8 @@ xr.set_options(keep_attrs=True) +flox_method = "blockwise" + def resample_both_closed(ds, hfreq, op, **kwargs): rolling = getattr(ds.rolling(time=hfreq + 1, center=True), op)() @@ -67,17 +71,18 @@ def _resample( ds, time, time_cell_method="point", label="left", time_offset=True, **kwargs ): """Resample a variable.""" - # freq = "{}H".format(hfreq) if time_cell_method == "point": - return ds.resample( - time=time, label=label, **kwargs - ).nearest() # .interpolate("nearest") # use as_freq? + return ds.resample(time=time, label=label, **kwargs).nearest( + method=flox_method + ) # .interpolate("nearest") # use as_freq? elif time_cell_method == "mean": if time_offset is True: loffset = _get_loffset(time) else: loffset = None - return ds.resample(time=time, label=label, loffset=loffset, **kwargs).mean() + return ds.resample(time=time, label=label, loffset=loffset, **kwargs).mean( + method=flox_method + ) else: raise Exception("unknown time_cell_method: {}".format(time_cell_method)) @@ -130,7 +135,7 @@ def _get_time_axis_name(time_cell_method): def _define_axes(ds, table_id): if "CORDEX_domain" in ds.attrs: - grid = cordex_domain(ds.attrs["CORDEX_domain"], add_vertices=True) + grid = cordex_domain(ds.attrs["CORDEX_domain"], bounds=True) lon_vertices = grid.lon_vertices.to_numpy() lat_vertices = grid.lat_vertices.to_numpy() else: @@ -222,7 +227,11 @@ def _cmor_write(da, table_id, cmorTime, cmorGrid, file_name=True): else: coords = [cmorTime, cmorGrid] cmor_var = cmor.variable(da.name, da.units, coords) - cmor.write(cmor_var, da.values) + if "time" in da.coords: + ntimes_passed = da.time.size + else: + ntimes_passed = None + cmor.write(cmor_var, da.to_numpy(), ntimes_passed=ntimes_passed) return cmor.close(cmor_var, file_name=file_name) @@ -237,15 +246,14 @@ def _units_convert(da, cf_units, format=None): return da -def _cf_units_convert(da, table_file, mapping_table={}): +def _cf_units_convert(da, table, mapping_table={}): """Convert units. Convert units according to the rules in units_convert_rules dict. Maybe metpy can do this also: https://unidata.github.io/MetPy/latest/tutorials/unit_tutorial.html """ - with open(table_file) as f: - table = json.load(f) + if da.name in mapping_table: map_units = mapping_table[da.name].get("units") atr_units = da.attrs.get("units") @@ -365,10 +373,13 @@ def _add_time_bounds(ds, cf_freq): ds = _add_month_bounds(ds) else: try: - ds = ds.convert_calendar(ds.time.dt.calendar).cf.add_bounds("time") + ds = ds.convert_calendar( + ds.time.dt.calendar, use_cftime=False + ).cf.add_bounds("time") except Exception: # wait for cftime arithemtics in xarry here: warn("could not add time bounds.") + ds[time_bounds_name].encoding = ds.time.encoding ds.time.attrs.update({"bounds": time_bounds_name}) return ds @@ -383,7 +394,9 @@ def _adjust_frequency(ds, cf_freq, input_freq=None, time_cell_method=None): pd_freq = freq_map[cf_freq] if pd_freq != input_freq: warn("resampling input data from {} to {}".format(input_freq, pd_freq)) - resample = _resample(ds, pd_freq, time_cell_method=time_cell_method) + resample = _resample( + ds, pd_freq, time_cell_method=time_cell_method, **options["resample_kwargs"] + ) return resample return ds @@ -395,6 +408,14 @@ def cmorize_cmor( if inpath is None: inpath = os.path.dirname(cmor_table) + dataset_table_json = dataset_table + cmor_table_json = cmor_table + + if isinstance(dataset_table, dict): + dataset_table_json = _tmp_table(dataset_table) + if isinstance(cmor_table, dict): + cmor_table_json = _tmp_table(cmor_table) + cfvarinfo = _get_cfvarinfo(out_name, cmor_table) if cfvarinfo is None: @@ -403,7 +424,7 @@ def cmorize_cmor( time_cell_method = _strip_time_cell_method(cfvarinfo) table_ids = _setup( - dataset_table, cmor_table, grids_table=grids_table, inpath=inpath + dataset_table_json, cmor_table_json, grids_table=grids_table, inpath=inpath ) cmorTime, cmorGrid = _define_grid(ds, table_ids, time_cell_method=time_cell_method) @@ -422,17 +443,29 @@ def prepare_variable( input_freq=None, CORDEX_domain=None, time_units=None, - time_cell_method=None, - cf_freq=None, rewrite_time_axis=False, + use_cftime=False, squeeze=True, ): """prepares a variable for cmorization.""" + + ds = ds.copy(deep=False) + + if isinstance(cmor_table, str): + cmor_table = _read_table(cmor_table) + cfvarinfo = _get_cfvarinfo(out_name, cmor_table) + + cf_freq = cfvarinfo["frequency"] + time_cell_method = _strip_time_cell_method(cfvarinfo) + if isinstance(ds, xr.DataArray): ds = ds.to_dataset() + # ensure that we propagate everything + # ds = xr.decode_cf(ds, decode_coords="all") + # no mapping table provided, we assume datasets has already correct out_names and units. - if mapping_table is None: + if out_name not in mapping_table: try: var_ds = ds[[out_name]] except Exception: @@ -448,11 +481,15 @@ def prepare_variable( if squeeze is True: var_ds = var_ds.squeeze(drop=True) if CORDEX_domain is not None: + var_ds.attrs["CORDEX_domain"] = CORDEX_domain var_ds = _crop_to_cordex_domain(var_ds, CORDEX_domain) if replace_coords is True: - grid = cordex_domain(CORDEX_domain) + grid = cordex_domain(CORDEX_domain, bounds=True) var_ds = var_ds.assign_coords(rlon=grid.rlon, rlat=grid.rlat) var_ds = var_ds.assign_coords(lon=grid.lon, lat=grid.lat) + var_ds = var_ds.assign_coords( + lon_vertices=grid.lon_vertices, lat_vertices=grid.lat_vertices + ) if "time" in var_ds: # ensure cftime @@ -464,6 +501,8 @@ def prepare_variable( if "time" not in ds.cf.bounds and time_cell_method == "mean": warn("adding time bounds") var_ds = _add_time_bounds(var_ds, cf_freq) + if use_cftime is False: + var_ds = var_ds.convert_calendar(ds.time.dt.calendar, use_cftime=False) var_ds = _set_time_encoding(var_ds, time_units, ds) if allow_units_convert is True: @@ -477,7 +516,10 @@ def prepare_variable( warn("adding pole from archive specs: {}".format(CORDEX_domain)) mapping = _get_cordex_pole(CORDEX_domain) - var_ds = xr.merge([var_ds, mapping]) + if "time" in mapping.coords: + raise Exception("grid_mapping variable should have no time coordinate!") + + var_ds[mapping.name] = mapping return var_ds @@ -509,10 +551,10 @@ def cmorize_variable( out_name: str CF out_name of the variable that should be cmorized. The corresponding variable name in the dataset is looked up from the mapping_table if provided. - cmor_table : str - Filepath to cmor table. - dataset_table: str - Filepath to dataset cmor table. + cmor_table : str or dict + Cmor table dict of filepath to cmor table (json). + dataset_table: str or dict + Dataset table dict of filepath to dataset cmor table (json). mapping_table: dict Mapping of input variable names and meta data to CF out_name. Required if the variable name in the input dataset is not equal to out_name. @@ -548,7 +590,7 @@ def cmorize_variable( Rewrite the time axis to CF compliant timestamps. outpath: str Root directory for output (can be either a relative or full path). This will override - the outpath defined in the dataset cmor input table. + the outpath defined in the dataset cmor input table (``dataset_table``). **kwargs: Argumets passed to prepare_variable. @@ -573,14 +615,11 @@ def cmorize_variable( if inpath is None: inpath = os.path.dirname(cmor_table) - # get meta info from cmor table - cfvarinfo = _get_cfvarinfo(out_name, cmor_table) - - if cfvarinfo is None: - raise Exception("{} not found in {}".format(out_name, cmor_table)) + if op.isfile(dataset_table): + dataset_table = _read_table(dataset_table) - cf_freq = cfvarinfo["frequency"] - time_cell_method = _strip_time_cell_method(cfvarinfo) + if outpath: + dataset_table["outpath"] = outpath ds_prep = prepare_variable( ds, @@ -590,8 +629,6 @@ def cmorize_variable( mapping_table=mapping_table, replace_coords=replace_coords, input_freq=input_freq, - cf_freq=cf_freq, - time_cell_method=time_cell_method, rewrite_time_axis=rewrite_time_axis, time_units=time_units, allow_resample=allow_resample, diff --git a/cordex/cmor/config.py b/cordex/cmor/config.py index 062d644..1bbe469 100644 --- a/cordex/cmor/config.py +++ b/cordex/cmor/config.py @@ -2,7 +2,11 @@ import numpy as np -options = {"table_prefix": "CORDEX-CMIP6", "exit_control": "CMOR_NORMAL"} +options = { + "table_prefix": "CORDEX-CMIP6", + "exit_control": "CMOR_NORMAL", + "resample_kwargs": {"closed": "left"}, +} # time offsets relative to left labeling for resampling. diff --git a/cordex/cmor/utils.py b/cordex/cmor/utils.py index 699df3b..064840e 100644 --- a/cordex/cmor/utils.py +++ b/cordex/cmor/utils.py @@ -2,6 +2,7 @@ """ import datetime as dt import json +import tempfile from warnings import warn import cftime as cfdt @@ -290,7 +291,7 @@ def _encode_time(time): return xr.conventions.encode_cf_variable(time) -def _read_cmor_table(table): +def _read_table(table): return _read_json_file(table) @@ -300,9 +301,41 @@ def _read_json_file(filename): return data -def _get_cfvarinfo(cf_varname, table): - data = _read_cmor_table(table) - return data["variable_entry"].get(cf_varname, None) +def _write_json_file(filename, data): + with open(filename, "w") as fp: + json.dump(data, fp, indent=4) + return filename + + +def _get_cfvarinfo(out_name, table): + """Returns variable entry from cmor table""" + if isinstance(table, str): + table = _read_table(table) + info = table["variable_entry"].get(out_name, None) + if info is None: + raise Exception( + "{} not found in table {}".format(out_name, get_table_id(table)) + ) + return info + + +def get_table_id(table): + """parse the table_id from a cmor table header""" + separator = " " + table_id = table["Header"].get("table_id", None) + if table_id is None: + raise Exception("no table_id in Header") + if separator in table_id: + return table_id.split(separator)[1] + return table_id + + +def _tmp_table(table, format="json"): + """creates a temporay table json file""" + _, filename = tempfile.mkstemp() + warn(f"writing temporary table to {filename}") + if format == "json": + return _write_json_file(filename, table) def _get_time_cell_method(cf_varname, table): diff --git a/docs/whats_new.rst b/docs/whats_new.rst index 4eff6bc..6866b8f 100644 --- a/docs/whats_new.rst +++ b/docs/whats_new.rst @@ -8,6 +8,15 @@ What's New import cordex +v0.6.0 (Unreleased) +------------------- + +Internal Changes +~~~~~~~~~~~~~~~~ + +- CMOR updates, including fixing of time step warnings and resampling options, includes options for using `flox `_ in resampling operations (:pull:`116`). + + v0.5.1 (2 March 2023) --------------------- diff --git a/tests/__init__.py b/tests/__init__.py index aa52adb..4bc6b04 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,3 +20,4 @@ def _importorskip(modname): has_regionmask, requires_regionmask = _importorskip("regionmask") has_xesmf, requires_xesmf = _importorskip("xesmf") has_geopandas, requires_geopandas = _importorskip("geopandas") +has_pint_xarray, requires_pint_xarray = _importorskip("pint_xarray") diff --git a/tests/test_cmor.py b/tests/test_cmor.py index 82ed2b0..c022cc5 100644 --- a/tests/test_cmor.py +++ b/tests/test_cmor.py @@ -1,4 +1,5 @@ import datetime as dt +import os import cftime import cftime as cfdt @@ -9,6 +10,11 @@ import cordex as cx from cordex import cmor from cordex.cmor import utils +from cordex.tables import cordex_cmor_table + +from . import requires_pint_xarray + +mapping_table = {"orog": {"varname": "topo"}, "tas": {"varname": "TEMP2"}} def test_cfdt(): @@ -160,60 +166,74 @@ def test_month_bounds(): assert np.array_equal(mid, expect) -def test_cmorizer_fx(): - ds = cx.cordex_domain("EUR-11", dummy="topo") - filename = cmor.cmorize_variable( +def run_cmorizer(ds, out_name, domain_id, table_id, dataset_table=None, **kwargs): + if dataset_table is None: + dataset_table = cordex_cmor_table("CORDEX_remo_example") + return cmor.cmorize_variable( ds, - "orog", - mapping_table={"orog": {"varname": "topo"}}, - cmor_table=cx.tables.cordex_cmor_table("CORDEX_fx"), - dataset_table=cx.tables.cordex_cmor_table("CORDEX_remo_example"), - grids_table=cx.tables.cordex_cmor_table("CORDEX_grids"), - CORDEX_domain="EUR-11", - time_units=None, + out_name, + mapping_table=mapping_table, + cmor_table=cordex_cmor_table(f"CORDEX_{table_id}"), + dataset_table=dataset_table, + grids_table=cordex_cmor_table("CORDEX_grids"), + CORDEX_domain=domain_id, + replace_coords=True, allow_units_convert=True, + allow_resample=True, + **kwargs, ) + + +def test_cmorizer_fx(): + ds = cx.cordex_domain("EUR-11", dummy="topo") + filename = run_cmorizer(ds, "orog", "EUR-11", "fx") output = xr.open_dataset(filename) assert "orog" in output def test_cmorizer_mon(): ds = cx.tutorial.open_dataset("remo_EUR-11_TEMP2_mon") - eur11 = cx.cordex_domain("EUR-11") - ds = ds.assign_coords({"lon": eur11.lon, "lat": eur11.lat}) - filename = cmor.cmorize_variable( - ds, - "tas", - mapping_table={"tas": {"varname": "TEMP2"}}, - cmor_table=cx.tables.cordex_cmor_table("CORDEX_mon"), - dataset_table=cx.tables.cordex_cmor_table("CORDEX_remo_example"), - grids_table=cx.tables.cordex_cmor_table("CORDEX_grids"), - CORDEX_domain="EUR-11", - time_units=None, - allow_units_convert=True, - ) + filename = run_cmorizer(ds, "tas", "EUR-11", "mon") output = xr.open_dataset(filename) assert output.dims["time"] == 12 assert "tas" in output -@pytest.mark.parametrize("table, tdim", [("CORDEX_day", 3), ("CORDEX_1hr", 49)]) -def test_cmorizer_subdaily(table, tdim): +@pytest.mark.parametrize("table_id, tdim", [("day", 3), ("1hr", 49)]) +def test_cmorizer_subdaily(table_id, tdim): ds = cx.tutorial.open_dataset("remo_EUR-11_TEMP2_1hr") eur11 = cx.cordex_domain("EUR-11") ds = ds.assign_coords({"lon": eur11.lon, "lat": eur11.lat}) - filename = cmor.cmorize_variable( - ds, - "tas", - mapping_table={"tas": {"varname": "TEMP2"}}, - cmor_table=cx.tables.cordex_cmor_table(table), - dataset_table=cx.tables.cordex_cmor_table("CORDEX_remo_example"), - grids_table=cx.tables.cordex_cmor_table("CORDEX_grids"), - CORDEX_domain="EUR-11", - time_units=None, - allow_units_convert=True, - allow_resample=True, - ) + filename = run_cmorizer(ds, "tas", "EUR-11", table_id) output = xr.open_dataset(filename) assert "tas" in output assert output.dims["time"] == tdim + + +@requires_pint_xarray +def test_units_convert(): + import pint_xarray # noqa + from cf_xarray.units import units # noqa + + ds = cx.tutorial.open_dataset("remo_EUR-11_TEMP2_mon") + ds["TEMP2"] = ds.TEMP2.pint.quantify().pint.to("degC").pint.dequantify(format="cf") + filename = run_cmorizer(ds, "tas", "EUR-11", "mon") + output = xr.open_dataset(filename) + assert output.tas.units == "K" + + +@pytest.mark.parametrize("table_id", ["1hr", "6hr", "day", "mon"]) +def test_table_id(table_id): + table = f"CORDEX_{table_id}" + filename = cordex_cmor_table(table) + tid = utils.get_table_id(utils._read_table(filename)) + assert tid == table_id + + +def test_table_manipulation(): + home = os.environ["HOME"] + print(f"writing to {home}") + ds = cx.cordex_domain("EUR-11", dummy="topo") + filename = run_cmorizer(ds, "orog", "EUR-11", "fx", outpath=home) + print(filename) + assert home in filename