Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(remotestore): raise error if path includes scheme #2348

Merged
merged 16 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ that implements the `AbstractFileSystem` API,
.. code-block:: python

>>> import zarr
>>> store = zarr.storage.RemoteStore("gs://foo/bar", mode="r")
>>> store = zarr.storage.RemoteStore.from_url("gs://foo/bar", mode="r")
>>> zarr.open(store=store)
<Array <RemoteStore(GCSFileSystem, foo/bar)> shape=(10, 20) dtype=float32>

Expand Down
4 changes: 2 additions & 2 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ def with_mode(self, mode: AccessModeLiteral) -> Self:

Parameters
----------
mode: AccessModeLiteral
mode : AccessModeLiteral
The new mode to use.

Returns
-------
store:
store
A new store of the same type with the new mode.

Examples
Expand Down
54 changes: 47 additions & 7 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Self

import fsspec
Expand Down Expand Up @@ -34,7 +35,8 @@ class RemoteStore(Store):
mode : AccessModeLiteral
The access mode to use.
path : str
The root path of the store.
The root path of the store. This should be a relative path and must not include the
filesystem scheme.
allowed_exceptions : tuple[type[Exception], ...]
When fetching data, these cases will be deemed to correspond to missing keys.

Expand All @@ -46,6 +48,23 @@ class RemoteStore(Store):
supports_deletes
supports_partial_writes
supports_listing

Raises
------
TypeError
If the Filesystem does not support async operations.
ValueError
If the path argument includes a scheme.

Warns
-----
UserWarning
If the file system (fs) was not created with `asynchronous=True`.

See Also
--------
RemoteStore.from_upath
RemoteStore.from_url
"""

# based on FSSpec
Expand All @@ -71,6 +90,15 @@ def __init__(

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
if not self.fs.asynchronous:
warnings.warn(
f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior",
stacklevel=2,
)
if "://" in path and not path.startswith("http"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider also "::", since that is used in chained fsspec URLs, but should not normally end up passed to the FS. However, it is not as special as "://", so I'm ok not to make a special case for it.

# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
scheme, _ = path.split("://", maxsplit=1)
raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)")

@classmethod
def from_upath(
Expand Down Expand Up @@ -130,13 +158,23 @@ def from_url(
-------
RemoteStore
"""
fs, path = fsspec.url_to_fs(url, **storage_options)
opts = storage_options or {}
opts = {"asynchronous": True, **opts}

fs, path = fsspec.url_to_fs(url, **opts)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
if "://" in path and not path.startswith("http"):
# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
_, path = path.split("://", maxsplit=1)

return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions)

async def clear(self) -> None:
# docstring inherited
try:
for subpath in await self.fs._find(self.path, withdirs=True):
for subpath in await self.fs._find(self.path, withdirs=True, refresh=True):
if subpath != self.path:
await self.fs._rm(subpath, recursive=True)
except FileNotFoundError:
Expand All @@ -148,7 +186,7 @@ async def empty(self) -> bool:
# TODO: it would be nice if we didn't have to list all keys here
# it should be possible to stop after the first key is discovered
try:
return not await self.fs._ls(self.path)
return not await self.fs._ls(self.path, refresh=True)
except FileNotFoundError:
return True

Expand Down Expand Up @@ -282,15 +320,15 @@ async def set_partial_values(

async def list(self) -> AsyncGenerator[str, None]:
# docstring inherited
allfiles = await self.fs._find(self.path, detail=False, withdirs=False)
allfiles = await self.fs._find(self.path, detail=False, withdirs=False, refresh=True)
for onefile in (a.replace(self.path + "/", "") for a in allfiles):
yield onefile

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
prefix = f"{self.path}/{prefix.rstrip('/')}"
try:
allfiles = await self.fs._ls(prefix, detail=False)
allfiles = await self.fs._ls(prefix, detail=False, refresh=True)
except FileNotFoundError:
return
for onefile in (a.replace(prefix + "/", "") for a in allfiles):
Expand All @@ -299,5 +337,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
find_str = f"{self.path}/{prefix}"
for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False):
for onefile in await self.fs._find(
find_str, detail=False, maxdepth=None, withdirs=False, refresh=True
):
yield onefile.removeprefix(find_str)
5 changes: 1 addition & 4 deletions tests/v3/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ async def test_make_store_path_invalid() -> None:


async def test_make_store_path_fsspec(monkeypatch) -> None:
import fsspec.implementations.memory

monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True)
store_path = await make_store_path("memory://")
store_path = await make_store_path("http://foo.com/bar")
assert isinstance(store_path.store, RemoteStore)


Expand Down
35 changes: 29 additions & 6 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def s3(s3_base: None) -> Generator[s3fs.S3FileSystem, None, None]:

async def test_basic() -> None:
store = RemoteStore.from_url(
f"s3://{test_bucket_name}",
f"s3://{test_bucket_name}/foo/spam/",
mode="w",
storage_options={"endpoint_url": endpoint_url, "anon": False},
)
assert store.fs.asynchronous
assert store.path == f"{test_bucket_name}/foo/spam"
assert await _collect_aiterator(store.list()) == ()
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -109,7 +111,7 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]):
@pytest.fixture
def store_kwargs(self, request) -> dict[str, str | bool]:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
)
return {"fs": fs, "path": path, "mode": "r+"}

Expand Down Expand Up @@ -143,9 +145,7 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:
def test_store_supports_listing(self, store: RemoteStore) -> None:
assert store.supports_listing

async def test_remote_store_from_uri(
self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
async def test_remote_store_from_uri(self, store: RemoteStore):
storage_options = {
"endpoint_url": endpoint_url,
"anon": False,
Expand Down Expand Up @@ -183,9 +183,32 @@ async def test_remote_store_from_uri(
assert dict(group.attrs) == {"key": "value-3"}

def test_from_upath(self) -> None:
path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False)
path = UPath(
f"s3://{test_bucket_name}/foo/bar/",
endpoint_url=endpoint_url,
anon=False,
asynchronous=True,
)
result = RemoteStore.from_upath(path)
assert result.fs.endpoint_url == endpoint_url
assert result.fs.asynchronous
assert result.path == f"{test_bucket_name}/foo/bar"

def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/issues/2342
store_kwargs["path"] = "s3://" + store_kwargs["path"]
with pytest.raises(
ValueError, match="path argument to RemoteStore must not include scheme .*"
):
self.store_cls(**store_kwargs)

def test_init_warns_if_fs_asynchronous_is_false(self) -> None:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=False
)
store_kwargs = {"fs": fs, "path": path, "mode": "r+"}
with pytest.warns(UserWarning, match=r".* was not created with `asynchronous=True`.*"):
self.store_cls(**store_kwargs)

async def test_empty_nonexistent_path(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/pull/2343
Expand Down