Skip to content

Commit

Permalink
Providing filesystem credentials through storage_options (#436)
Browse files Browse the repository at this point in the history
* providing filesystem credentials to spec

* remove blank line

* added storage_options to LazyZarrArray

* place storage_options after target_path

* remove S3-specific code form zarr.py

* storage_options added to from_zarr

* spec=None in from_zarr

---------

Co-authored-by: balanz24 <pau.balanza@urv.cat>
  • Loading branch information
balanz24 and balanz24 authored Mar 28, 2024
1 parent 3d08513 commit 7b4f7e2
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 7 deletions.
8 changes: 7 additions & 1 deletion cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
replace_ellipsis,
)

from cubed import config
from cubed.backend_array_api import namespace as nxp
from cubed.backend_array_api import numpy_array_to_backend_array
from cubed.core.array import CoreArray, check_array_specs, compute, gensym
from cubed.core.plan import Plan, new_temp_path
from cubed.primitive.blockwise import blockwise as primitive_blockwise
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
from cubed.primitive.rechunk import rechunk as primitive_rechunk
from cubed.spec import spec_from_config
from cubed.utils import (
_concatenate2,
chunk_memory,
Expand Down Expand Up @@ -109,7 +111,8 @@ def from_zarr(store, path=None, spec=None) -> "Array":
The array loaded from Zarr storage.
"""
name = gensym()
target = zarr.open(store, path=path, mode="r")
spec = spec or spec_from_config(config)
target = zarr.open(store, path=path, mode="r", storage_options=spec.storage_options)

from cubed.array_api import Array

Expand Down Expand Up @@ -299,6 +302,7 @@ def blockwise(
extra_projected_mem=extra_projected_mem,
target_store=target_store,
target_path=target_path,
storage_options=spec.storage_options,
shape=shape,
dtype=dtype,
chunks=_chunks,
Expand Down Expand Up @@ -361,6 +365,7 @@ def general_blockwise(
extra_projected_mem=extra_projected_mem,
target_store=target_store,
target_path=target_path,
storage_options=spec.storage_options,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand Down Expand Up @@ -746,6 +751,7 @@ def rechunk(x, chunks, target_store=None):
reserved_mem=spec.reserved_mem,
target_store=target_store,
temp_store=temp_store,
storage_options=spec.storage_options,
)

from cubed.array_api import Array
Expand Down
10 changes: 9 additions & 1 deletion cubed/primitive/blockwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def blockwise(
reserved_mem: int,
target_store: T_Store,
target_path: Optional[str] = None,
storage_options: Optional[Dict[str, Any]] = None,
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -206,6 +207,7 @@ def blockwise(
reserved_mem=reserved_mem,
target_store=target_store,
target_path=target_path,
storage_options=storage_options,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand All @@ -226,6 +228,7 @@ def general_blockwise(
reserved_mem: int,
target_store: T_Store,
target_path: Optional[str] = None,
storage_options: Optional[Dict[str, Any]] = None,
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -281,7 +284,12 @@ def general_blockwise(
target_array = target_store
else:
target_array = lazy_zarr_array(
target_store, shape, dtype, chunks=chunksize, path=target_path
target_store,
shape,
dtype,
chunks=chunksize,
path=target_path,
storage_options=storage_options,
)

func_kwargs = extra_func_kwargs or {}
Expand Down
17 changes: 14 additions & 3 deletions cubed/primitive/rechunk.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import itertools
import math
from math import ceil, prod
from typing import Any, Iterable, Iterator, List, Optional, Sequence, Tuple
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple

import numpy as np

Expand Down Expand Up @@ -29,6 +29,7 @@ def rechunk(
reserved_mem: int,
target_store: T_Store,
temp_store: Optional[T_Store] = None,
storage_options: Optional[Dict[str, Any]] = None,
) -> List[PrimitiveOperation]:
"""Change the chunking of an array, without changing its shape or dtype.
Expand Down Expand Up @@ -64,6 +65,7 @@ def rechunk(
max_mem=rechunker_max_mem,
target_store=target_store,
temp_store=temp_store,
storage_options=storage_options,
)

intermediate = int_proxy.array
Expand Down Expand Up @@ -120,6 +122,7 @@ def _setup_array_rechunk(
max_mem: int,
target_store: T_Store,
temp_store: Optional[T_Store] = None,
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[CubedArrayProxy, CubedArrayProxy, CubedArrayProxy]:
shape = source_array.shape
source_chunks = source_array.chunks
Expand All @@ -140,15 +143,23 @@ def _setup_array_rechunk(
int_chunks = tuple(int(x) for x in int_chunks)
write_chunks = tuple(int(x) for x in write_chunks)

target_array = lazy_zarr_array(target_store, shape, dtype, chunks=target_chunks)
target_array = lazy_zarr_array(
target_store,
shape,
dtype,
chunks=target_chunks,
storage_options=storage_options,
)

if read_chunks == write_chunks:
int_array = None
else:
# do intermediate store
if temp_store is None:
raise ValueError("A temporary store location must be provided.")
int_array = lazy_zarr_array(temp_store, shape, dtype, chunks=int_chunks)
int_array = lazy_zarr_array(
temp_store, shape, dtype, chunks=int_chunks, storage_options=storage_options
)

read_proxy = CubedArrayProxy(source_array, read_chunks)
int_proxy = CubedArrayProxy(int_array, int_chunks)
Expand Down
11 changes: 9 additions & 2 deletions cubed/storage/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(
self.dtype = template.dtype
self.chunks = template.chunks
self.nbytes = template.nbytes

self.store = store
self.path = path
self.kwargs = kwargs
Expand Down Expand Up @@ -72,6 +71,7 @@ def open(self) -> zarr.Array:
dtype=self.dtype,
chunks=self.chunks,
path=self.path,
**self.kwargs,
)

def __repr__(self) -> str:
Expand All @@ -89,7 +89,14 @@ def lazy_zarr_array(
path: Optional[str] = None,
**kwargs,
) -> LazyZarrArray:
return LazyZarrArray(store, shape, dtype, chunks, path=path, **kwargs)
return LazyZarrArray(
store,
shape,
dtype,
chunks,
path=path,
**kwargs,
)


def open_if_lazy_zarr_array(array: T_ZarrArray) -> zarr.Array:
Expand Down

0 comments on commit 7b4f7e2

Please sign in to comment.