Skip to content

Commit

Permalink
Feat: improves delete_dir for s3fs-backed FsspecStore (#2661)
Browse files Browse the repository at this point in the history
* Implement asynchronous directory deletion in FsspecStore

- override Store.delete_dir default method, which deletes keys one by one, to support bulk deletion for fsspec implementations that support a list of paths in the fs._rm method.
- This can greatly reduce the number of requests to S3, which reduces likelihood of running into throttling errors and improves delete performance.
- Currently, only s3fs is supported.

* Use async batched _rm() for FsspecStore.delete_dir()

* Suppress allowed exceptions instead of try-except-pass

* Adds note on possibly redundant condition in FsspecStore.delete_dir()

* Fix: unpack allowed arguments list

* Adds tests for FsspecStore.delete_dir

* Update src/zarr/storage/_fsspec.py

Co-authored-by: Joe Hamman <jhamman1@gmail.com>

* Remove supports_listing condition from FsspecStore.delete_dir

* use f-string for url formatting

* assert `store.fs.asynchronous` instead of `store.fs.async_impl`

* updates release notes

* remove unused import

* Explicitly construct wrapped local filesystem for test

---------

Co-authored-by: Joe Hamman <joe@earthmover.io>
Co-authored-by: Joe Hamman <jhamman1@gmail.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
  • Loading branch information
4 people authored Feb 14, 2025
1 parent 23abb5b commit 48f7c9a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 0 deletions.
1 change: 1 addition & 0 deletions changes/2661.feature.1.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improves performance of FsspecStore.delete_dir for remote filesystems supporting concurrent/batched deletes, e.g., s3fs.
14 changes: 14 additions & 0 deletions src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import warnings
from contextlib import suppress
from typing import TYPE_CHECKING, Any

from zarr.abc.store import (
Expand Down Expand Up @@ -286,6 +287,19 @@ async def delete(self, key: str) -> None:
except self.allowed_exceptions:
pass

async def delete_dir(self, prefix: str) -> None:
# docstring inherited
if not self.supports_deletes:
raise NotImplementedError(
"This method is only available for stores that support deletes."
)
self._check_writable()

path_to_delete = _dereference_path(self.path, prefix)

with suppress(*self.allowed_exceptions):
await self.fs._rm(path_to_delete, recursive=True)

async def exists(self, key: str) -> bool:
# docstring inherited
path = _dereference_path(self.path, key)
Expand Down
33 changes: 33 additions & 0 deletions tests/test_store/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None:
store = await self.store_cls.open(**store_kwargs)
assert await store.is_empty("")

async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
store.supports_deletes = False
with pytest.raises(
NotImplementedError,
match="This method is only available for stores that support deletes.",
):
await store.delete_dir("test_prefix")


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
Expand Down Expand Up @@ -244,3 +252,28 @@ def test_no_wrap_async_filesystem():

assert not isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.async_impl


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
async def test_delete_dir_wrapped_filesystem(tmpdir) -> None:
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from fsspec.implementations.local import LocalFileSystem

wrapped_fs = AsyncFileSystemWrapper(LocalFileSystem(auto_mkdir=True))
store = FsspecStore(wrapped_fs, read_only=False, path=f"{tmpdir}/test/path")

assert isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.asynchronous

await store.set("zarr.json", cpu.Buffer.from_bytes(b"root"))
await store.set("foo-bar/zarr.json", cpu.Buffer.from_bytes(b"root"))
await store.set("foo/zarr.json", cpu.Buffer.from_bytes(b"bar"))
await store.set("foo/c/0", cpu.Buffer.from_bytes(b"chunk"))
await store.delete_dir("foo")
assert await store.exists("zarr.json")
assert await store.exists("foo-bar/zarr.json")
assert not await store.exists("foo/zarr.json")
assert not await store.exists("foo/c/0")

0 comments on commit 48f7c9a

Please sign in to comment.