Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(remotestore): raise error if path includes scheme #2348

Merged
merged 16 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ that implements the `AbstractFileSystem` API,
.. code-block:: python

>>> import zarr
>>> store = zarr.storage.RemoteStore("gs://foo/bar", mode="r")
>>> store = zarr.storage.RemoteStore.from_url("gs://foo/bar", mode="r")
>>> zarr.open(store=store)
<Array <RemoteStore(GCSFileSystem, foo/bar)> shape=(10, 20) dtype=float32>

Expand Down
2 changes: 1 addition & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def with_mode(self, mode: AccessModeLiteral) -> Self:

Returns
-------
store:
store
A new store of the same type with the new mode.

Examples
Expand Down
42 changes: 40 additions & 2 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Self

from zarr.abc.store import ByteRangeRequest, Store
Expand Down Expand Up @@ -32,7 +33,8 @@ class RemoteStore(Store):
mode : AccessModeLiteral
The access mode to use.
path : str
The root path of the store.
The root path of the store. This should be a relative path and must not include the
filesystem scheme.
allowed_exceptions : tuple[type[Exception], ...]
When fetching data, these cases will be deemed to correspond to missing keys.

Expand All @@ -44,6 +46,23 @@ class RemoteStore(Store):
supports_deletes
supports_partial_writes
supports_listing

Raises
------
TypeError
If the filesystem does not support async operations.
ValueError
If the path argument includes a scheme.

Warns
-----
UserWarning
If the file system (fs) was not created with `asynchronous=True`.

See Also
--------
RemoteStore.from_upath
RemoteStore.from_url
"""

# based on FSSpec
Expand All @@ -69,6 +88,15 @@ def __init__(

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
if not self.fs.asynchronous:
warnings.warn(
f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior",
stacklevel=2,
)
if "://" in path and not path.startswith("http"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also consider "::", since it is used in chained fsspec URLs, although it should not normally end up being passed to the FS. However, it is not as special as "://", so I'm OK with not making a special case for it.

# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
scheme, _ = path.split("://", maxsplit=1)
raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)")

@classmethod
def from_upath(
Expand Down Expand Up @@ -134,7 +162,17 @@ def from_url(
# before fsspec==2024.3.1
from fsspec.core import url_to_fs

fs, path = url_to_fs(url, **storage_options)
opts = storage_options or {}
opts = {"asynchronous": True, **opts}

fs, path = url_to_fs(url, **opts)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
if "://" in path and not path.startswith("http"):
# `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
path = fs._strip_protocol(path)

return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions)

async def clear(self) -> None:
Expand Down
5 changes: 1 addition & 4 deletions tests/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ async def test_make_store_path_invalid() -> None:


async def test_make_store_path_fsspec(monkeypatch) -> None:
import fsspec.implementations.memory

monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True)
store_path = await make_store_path("memory://")
store_path = await make_store_path("http://foo.com/bar")
assert isinstance(store_path.store, RemoteStore)


Expand Down
35 changes: 29 additions & 6 deletions tests/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def s3(s3_base: None) -> Generator[s3fs.S3FileSystem, None, None]:

async def test_basic() -> None:
store = RemoteStore.from_url(
f"s3://{test_bucket_name}",
f"s3://{test_bucket_name}/foo/spam/",
mode="w",
storage_options={"endpoint_url": endpoint_url, "anon": False},
)
assert store.fs.asynchronous
assert store.path == f"{test_bucket_name}/foo/spam"
assert await _collect_aiterator(store.list()) == ()
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -109,7 +111,7 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]):
@pytest.fixture
def store_kwargs(self, request) -> dict[str, str | bool]:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
)
return {"fs": fs, "path": path, "mode": "r+"}

Expand Down Expand Up @@ -143,9 +145,7 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:
def test_store_supports_listing(self, store: RemoteStore) -> None:
assert store.supports_listing

async def test_remote_store_from_uri(
self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
async def test_remote_store_from_uri(self, store: RemoteStore):
storage_options = {
"endpoint_url": endpoint_url,
"anon": False,
Expand Down Expand Up @@ -183,9 +183,32 @@ async def test_remote_store_from_uri(
assert dict(group.attrs) == {"key": "value-3"}

def test_from_upath(self) -> None:
path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False)
path = UPath(
f"s3://{test_bucket_name}/foo/bar/",
endpoint_url=endpoint_url,
anon=False,
asynchronous=True,
)
result = RemoteStore.from_upath(path)
assert result.fs.endpoint_url == endpoint_url
assert result.fs.asynchronous
assert result.path == f"{test_bucket_name}/foo/bar"

def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/issues/2342
store_kwargs["path"] = "s3://" + store_kwargs["path"]
with pytest.raises(
ValueError, match="path argument to RemoteStore must not include scheme .*"
):
self.store_cls(**store_kwargs)

def test_init_warns_if_fs_asynchronous_is_false(self) -> None:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=False
)
store_kwargs = {"fs": fs, "path": path, "mode": "r+"}
with pytest.warns(UserWarning, match=r".* was not created with `asynchronous=True`.*"):
self.store_cls(**store_kwargs)

async def test_empty_nonexistent_path(self, store_kwargs) -> None:
# regression test for https://github.com/zarr-developers/zarr-python/pull/2343
Expand Down