Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC implementation of ZEP003 #1483

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
34 changes: 21 additions & 13 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2067,13 +2067,15 @@ def _process_chunk(
fields,
out_selection,
partial_read_decode=False,
coords=None,
):
"""Take binary data from storage and fill output array"""
shape = [c if isinstance(c, int) else c[coords[i]] for i, c in enumerate(self._chunks)]
if (
out_is_ndarray
and not fields
and is_contiguous_selection(out_selection)
and is_total_slice(chunk_selection, self._chunks)
and is_total_slice(chunk_selection, shape)
and not self._filters
and self._dtype != object
):
Expand All @@ -2100,7 +2102,7 @@ def _process_chunk(
if isinstance(cdata, UncompressedPartialReadBufferV3):
cdata = cdata.read_full()
chunk = ensure_ndarray_like(cdata).view(self._dtype)
chunk = chunk.reshape(self._chunks, order=self._order)
chunk = chunk.reshape(shape, order=self._order)
np.copyto(dest, chunk)
return

Expand All @@ -2109,7 +2111,7 @@ def _process_chunk(
if partial_read_decode:
cdata.prepare_chunk()
# size of chunk
tmp = np.empty_like(self._meta_array, shape=self._chunks, dtype=self.dtype)
tmp = np.empty_like(self._meta_array, shape=shape, dtype=self.dtype)
index_selection = PartialChunkIterator(chunk_selection, self.chunks)
for start, nitems, partial_out_selection in index_selection:
expected_shape = [
Expand Down Expand Up @@ -2138,7 +2140,7 @@ def _process_chunk(
return
except ArrayIndexError:
cdata = cdata.read_full()
chunk = self._decode_chunk(cdata)
chunk = self._decode_chunk(cdata, expected_shape=shape)

# select data from chunk
if fields:
Expand Down Expand Up @@ -2223,7 +2225,9 @@ def _chunk_getitems(
contexts = ConstantMap(ckeys, constant=Context(meta_array=self._meta_array))
cdatas = self.chunk_store.getitems(ckeys, contexts=contexts)

for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
for ckey, coords, chunk_select, out_select in zip(
ckeys, lchunk_coords, lchunk_selection, lout_selection
):
if ckey in cdatas:
self._process_chunk(
out,
Expand All @@ -2234,6 +2238,7 @@ def _chunk_getitems(
fields,
out_select,
partial_read_decode=partial_read_decode,
coords=coords,
)
else:
# check exception type
Expand Down Expand Up @@ -2305,16 +2310,19 @@ def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):

def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
ckey = self._chunk_key(chunk_coords)
cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
cdata = self._process_for_setitem(
ckey, chunk_selection, value, coords=chunk_coords, fields=fields
)

# attempt to delete chunk if it only contains the fill value
if (not self.write_empty_chunks) and all_equal(self.fill_value, cdata):
self._chunk_delitem(ckey)
else:
self.chunk_store[ckey] = self._encode_chunk(cdata)

def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
if is_total_slice(chunk_selection, self._chunks) and not fields:
def _process_for_setitem(self, ckey, chunk_selection, value, coords=None, fields=None):
shape = [c if isinstance(c, int) else c[coords[i]] for i, c in enumerate(self._chunks)]
if is_total_slice(chunk_selection, shape) and not fields:
# totally replace chunk

# optimization: we are completely replacing the chunk, so no need
Expand All @@ -2324,7 +2332,7 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):

# setup array filled with value
chunk = np.empty_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=shape, dtype=self._dtype, order=self._order
)
chunk.fill(value)

Expand All @@ -2346,22 +2354,22 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
# chunk not initialized
if self._fill_value is not None:
chunk = np.empty_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=shape, dtype=self._dtype, order=self._order
)
chunk.fill(self._fill_value)
elif self._dtype == object:
chunk = np.empty(self._chunks, dtype=self._dtype, order=self._order)
chunk = np.empty(self.shape, dtype=self._dtype, order=self._order)
else:
# N.B., use zeros here so any region beyond the array has consistent
# and compressible data
chunk = np.zeros_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=self.shape, dtype=self._dtype, order=self._order
)

else:

# decode chunk
chunk = self._decode_chunk(cdata)
chunk = self._decode_chunk(cdata, expected_shape=shape)
if not chunk.flags.writeable:
chunk = chunk.copy(order="K")

Expand Down
93 changes: 89 additions & 4 deletions zarr/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,79 @@ def __iter__(self):
yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel)


class VarSliceDimIndexer:
def __init__(self, dim_sel: slice, dim_len: int, chunk_lengths: list[int]):
# normalize
start = dim_sel.start
if start is None:
start = 0
elif start < 0:
start = dim_len - start

stop = dim_sel.stop
if stop is None:
stop = dim_len
elif stop < 0:
stop = dim_len - stop
step = dim_sel.step or 1
if step < 0:
raise NotImplementedError

# store attributes
self.offsets = np.cumsum([0] + chunk_lengths)
self.dim_len = dim_len
self.dim_sel = dim_sel
self.projections = []

remainder = 0
nfilled = 0
for i in range(len(chunk_lengths)):
if start > self.offsets[i + 1]:
# not yet at first chunk
continue
if stop < self.offsets[i]:
# past last valid chunk
break
slice_start = max(start - self.offsets[i], 0 + remainder)
slice_end = min(stop - self.offsets[i], self.offsets[i + 1] - self.offsets[i])
if slice_start == slice_end:
continue
remainder = ((self.offsets[i] + slice_start) - self.offsets[i + 1]) % step
nelem = (slice_end - slice_start) // step
self.projections.append(
ChunkDimProjection(
i, slice(slice_start, slice_end, step), slice(nfilled, nfilled + nelem)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will only work if the chunks boundaries are multiples of the step size.

import numpy as np, zarr

z = zarr.array(np.arange(10), chunks=[[1, 2, 2, 5]])
z[::2]
# array([6, 8], dtype=int32)

I've opened a PR into your branch fixing this plus adding some tests: martindurant#21

)
)
nfilled += nelem

self.nitems = nfilled

def __iter__(self):
yield from self.projections


class VarIntDimIndexer:
def __init__(self, dim_sel: int, dim_len: int, chunk_lengths: list[int]):

self.offsets = np.cumsum([0] + chunk_lengths)
self.dim_len = dim_len

# normalize
dim_sel = normalize_integer_selection(dim_sel, self.dim_len)

# store attributes
self.dim_sel = dim_sel
self.nitems = 1

def __iter__(self):
for ix, off in enumerate(self.offsets):
if off > self.dim_sel:
break
ix -= 1
yield ChunkDimProjection(ix, self.dim_sel - self.offsets[ix], None)


def ceildiv(a, b):
return math.ceil(a / b)

Expand Down Expand Up @@ -339,10 +412,16 @@ def __init__(self, selection, array):
for dim_sel, dim_len, dim_chunk_len in zip(selection, array._shape, array._chunks):

if is_integer(dim_sel):
dim_indexer = IntDimIndexer(dim_sel, dim_len, dim_chunk_len)
if isinstance(dim_chunk_len, int):
dim_indexer = IntDimIndexer(dim_sel, dim_len, dim_chunk_len)
else:
dim_indexer = VarIntDimIndexer(dim_sel, dim_len, dim_chunk_len)

elif is_slice(dim_sel):
dim_indexer = SliceDimIndexer(dim_sel, dim_len, dim_chunk_len)
if isinstance(dim_chunk_len, int):
dim_indexer = SliceDimIndexer(dim_sel, dim_len, dim_chunk_len)
else:
dim_indexer = VarSliceDimIndexer(dim_sel, dim_len, dim_chunk_len)

else:
raise IndexError(
Expand Down Expand Up @@ -614,10 +693,16 @@ def __init__(self, selection, array):
for dim_sel, dim_len, dim_chunk_len in zip(selection, array._shape, array._chunks):

if is_integer(dim_sel):
dim_indexer = IntDimIndexer(dim_sel, dim_len, dim_chunk_len)
if isinstance(dim_chunk_len, int):
dim_indexer = IntDimIndexer(dim_sel, dim_len, dim_chunk_len)
else:
dim_indexer = VarIntDimIndexer(dim_sel, dim_len, dim_chunk_len)

elif isinstance(dim_sel, slice):
dim_indexer = SliceDimIndexer(dim_sel, dim_len, dim_chunk_len)
if isinstance(dim_chunk_len, int):
dim_indexer = SliceDimIndexer(dim_sel, dim_len, dim_chunk_len)
else:
dim_indexer = VarSliceDimIndexer(dim_sel, dim_len, dim_chunk_len)

elif is_integer_array(dim_sel):
dim_indexer = IntArrayDimIndexer(dim_sel, dim_len, dim_chunk_len)
Expand Down