From 13866e05513f025432a56c5884debde93acf19a5 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 7 Nov 2023 16:08:27 -0600 Subject: [PATCH 1/5] Add support for remote string paths to h5netcdf engine --- xarray/backends/file_manager.py | 8 +++++++- xarray/backends/h5netcdf_.py | 24 +++++++++++++++++++++--- xarray/tests/test_backends.py | 21 +++++++++++++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/xarray/backends/file_manager.py b/xarray/backends/file_manager.py index df901f9a1d9..0b8f192c5bc 100644 --- a/xarray/backends/file_manager.py +++ b/xarray/backends/file_manager.py @@ -157,11 +157,17 @@ def __init__( def _make_key(self): """Make a key for caching files in the LRU cache.""" + kwargs = self._kwargs + # storage_options is a non-hashable dict, so we implement special logic for hashing + if self._kwargs.get("storage_options", None) is not None: + kwargs = self._kwargs.copy() + kwargs["storage_options"] = tuple(sorted(kwargs["storage_options"].items())) + value = ( self._opener, self._args, "a" if self._mode == "w" else self._mode, - tuple(sorted(self._kwargs.items())), + tuple(sorted(kwargs.items())), self._manager_id, ) return _HashedSequence(value) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 19748084625..0ca992b0a98 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -88,6 +88,20 @@ def _h5netcdf_create_group(dataset, name): return dataset.create_group(name) +def _h5netcdf_opener(filename, mode, storage_options=None, **kwargs): + import h5netcdf + + if is_remote_uri(filename): + import fsspec + + mode_ = "rb" if mode == "r" else mode + fs, _, _ = fsspec.get_fs_token_paths( + filename, mode=mode_, storage_options=storage_options + ) + filename = fs.open(filename, mode=mode_) + return h5netcdf.File(filename, mode=mode, **kwargs) + + class H5NetCDFStore(WritableCFDataStore): """Store for reading and writing data via h5netcdf""" @@ -140,9 +154,8 @@ def open( invalid_netcdf=None, phony_dims=None, decode_vlen_strings=True, + storage_options=None, ): - import h5netcdf - if isinstance(filename, bytes): raise ValueError( "can't open netCDF4/HDF5 as bytes " @@ -161,6 +174,7 @@ def open( kwargs = { "invalid_netcdf": invalid_netcdf, "decode_vlen_strings": decode_vlen_strings, + "storage_options": storage_options, } if phony_dims is not None: kwargs["phony_dims"] = phony_dims @@ -171,7 +185,9 @@ def open( else: lock = combine_locks([HDF5_LOCK, get_write_lock(filename)]) - manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) + manager = CachingFileManager( + _h5netcdf_opener, filename, mode=mode, kwargs=kwargs + ) return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose) def _acquire(self, needs_lock=True): @@ -397,6 +413,7 @@ def open_dataset( # type: ignore[override] # allow LSP violation, not supporti invalid_netcdf=None, phony_dims=None, decode_vlen_strings=True, + storage_options=None, ) -> Dataset: filename_or_obj = _normalize_path(filename_or_obj) store = H5NetCDFStore.open( @@ -407,6 +424,7 @@ def open_dataset( # type: ignore[override] # allow LSP violation, not supporti invalid_netcdf=invalid_netcdf, phony_dims=phony_dims, decode_vlen_strings=decode_vlen_strings, + storage_options=storage_options, ) store_entrypoint = StoreBackendEntrypoint() diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 59e9f655b2e..33825796ec7 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -2915,6 +2915,27 @@ def test_zarr_storage_options() -> None: assert_identical(ds, ds_a) +@requires_h5netcdf +@requires_fsspec +def test_h5netcdf_storage_options() -> None: + with create_tmp_files(2) as (f1, f2): + ds1 = create_test_data() + ds1.to_netcdf(f1, engine="h5netcdf") + + ds2 = create_test_data() + ds2.to_netcdf(f2, engine="h5netcdf") + + files = [f"file://{f}" for f in [f1, f2]] + ds = xr.open_mfdataset( + files, + engine="h5netcdf", + concat_dim="time", + combine="nested", + storage_options={"skip_instance_cache": False}, + ) + assert_identical(xr.concat([ds1, ds2], dim="time"), ds) + + @requires_scipy class TestScipyInMemoryData(CFEncodedBase, NetCDF3Only): engine: T_NetcdfEngine = "scipy" From 8090116a09a533bf3ccd1afed527d3011d11be3b Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 7 Nov 2023 16:29:34 -0600 Subject: [PATCH 2/5] Fixup --- xarray/backends/h5netcdf_.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 0ca992b0a98..fab37df3d15 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -91,7 +91,7 @@ def _h5netcdf_create_group(dataset, name): def _h5netcdf_opener(filename, mode, storage_options=None, **kwargs): import h5netcdf - if is_remote_uri(filename): + if isinstance(filename, str) and is_remote_uri(filename): import fsspec mode_ = "rb" if mode == "r" else mode From 124b14cff341f251df0b18518b44563bb4d29224 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 16 Nov 2023 11:27:25 -0600 Subject: [PATCH 3/5] Test fixup --- xarray/backends/h5netcdf_.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index 8627fb999d5..a78c5ec005c 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -91,7 +91,11 @@ def _h5netcdf_create_group(dataset, name): def _h5netcdf_opener(filename, mode, storage_options=None, **kwargs): import h5netcdf - if isinstance(filename, str) and is_remote_uri(filename): + if ( + isinstance(filename, str) + and is_remote_uri(filename) + and kwargs["driver"] is None + ): import fsspec mode_ = "rb" if mode == "r" else mode From 4e248dc9de94e8039d2bf9993f146f9670dd0928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 22 Dec 2023 12:58:32 +0100 Subject: [PATCH 4/5] init fsspec before init CachingFileManager --- xarray/backends/common.py | 18 +++++++++++++----- xarray/backends/h5netcdf_.py | 18 ++++++++++++++---- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 5b8f9a6840f..e09f8db79d2 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -81,24 +81,32 @@ def _find_absolute_paths( ['common.py'] """ if isinstance(paths, str): - if is_remote_uri(paths) and kwargs.get("engine", None) == "zarr": + if is_remote_uri(paths) and (engine := kwargs.get("engine", None)) in [ + "zarr", + "h5netcdf", + ]: try: from fsspec.core import get_fs_token_paths except ImportError as e: raise ImportError( - "The use of remote URLs for opening zarr requires the package fsspec" + "The use of remote URLs for opening zarr and h5netcdf requires the package fsspec" ) from e + mode = kwargs.get("mode", "rb") + mode_ = "rb" if mode == "r" else mode fs, _, _ = get_fs_token_paths( paths, - mode="rb", + mode=mode_, storage_options=kwargs.get("backend_kwargs", {}).get( "storage_options", {} ), expand=False, ) - tmp_paths = fs.glob(fs._strip_protocol(paths)) # finds directories - paths = [fs.get_mapper(path) for path in tmp_paths] + if engine == "h5netcdf": + paths = fs.open(paths, mode=mode_) + else: + tmp_paths = fs.glob(fs._strip_protocol(paths)) # finds directories + paths = [fs.get_mapper(path) for path in tmp_paths] elif is_remote_uri(paths): raise ValueError( "cannot do wild-card matching for paths that are remote URLs " diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py index b9314f661b1..741a0b56fc9 100644 --- a/xarray/backends/h5netcdf_.py +++ b/xarray/backends/h5netcdf_.py @@ -162,6 +162,8 @@ def open( driver_kwds=None, storage_options=None, ): + import h5netcdf + if isinstance(filename, bytes): raise ValueError( "can't open netCDF4/HDF5 as bytes " @@ -177,11 +179,21 @@ def open( if format not in [None, "NETCDF4"]: raise ValueError("invalid format for h5netcdf backend") + # get open fsspec-handle first + from xarray.backends.common import _find_absolute_paths + + if storage_options is not None: + filename = _find_absolute_paths( + filename, + engine="h5netcdf", + mode=mode, + backend_kwargs=dict(storage_options=storage_options), + ) + kwargs = { "invalid_netcdf": invalid_netcdf, "decode_vlen_strings": decode_vlen_strings, "driver": driver, - "storage_options": storage_options, } if driver_kwds is not None: kwargs.update(driver_kwds) @@ -194,9 +206,7 @@ def open( else: lock = combine_locks([HDF5_LOCK, get_write_lock(filename)]) - manager = CachingFileManager( - _h5netcdf_opener, filename, mode=mode, kwargs=kwargs - ) + manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs) return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose) def _acquire(self, needs_lock=True): From 38b9001d739fda2f6bbc5280187cff1490b8f2c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 22 Dec 2023 14:30:17 +0100 Subject: [PATCH 5/5] Update xarray/tests/test_backends.py --- xarray/tests/test_backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 3d5de9358ca..e06d6311c8a 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -3019,7 +3019,7 @@ def test_zarr_storage_options() -> None: @requires_h5netcdf @requires_fsspec def test_h5netcdf_storage_options() -> None: - with create_tmp_files(2) as (f1, f2): + with create_tmp_files(2, allow_cleanup_failure=ON_WINDOWS) as (f1, f2): ds1 = create_test_data() ds1.to_netcdf(f1, engine="h5netcdf")