Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC implementation of ZEP003 #1483

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
34 changes: 21 additions & 13 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2067,13 +2067,15 @@ def _process_chunk(
fields,
out_selection,
partial_read_decode=False,
coords=None,
):
"""Take binary data from storage and fill output array"""
shape = tuple(c if isinstance(c, int) else c[coords[i]] for i, c in enumerate(self._chunks))
if (
out_is_ndarray
and not fields
and is_contiguous_selection(out_selection)
and is_total_slice(chunk_selection, self._chunks)
and is_total_slice(chunk_selection, shape)
and not self._filters
and self._dtype != object
):
Expand All @@ -2100,7 +2102,7 @@ def _process_chunk(
if isinstance(cdata, UncompressedPartialReadBufferV3):
cdata = cdata.read_full()
chunk = ensure_ndarray_like(cdata).view(self._dtype)
chunk = chunk.reshape(self._chunks, order=self._order)
chunk = chunk.reshape(shape, order=self._order)
np.copyto(dest, chunk)
return

Expand All @@ -2109,7 +2111,7 @@ def _process_chunk(
if partial_read_decode:
cdata.prepare_chunk()
# size of chunk
tmp = np.empty_like(self._meta_array, shape=self._chunks, dtype=self.dtype)
tmp = np.empty_like(self._meta_array, shape=shape, dtype=self.dtype)
index_selection = PartialChunkIterator(chunk_selection, self.chunks)
for start, nitems, partial_out_selection in index_selection:
expected_shape = [
Expand Down Expand Up @@ -2138,7 +2140,7 @@ def _process_chunk(
return
except ArrayIndexError:
cdata = cdata.read_full()
chunk = self._decode_chunk(cdata)
chunk = self._decode_chunk(cdata, expected_shape=shape)

# select data from chunk
if fields:
Expand Down Expand Up @@ -2223,7 +2225,9 @@ def _chunk_getitems(
contexts = ConstantMap(ckeys, constant=Context(meta_array=self._meta_array))
cdatas = self.chunk_store.getitems(ckeys, contexts=contexts)

for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
for ckey, coords, chunk_select, out_select in zip(
ckeys, lchunk_coords, lchunk_selection, lout_selection
):
if ckey in cdatas:
self._process_chunk(
out,
Expand All @@ -2234,6 +2238,7 @@ def _chunk_getitems(
fields,
out_select,
partial_read_decode=partial_read_decode,
coords=coords,
)
else:
# check exception type
Expand Down Expand Up @@ -2305,16 +2310,19 @@ def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):

def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
ckey = self._chunk_key(chunk_coords)
cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
cdata = self._process_for_setitem(
ckey, chunk_selection, value, coords=chunk_coords, fields=fields
)

# attempt to delete chunk if it only contains the fill value
if (not self.write_empty_chunks) and all_equal(self.fill_value, cdata):
self._chunk_delitem(ckey)
else:
self.chunk_store[ckey] = self._encode_chunk(cdata)

def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
if is_total_slice(chunk_selection, self._chunks) and not fields:
def _process_for_setitem(self, ckey, chunk_selection, value, coords=None, fields=None):
shape = tuple(c if isinstance(c, int) else c[coords[i]] for i, c in enumerate(self._chunks))
if is_total_slice(chunk_selection, shape) and not fields:
# totally replace chunk

# optimization: we are completely replacing the chunk, so no need
Expand All @@ -2324,7 +2332,7 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):

# setup array filled with value
chunk = np.empty_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=shape, dtype=self._dtype, order=self._order
)
chunk.fill(value)

Expand All @@ -2346,22 +2354,22 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
# chunk not initialized
if self._fill_value is not None:
chunk = np.empty_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=shape, dtype=self._dtype, order=self._order
)
chunk.fill(self._fill_value)
elif self._dtype == object:
chunk = np.empty(self._chunks, dtype=self._dtype, order=self._order)
chunk = np.empty(shape, dtype=self._dtype, order=self._order)
else:
# N.B., use zeros here so any region beyond the array has consistent
# and compressible data
chunk = np.zeros_like(
self._meta_array, shape=self._chunks, dtype=self._dtype, order=self._order
self._meta_array, shape=shape, dtype=self._dtype, order=self._order
)

else:

# decode chunk
chunk = self._decode_chunk(cdata)
chunk = self._decode_chunk(cdata, expected_shape=shape)
if not chunk.flags.writeable:
chunk = chunk.copy(order="K")

Expand Down
Loading