Skip to content

Commit

Permalink
refactor(fsspec): remove upper bound on fsspec (#221)
Browse files Browse the repository at this point in the history
* refactor(fsspec): remove upper bound on fsspec

We're using `pins` as a dependency in `ibis-framework` and ideally we
can remove the upper-pin on `fsspec` so that it doesn't impact our users.
I think the usage of the moved `hash_name` function is the only breaking
change between the current upper pin and most recent release, so I've
added a small workaround to handle the change.

All of the local tests pass; I haven't yet set up sufficient credentials
to handle the various cloud tests.

* vendor in hash_name

* file or local protocols

* update gcs path

* file, local not protocols

* move to custom CacheMappers

* move PinsRscCache to its own mapper

* remove same_name

* fix test paths to be windows compatible

* remove 3.7 support

* specify get_file for RsConnectFs

* keep a call to hash_name

* move gh org machow-> rstudio

---------

Co-authored-by: isabelizimm <isabel.zimmerman@rstudio.com>
  • Loading branch information
gforsyth and isabelizimm authored Dec 18, 2023
1 parent e7f8077 commit d0d1095
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 69 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ README.md:
test: test-most test-rsc

test-most:
pytest pins -m "not fs_rsc" --workers 4 --tests-per-worker 1
pytest pins -m "not fs_rsc and not fs_s3" --workers 4 --tests-per-worker 1 -vv

test-rsc:
pytest pins -m "fs_rsc"
Expand Down
6 changes: 3 additions & 3 deletions pins/boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .drivers import load_data, save_data, load_file, default_title
from .utils import inform, warn_deprecated, ExtendMethodDoc
from .config import get_allow_rsc_short_name
from .cache import PinsCache


_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -733,9 +734,8 @@ def _touch_cache(self, path):

# TODO: assumes same_name set to True. Let's require this be set to
# instantiate a pins cache.
if not hasattr(self.fs, "cached_files"):
if not isinstance(self.fs, PinsCache):
return

path_to_hashed = self.fs._check_file(path)
return touch_access_time(path_to_hashed)

Expand All @@ -747,7 +747,7 @@ class BoardManual(BaseBoard):
--------
>>> import fsspec
>>> import os
>>> fs = fsspec.filesystem("github", org = "machow", repo = "pins-python")
>>> fs = fsspec.filesystem("github", org = "rstudio", repo = "pins-python")
>>> pin_paths = {"df_csv": "df_csv/20220214T163720Z-9bfad/"}
>>> board = BoardManual("pins/tests/pins-compat", fs, pin_paths=pin_paths)
Expand Down
126 changes: 78 additions & 48 deletions pins/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import shutil
import urllib.parse

from fsspec.implementations.cached import SimpleCacheFileSystem, hash_name
from fsspec.implementations.cached import SimpleCacheFileSystem
from fsspec import register_implementation
from pathlib import Path

from .config import get_cache_dir
from .utils import inform
from .utils import inform, hash_name

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,12 +57,62 @@ def prefix_cache(fs, board_base_path):
return f"{proto_name}_{base_hash}"


class HashMapper:
    """Map a path to its cache entry name, relative to ``hash_prefix``.

    Parameters
    ----------
    hash_prefix:
        Parent path that mapped paths are made relative to. When it is
        ``None`` the mapper cannot produce a name and calling it raises.
    """

    def __init__(self, hash_prefix):
        self.hash_prefix = hash_prefix

    def __call__(self, path: str) -> Path:
        """Return ``path`` expressed relative to ``hash_prefix`` as a ``Path``.

        Raises
        ------
        NotImplementedError
            If ``hash_prefix`` is ``None``.
        ValueError
            Propagated from ``Path.relative_to`` when ``path`` does not live
            under ``hash_prefix``.
        """
        if self.hash_prefix is None:
            raise NotImplementedError()

        # The cache name is the path with the parent (prefix) portion stripped,
        # which keeps the cache directory a bit flatter.
        return Path(path).relative_to(Path(self.hash_prefix))


class PinsAccessTimeCacheMapper:
    """Name cache entries as ``<hash of full path>_<final path component>``."""

    def __init__(self, hash_prefix):
        # Kept on the instance; __call__ does not consult it.
        self.hash_prefix = hash_prefix

    def __call__(self, path):
        """Return the cache entry name for ``path``."""
        # Hash the whole path, then tack on whatever follows the final "/"
        # so that cache entries are easier to browse.
        digest = hash_name(path, False)
        return f"{digest}_{Path(path).name}"


class PinsRscCacheMapper:
    """Cache-name mapper whose ``hash_prefix`` may be an RSC server url.

    The first "/" of a pin path is turned into a "+", so a pin of the form
    <user>/<content> becomes <user>+<content> and its contents are not put
    into a per-user subdirectory, e.g. michael/mtcars/data.txt becomes
    michael+mtcars/data.txt.
    """

    def __init__(self, hash_prefix):
        # Kept on the instance; __call__ does not consult it.
        self.hash_prefix = hash_prefix

    def __call__(self, path):
        """Return ``path`` with only its first "/" replaced by "+"."""
        return path.replace("/", "+", 1)


class PinsCache(SimpleCacheFileSystem):
protocol = "pinscache"

def __init__(self, *args, hash_prefix=None, **kwargs):
def __init__(self, *args, hash_prefix=None, mapper=HashMapper, **kwargs):
super().__init__(*args, **kwargs)
self.hash_prefix = hash_prefix
self._mapper = mapper(hash_prefix)

def hash_name(self, path, *args, **kwargs):
return self._mapper(path)

def _open(self, path, *args, **kwargs):
# For some reason, the open method of SimpleCacheFileSystem doesn't
Expand All @@ -83,43 +133,14 @@ def _make_local_details(self, path):

return fn

def hash_name(self, path, same_name):
# the main change in this function is that, for same_name, it returns
# the full path
if same_name:
if self.hash_prefix is not None:
# optionally make the name relative to a parent path
# using the hash of parent path as a prefix, to flatten a bit
hash = Path(path).relative_to(Path(self.hash_prefix))
return hash

return path
else:
raise NotImplementedError()


class PinsRscCache(PinsCache):
"""Modifies the PinsCache to allow hash_prefix to be an RSC server url.
Note that this class also modifies the first / in a path to be a -, so that
pin contents will not be put into subdirectories, for e.g. michael/mtcars/data.txt.
"""

protocol = "pinsrsccache"

def hash_name(self, path, same_name):
# the main change in this function is that, for same_name, it returns
# the full path
if same_name:
if self.hash_prefix is None:
raise NotImplementedError()

# change pin path of form <user>/<content> to <user>+<content>
hash = path.replace("/", "+", 1)
return hash

else:
raise NotImplementedError()
# same as upstream, brought in to preserve backwards compatibility
def _check_file(self, path):
    """Return the local cache file for ``path``, or ``None`` if none exists.

    Calls ``self._check_cache()`` first, then looks for the mapper-derived
    name under each directory in ``self.storage``, in order.
    """
    self._check_cache()
    cache_name = self._mapper(path)
    candidates = (os.path.join(root, cache_name) for root in self.storage)
    # First storage location that actually holds the file wins.
    return next((fn for fn in candidates if os.path.exists(fn)), None)


class PinsUrlCache(PinsCache):
Expand Down Expand Up @@ -154,15 +175,15 @@ def hash_name(self, path, same_name):
class PinsAccessTimeCache(SimpleCacheFileSystem):
name = "pinsaccesstimecache"

def hash_name(self, path, same_name):
if same_name:
raise NotImplementedError("same_name not implemented.")
def __init__(
self, *args, hash_prefix=None, mapper=PinsAccessTimeCacheMapper, **kwargs
):
super().__init__(*args, **kwargs)
self.hash_prefix = hash_prefix
self._mapper = mapper(hash_prefix)

# hash full path, and put anything after the final / at the end, just
# to make it easier to browse.
base_name = super().hash_name(path, same_name)
suffix = Path(path).name
return f"{base_name}_{suffix}"
def hash_name(self, path, *args, **kwargs):
return self._mapper(path)

def _open(self, path, mode="rb", **kwargs):
f = super()._open(path, mode=mode, **kwargs)
Expand All @@ -177,6 +198,15 @@ def _open(self, path, mode="rb", **kwargs):

return f

# same as upstream, brought in to preserve backwards compatibility
def _check_file(self, path):
    """Return the local cache file for ``path``, or ``None`` if none exists.

    Calls ``self._check_cache()`` first, then looks for the mapper-derived
    name under each directory in ``self.storage``, in order.
    """
    self._check_cache()
    cache_name = self._mapper(path)
    candidates = (os.path.join(root, cache_name) for root in self.storage)
    # First storage location that actually holds the file wins.
    return next((fn for fn in candidates if os.path.exists(fn)), None)


class CachePruner:
"""Prunes the cache directory, across multiple boards.
Expand Down
22 changes: 13 additions & 9 deletions pins/constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import tempfile

from .boards import BaseBoard, BoardRsConnect, BoardManual
from .cache import PinsCache, PinsRscCache, PinsAccessTimeCache, prefix_cache
from .cache import PinsCache, PinsRscCacheMapper, PinsAccessTimeCache, prefix_cache
from .config import get_data_dir, get_cache_dir


Expand Down Expand Up @@ -56,9 +56,9 @@ def board_deparse(board: BaseBoard):
if prot == "rsc":
url = board.fs.api.server_url
return f"board_connect(server_url={repr(url)}{allow_pickle})"
elif prot == "file":
elif prot in ["file", ("file", "local")]:
return f"board_folder({repr(board.board)}{allow_pickle})"
elif prot == ["s3", "s3a"]:
elif prot in [["s3", "s3a"], ("s3", "s3a")]:
return f"board_s3({repr(board.board)}{allow_pickle})"
elif prot == "abfs":
return f"board_azure({repr(board.board)}{allow_pickle})"
Expand Down Expand Up @@ -93,15 +93,15 @@ def board(
Parameters
----------
protocol:
protocol: str
File system protocol. E.g. file, s3, github, rsc (for Posit Connect).
See `fsspec.filesystem` for more information.
path:
path: str
A base path the board should use. For example, the directory the board lives in,
or the path to its S3 bucket.
versioned:
versioned: bool
Whether or not pins should be versioned.
cache:
cache: optional, type[DEFAULT]
Whether to use a cache. By default, pins attempts to select the right cache
directory, given your filesystem. If `None` is passed, then no cache will be
used. You can set the cache using the `PINS_CACHE_DIR` environment variable.
Expand Down Expand Up @@ -154,8 +154,12 @@ def board(
board_cache = prefix_cache(fs, hash_prefix)
cache_dir = os.path.join(base_cache_dir, board_cache)

fs = PinsRscCache(
cache_storage=cache_dir, fs=fs, hash_prefix=hash_prefix, same_names=True
fs = PinsCache(
cache_storage=cache_dir,
fs=fs,
hash_prefix=hash_prefix,
same_names=True,
mapper=PinsRscCacheMapper,
)
else:
# ensures each subdir path is its own cache directory
Expand Down
9 changes: 9 additions & 0 deletions pins/rsconnect/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
RsConnectApiRequestError,
RSC_CODE_OBJECT_DOES_NOT_EXIST,
)
from ..utils import isfilelike

# Misc ----

Expand Down Expand Up @@ -277,6 +278,14 @@ def get(self, rpath, lpath, recursive=False, *args, **kwargs) -> None:
bundle["content_guid"], bundle["id"], parsed.file_name, lpath
)

def get_file(self, rpath, lpath, **kwargs):
    """Write the contents of ``rpath`` to ``lpath``.

    The contents are obtained with ``self.cat_file(rpath, **kwargs)``.
    ``lpath`` may be an open file-like object, which is written to directly,
    or a path, which is opened in binary write mode.
    """
    payload = self.cat_file(rpath, **kwargs)

    if not isfilelike(lpath):
        with open(lpath, "wb") as sink:
            sink.write(payload)
        return

    lpath.write(payload)

def exists(self, path: str, **kwargs) -> bool:
try:
self.info(path)
Expand Down
3 changes: 1 addition & 2 deletions pins/tests/test_boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ def test_board_pin_download_filename(board_with_cache, tmp_path):

def test_board_pin_download_no_cache_error(board, tmp_path):
df = pd.DataFrame({"x": [1, 2, 3]})

path = tmp_path / "data.csv"
df.to_csv(path, index=False)

Expand All @@ -210,7 +209,7 @@ def test_board_pin_download_no_cache_error(board, tmp_path):
assert meta.type == "file"

# file boards work okay, since the board directory itself is the cache
if board.fs.protocol == "file":
if board.fs.protocol in ["file", ("file", "local")]:
pytest.skip()

# uncached boards should fail, since nowhere to store the download
Expand Down
4 changes: 2 additions & 2 deletions pins/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def test_touch_access_time_auto(some_file):


def test_pins_cache_hash_name_preserves():
cache = PinsCache(fs=filesystem("file"))
assert cache.hash_name("a/b/c.txt", True) == "a/b/c.txt"
cache = PinsCache(fs=filesystem("file"), hash_prefix="")
assert cache.hash_name("a/b/c.txt") == Path("a/b/c.txt")


def test_pins_cache_url_hash_name():
Expand Down
6 changes: 3 additions & 3 deletions pins/tests/test_constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def construct_from_board(board):
prot = board.fs.protocol
fs_name = prot if isinstance(prot, str) else prot[0]

if fs_name == "file":
if fs_name in ["file", ("file", "local")]:
board = c.board_folder(board.board)
elif fs_name == "rsc":
board = c.board_rsconnect(
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_constructor_board_url_file(tmp_cache, http_example_board_path):

@pytest.mark.skip_on_github
def test_constructor_board_github(tmp_cache, http_example_board_path, df_csv):
board = c.board_github("machow", "pins-python", EXAMPLE_REL_PATH) # noqa
board = c.board_github("rstudio", "pins-python", EXAMPLE_REL_PATH) # noqa

df = board.pin_read("df_csv")
assert_frame_equal(df, df_csv)
Expand Down Expand Up @@ -190,7 +190,7 @@ def test_constructor_boards(board, df_csv, tmp_cache):
# check the cache structure -----------------------------------------------

# check cache
if board.fs.protocol == "file":
if board.fs.protocol in ["file", ("file", "local")]:
# no caching for local file boards
pass
else:
Expand Down
18 changes: 18 additions & 0 deletions pins/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import hashlib
import os
import sys

from functools import update_wrapper
Expand All @@ -19,6 +21,14 @@ def warn_deprecated(msg):
warn(msg, DeprecationWarning)


def hash_name(path, same_name):
    """Return the cache name for ``path``.

    With a truthy ``same_name`` this is the final component of ``path``;
    otherwise it is the hex SHA-256 digest of the encoded path string.
    """
    if same_name:
        return os.path.basename(path)
    return hashlib.sha256(path.encode()).hexdigest()


class ExtendMethodDoc:
# Note that the indentation assumes these are top-level method docstrings,
# so are indented 8 spaces (after the initial sentence).
Expand Down Expand Up @@ -68,3 +78,11 @@ def __call__(self, *args, **kwargs):
# which allows all the inspect machinery to give sphinx the __call__
# attribute we set in __init__.
raise NotImplementedError()


# based off fsspec.isfilelike
def isfilelike(file) -> bool:
    """Return True when ``file`` has ``read``, ``close`` and ``tell`` attributes."""
    required = ("read", "close", "tell")
    return all(hasattr(file, attr) for attr in required)
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ zipsafe = False

python_requires = >=3.8
install_requires =
fsspec>=2022.2.0,<2023.9.0
fsspec>=2022.2.0
pyyaml>=3.13
xxhash>=1.0.0
pandas>=0.23.0
Expand Down

0 comments on commit d0d1095

Please sign in to comment.