From 1a9888637c847e88126b100c5cfa99f5c654bb03 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 6 Dec 2023 16:15:35 +0100 Subject: [PATCH] Extensible codecs for V3 (#1588) * Pull Zarrita into Zarr-Python @ 78274781ad64aef95772eb4b083f7ea9b7d03d06 No code changes to Zarrita were made. * apply zarr lint rules * zarrita -> v3 * v3/abc [wip] * use abcs plus implementation notes * working on making codecs extensible * adds index_location * adds support for codec entry points * adds tests from zarrita * fixes types * Apply suggestions from code review Co-authored-by: Joe Hamman * remove test codec from pyproject.toml --------- Co-authored-by: Joseph Hamman Co-authored-by: Joe Hamman --- .pre-commit-config.yaml | 3 +- zarr/tests/test_codecs_v3.py | 989 +++++++++++++++++++++++++++++++ zarr/v3/abc/codec.py | 42 +- zarr/v3/array.py | 12 +- zarr/v3/codecs.py | 514 ---------------- zarr/v3/codecs/__init__.py | 232 ++++++++ zarr/v3/codecs/blosc.py | 99 ++++ zarr/v3/codecs/bytes.py | 105 ++++ zarr/v3/codecs/crc32c_.py | 64 ++ zarr/v3/codecs/gzip.py | 70 +++ zarr/v3/codecs/registry.py | 56 ++ zarr/v3/{ => codecs}/sharding.py | 101 +++- zarr/v3/codecs/transpose.py | 114 ++++ zarr/v3/codecs/zstd.py | 80 +++ zarr/v3/common.py | 28 +- zarr/v3/metadata.py | 106 +--- zarr/v3/store.py | 49 +- 17 files changed, 2001 insertions(+), 663 deletions(-) create mode 100644 zarr/tests/test_codecs_v3.py delete mode 100644 zarr/v3/codecs.py create mode 100644 zarr/v3/codecs/__init__.py create mode 100644 zarr/v3/codecs/blosc.py create mode 100644 zarr/v3/codecs/bytes.py create mode 100644 zarr/v3/codecs/crc32c_.py create mode 100644 zarr/v3/codecs/gzip.py create mode 100644 zarr/v3/codecs/registry.py rename zarr/v3/{ => codecs}/sharding.py (85%) create mode 100644 zarr/v3/codecs/transpose.py create mode 100644 zarr/v3/codecs/zstd.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f22dc39832..a8ee599137 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ 
-27,7 +27,7 @@ repos: hooks: - id: check-yaml - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.3.0 + rev: v1.7.1 hooks: - id: mypy files: zarr @@ -35,3 +35,4 @@ repos: additional_dependencies: - types-redis - types-setuptools + - attrs diff --git a/zarr/tests/test_codecs_v3.py b/zarr/tests/test_codecs_v3.py new file mode 100644 index 0000000000..93acdb2ba1 --- /dev/null +++ b/zarr/tests/test_codecs_v3.py @@ -0,0 +1,989 @@ +from __future__ import annotations + +import json +from typing import Iterator, List, Literal, Optional +from attr import frozen + +import numpy as np +import pytest +import zarr +from zarr.v3 import codecs +from zarr.v3.array import Array, AsyncArray +from zarr.v3.common import Selection +from zarr.v3.indexing import morton_order_iter +from zarr.v3.metadata import CodecMetadata, ShardingCodecIndexLocation, runtime_configuration + +from zarr.v3.store import MemoryStore, Store + + +@frozen +class _AsyncArrayProxy: + array: AsyncArray + + def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: + return _AsyncArraySelectionProxy(self.array, selection) + + +@frozen +class _AsyncArraySelectionProxy: + array: AsyncArray + selection: Selection + + async def get(self) -> np.ndarray: + return await self.array.getitem(self.selection) + + async def set(self, value: np.ndarray): + return await self.array.setitem(self.selection, value) + + +@pytest.fixture +def store() -> Iterator[Store]: + yield MemoryStore() + + +@pytest.fixture +def sample_data() -> np.ndarray: + return np.arange(0, 128 * 128 * 128, dtype="uint16").reshape((128, 128, 128), order="F") + + +@pytest.mark.parametrize( + "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] +) +def test_sharding( + store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation +): + a = Array.create( + store / "sample", + shape=sample_data.shape, + chunk_shape=(64, 64, 64), + dtype=sample_data.dtype, + fill_value=0, + codecs=[ + 
codecs.sharding_codec( + (32, 32, 32), + [ + codecs.transpose_codec("F", sample_data.ndim), + codecs.bytes_codec(), + codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ], + index_location=index_location, + ) + ], + ) + + a[:, :, :] = sample_data + + read_data = a[0 : sample_data.shape[0], 0 : sample_data.shape[1], 0 : sample_data.shape[2]] + assert sample_data.shape == read_data.shape + assert np.array_equal(sample_data, read_data) + + +@pytest.mark.parametrize( + "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] +) +def test_sharding_partial( + store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation +): + a = Array.create( + store / "sample", + shape=tuple(a + 10 for a in sample_data.shape), + chunk_shape=(64, 64, 64), + dtype=sample_data.dtype, + fill_value=0, + codecs=[ + codecs.sharding_codec( + (32, 32, 32), + [ + codecs.transpose_codec("F", sample_data.ndim), + codecs.bytes_codec(), + codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ], + index_location=index_location, + ) + ], + ) + + a[10:, 10:, 10:] = sample_data + + read_data = a[0:10, 0:10, 0:10] + assert np.all(read_data == 0) + + read_data = a[10:, 10:, 10:] + assert sample_data.shape == read_data.shape + assert np.array_equal(sample_data, read_data) + + +@pytest.mark.parametrize( + "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] +) +def test_sharding_partial_read( + store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation +): + a = Array.create( + store / "sample", + shape=tuple(a + 10 for a in sample_data.shape), + chunk_shape=(64, 64, 64), + dtype=sample_data.dtype, + fill_value=1, + codecs=[ + codecs.sharding_codec( + (32, 32, 32), + [ + codecs.transpose_codec("F", sample_data.ndim), + codecs.bytes_codec(), + codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ], + index_location=index_location, + ) + ], + ) + + 
read_data = a[0:10, 0:10, 0:10] + assert np.all(read_data == 1) + + +@pytest.mark.parametrize( + "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] +) +def test_sharding_partial_overwrite( + store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation +): + data = sample_data[:10, :10, :10] + + a = Array.create( + store / "sample", + shape=tuple(a + 10 for a in data.shape), + chunk_shape=(64, 64, 64), + dtype=data.dtype, + fill_value=1, + codecs=[ + codecs.sharding_codec( + (32, 32, 32), + [ + codecs.transpose_codec("F", data.ndim), + codecs.bytes_codec(), + codecs.blosc_codec(typesize=data.dtype.itemsize, cname="lz4"), + ], + index_location=index_location, + ) + ], + ) + + a[:10, :10, :10] = data + + read_data = a[0:10, 0:10, 0:10] + assert np.array_equal(data, read_data) + + data = data + 10 + a[:10, :10, :10] = data + read_data = a[0:10, 0:10, 0:10] + assert np.array_equal(data, read_data) + + +@pytest.mark.parametrize( + "outer_index_location", + [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end], +) +@pytest.mark.parametrize( + "inner_index_location", + [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end], +) +def test_nested_sharding( + store: Store, + sample_data: np.ndarray, + outer_index_location: ShardingCodecIndexLocation, + inner_index_location: ShardingCodecIndexLocation, +): + a = Array.create( + store / "l4_sample" / "color" / "1", + shape=sample_data.shape, + chunk_shape=(64, 64, 64), + dtype=sample_data.dtype, + fill_value=0, + codecs=[ + codecs.sharding_codec( + (32, 32, 32), + [codecs.sharding_codec((16, 16, 16), index_location=inner_index_location)], + index_location=outer_index_location, + ) + ], + ) + + a[:, :, :] = sample_data + + read_data = a[0 : sample_data.shape[0], 0 : sample_data.shape[1], 0 : sample_data.shape[2]] + assert sample_data.shape == read_data.shape + assert np.array_equal(sample_data, read_data) + + +@pytest.mark.parametrize("input_order", 
["F", "C"]) +@pytest.mark.parametrize("store_order", ["F", "C"]) +@pytest.mark.parametrize("runtime_write_order", ["F", "C"]) +@pytest.mark.parametrize("runtime_read_order", ["F", "C"]) +@pytest.mark.parametrize("with_sharding", [True, False]) +@pytest.mark.asyncio +async def test_order( + store: Store, + input_order: Literal["F", "C"], + store_order: Literal["F", "C"], + runtime_write_order: Literal["F", "C"], + runtime_read_order: Literal["F", "C"], + with_sharding: bool, +): + data = np.arange(0, 256, dtype="uint16").reshape((32, 8), order=input_order) + + codecs_: List[CodecMetadata] = ( + [ + codecs.sharding_codec( + (16, 8), + codecs=[codecs.transpose_codec(store_order, data.ndim), codecs.bytes_codec()], + ) + ] + if with_sharding + else [codecs.transpose_codec(store_order, data.ndim), codecs.bytes_codec()] + ) + + a = await AsyncArray.create( + store / "order", + shape=data.shape, + chunk_shape=(32, 8), + dtype=data.dtype, + fill_value=0, + chunk_key_encoding=("v2", "."), + codecs=codecs_, + runtime_configuration=runtime_configuration(runtime_write_order), + ) + + await _AsyncArrayProxy(a)[:, :].set(data) + read_data = await _AsyncArrayProxy(a)[:, :].get() + assert np.array_equal(data, read_data) + + a = await AsyncArray.open( + store / "order", + runtime_configuration=runtime_configuration(order=runtime_read_order), + ) + read_data = await _AsyncArrayProxy(a)[:, :].get() + assert np.array_equal(data, read_data) + + if runtime_read_order == "F": + assert read_data.flags["F_CONTIGUOUS"] + assert not read_data.flags["C_CONTIGUOUS"] + else: + assert not read_data.flags["F_CONTIGUOUS"] + assert read_data.flags["C_CONTIGUOUS"] + + if not with_sharding: + # Compare with zarr-python + z = zarr.create( + shape=data.shape, + chunks=(32, 8), + dtype="u2", "u2", " int: def resolve_metadata(self) -> CoreArrayMetadata: return self.array_metadata + @classmethod + @abstractmethod + def from_metadata( + cls, codec_metadata: "CodecMetadata", array_metadata: CoreArrayMetadata 
+ ) -> Codec: + pass + + @classmethod + @abstractmethod + def get_metadata_class(cls) -> "Type[CodecMetadata]": + pass + class ArrayArrayCodec(Codec): @abstractmethod @@ -68,6 +79,27 @@ async def encode( pass +class ArrayBytesCodecPartialDecodeMixin: + @abstractmethod + async def decode_partial( + self, + store_path: StorePath, + selection: SliceSelection, + ) -> Optional[np.ndarray]: + pass + + +class ArrayBytesCodecPartialEncodeMixin: + @abstractmethod + async def encode_partial( + self, + store_path: StorePath, + chunk_array: np.ndarray, + selection: SliceSelection, + ) -> None: + pass + + class BytesBytesCodec(Codec): @abstractmethod async def decode( diff --git a/zarr/v3/array.py b/zarr/v3/array.py index 3c0d7eba5c..8c54cfd91c 100644 --- a/zarr/v3/array.py +++ b/zarr/v3/array.py @@ -18,6 +18,7 @@ from attr import evolve, frozen from zarr.v3.abc.array import SynchronousArray, AsynchronousArray +from zarr.v3.abc.codec import ArrayBytesCodecPartialDecodeMixin # from zarr.v3.array_v2 import ArrayV2 from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec @@ -41,7 +42,7 @@ V2ChunkKeyEncodingMetadata, dtype_to_data_type, ) -from zarr.v3.sharding import ShardingCodec +from zarr.v3.codecs.sharding import ShardingCodec from zarr.v3.store import StoreLike, StorePath, make_store_path from zarr.v3.sync import sync @@ -253,7 +254,7 @@ async def _read_chunk( store_path = self.store_path / chunk_key if len(self.codec_pipeline.codecs) == 1 and isinstance( - self.codec_pipeline.codecs[0], ShardingCodec + self.codec_pipeline.codecs[0], ArrayBytesCodecPartialDecodeMixin ): chunk_array = await self.codec_pipeline.codecs[0].decode_partial( store_path, chunk_selection @@ -373,7 +374,7 @@ async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.nda else: await store_path.set_async(chunk_bytes) - async def resize(self, new_shape: ChunkCoords) -> Array: + async def resize(self, new_shape: ChunkCoords) -> AsyncArray: assert len(new_shape) == 
len(self.metadata.shape) new_metadata = evolve(self.metadata, shape=new_shape) @@ -472,7 +473,6 @@ def open( store: StoreLike, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> Array: - async_array = sync( AsyncArray.open(store, runtime_configuration=runtime_configuration), runtime_configuration.asyncio_loop, @@ -512,6 +512,10 @@ def dtype(self) -> np.dtype: def attrs(self) -> dict: return self._async_array.attrs + @property + def metadata(self) -> ArrayMetadata: + return self._async_array.metadata + @property def store_path(self) -> str: return self._async_array.store_path diff --git a/zarr/v3/codecs.py b/zarr/v3/codecs.py deleted file mode 100644 index ff913c42b2..0000000000 --- a/zarr/v3/codecs.py +++ /dev/null @@ -1,514 +0,0 @@ -from __future__ import annotations - -from functools import reduce -from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Tuple, Union -from warnings import warn - -import numcodecs -import numpy as np -from attr import asdict, evolve, frozen -from crc32c import crc32c -from numcodecs.blosc import Blosc -from numcodecs.gzip import GZip -from zstandard import ZstdCompressor, ZstdDecompressor - -from zarr.v3.abc.codec import Codec, ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec -from zarr.v3.common import BytesLike, to_thread -from zarr.v3.metadata import ( - BloscCodecConfigurationMetadata, - BloscCodecMetadata, - BytesCodecConfigurationMetadata, - BytesCodecMetadata, - CodecMetadata, - Crc32cCodecMetadata, - GzipCodecConfigurationMetadata, - GzipCodecMetadata, - ShardingCodecConfigurationMetadata, - ShardingCodecMetadata, - TransposeCodecConfigurationMetadata, - TransposeCodecMetadata, - ZstdCodecConfigurationMetadata, - ZstdCodecMetadata, -) - -if TYPE_CHECKING: - from zarr.v3.metadata import CoreArrayMetadata - -# See https://zarr.readthedocs.io/en/stable/tutorial.html#configuring-blosc -numcodecs.blosc.use_threads = False - - -@frozen -class CodecPipeline: - codecs: List[Codec] - - 
@classmethod - def from_metadata( - cls, - codecs_metadata: Iterable[CodecMetadata], - array_metadata: CoreArrayMetadata, - ) -> CodecPipeline: - out: List[Codec] = [] - for codec_metadata in codecs_metadata or []: - if codec_metadata.name == "endian": - codec_metadata = evolve(codec_metadata, name="bytes") # type: ignore - - codec: Codec - if codec_metadata.name == "blosc": - codec = BloscCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "gzip": - codec = GzipCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "zstd": - codec = ZstdCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "transpose": - codec = TransposeCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "bytes": - codec = BytesCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "crc32c": - codec = Crc32cCodec.from_metadata(codec_metadata, array_metadata) - elif codec_metadata.name == "sharding_indexed": - from zarr.v3.sharding import ShardingCodec - - codec = ShardingCodec.from_metadata(codec_metadata, array_metadata) - else: - raise RuntimeError(f"Unsupported codec: {codec_metadata}") - - out.append(codec) - array_metadata = codec.resolve_metadata() - CodecPipeline._validate_codecs(out, array_metadata) - return cls(out) - - @staticmethod - def _validate_codecs(codecs: List[Codec], array_metadata: CoreArrayMetadata) -> None: - from zarr.v3.sharding import ShardingCodec - - assert any( - isinstance(codec, ArrayBytesCodec) for codec in codecs - ), "Exactly one array-to-bytes codec is required." - - prev_codec: Optional[Codec] = None - for codec in codecs: - if prev_codec is not None: - assert not isinstance(codec, ArrayBytesCodec) or not isinstance( - prev_codec, ArrayBytesCodec - ), ( - f"ArrayBytesCodec '{type(codec)}' cannot follow after " - + f"ArrayBytesCodec '{type(prev_codec)}' because exactly " - + "1 ArrayBytesCodec is allowed." 
- ) - assert not isinstance(codec, ArrayBytesCodec) or not isinstance( - prev_codec, BytesBytesCodec - ), ( - f"ArrayBytesCodec '{type(codec)}' cannot follow after " - + f"BytesBytesCodec '{type(prev_codec)}'." - ) - assert not isinstance(codec, ArrayArrayCodec) or not isinstance( - prev_codec, ArrayBytesCodec - ), ( - f"ArrayArrayCodec '{type(codec)}' cannot follow after " - + f"ArrayBytesCodec '{type(prev_codec)}'." - ) - assert not isinstance(codec, ArrayArrayCodec) or not isinstance( - prev_codec, BytesBytesCodec - ), ( - f"ArrayArrayCodec '{type(codec)}' cannot follow after " - + f"BytesBytesCodec '{type(prev_codec)}'." - ) - - if isinstance(codec, ShardingCodec): - assert len(codec.configuration.chunk_shape) == len(array_metadata.shape), ( - "The shard's `chunk_shape` and array's `shape` need to have the " - + "same number of dimensions." - ) - assert all( - s % c == 0 - for s, c in zip( - array_metadata.chunk_shape, - codec.configuration.chunk_shape, - ) - ), ( - "The array's `chunk_shape` needs to be divisible by the " - + "shard's inner `chunk_shape`." - ) - prev_codec = codec - - if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: - warn( - "Combining a `sharding_indexed` codec disables partial reads and " - + "writes, which may lead to inefficient performance." 
- ) - - def _array_array_codecs(self) -> List[ArrayArrayCodec]: - return [codec for codec in self.codecs if isinstance(codec, ArrayArrayCodec)] - - def _array_bytes_codec(self) -> ArrayBytesCodec: - return next(codec for codec in self.codecs if isinstance(codec, ArrayBytesCodec)) - - def _bytes_bytes_codecs(self) -> List[BytesBytesCodec]: - return [codec for codec in self.codecs if isinstance(codec, BytesBytesCodec)] - - async def decode(self, chunk_bytes: BytesLike) -> np.ndarray: - for bb_codec in self._bytes_bytes_codecs()[::-1]: - chunk_bytes = await bb_codec.decode(chunk_bytes) - - chunk_array = await self._array_bytes_codec().decode(chunk_bytes) - - for aa_codec in self._array_array_codecs()[::-1]: - chunk_array = await aa_codec.decode(chunk_array) - - return chunk_array - - async def encode(self, chunk_array: np.ndarray) -> Optional[BytesLike]: - for aa_codec in self._array_array_codecs(): - chunk_array_maybe = await aa_codec.encode(chunk_array) - if chunk_array_maybe is None: - return None - chunk_array = chunk_array_maybe - - chunk_bytes_maybe = await self._array_bytes_codec().encode(chunk_array) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - for bb_codec in self._bytes_bytes_codecs(): - chunk_bytes_maybe = await bb_codec.encode(chunk_bytes) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - return chunk_bytes - - def compute_encoded_size(self, byte_length: int) -> int: - return reduce(lambda acc, codec: codec.compute_encoded_size(acc), self.codecs, byte_length) - - -@frozen -class BloscCodec(BytesBytesCodec): - array_metadata: CoreArrayMetadata - configuration: BloscCodecConfigurationMetadata - blosc_codec: Blosc - is_fixed_size = False - - @classmethod - def from_metadata( - cls, codec_metadata: BloscCodecMetadata, array_metadata: CoreArrayMetadata - ) -> BloscCodec: - configuration = codec_metadata.configuration - if configuration.typesize == 0: - configuration = evolve(configuration, 
typesize=array_metadata.data_type.byte_count) - config_dict = asdict(codec_metadata.configuration) - config_dict.pop("typesize", None) - map_shuffle_str_to_int = {"noshuffle": 0, "shuffle": 1, "bitshuffle": 2} - config_dict["shuffle"] = map_shuffle_str_to_int[config_dict["shuffle"]] - return cls( - array_metadata=array_metadata, - configuration=configuration, - blosc_codec=Blosc.from_config(config_dict), - ) - - async def decode( - self, - chunk_bytes: bytes, - ) -> BytesLike: - return await to_thread(self.blosc_codec.decode, chunk_bytes) - - async def encode( - self, - chunk_bytes: bytes, - ) -> Optional[BytesLike]: - chunk_array = np.frombuffer(chunk_bytes, dtype=self.array_metadata.dtype) - return await to_thread(self.blosc_codec.encode, chunk_array) - - def compute_encoded_size(self, _input_byte_length: int) -> int: - raise NotImplementedError - - -@frozen -class BytesCodec(ArrayBytesCodec): - array_metadata: CoreArrayMetadata - configuration: BytesCodecConfigurationMetadata - is_fixed_size = True - - @classmethod - def from_metadata( - cls, codec_metadata: BytesCodecMetadata, array_metadata: CoreArrayMetadata - ) -> BytesCodec: - assert ( - array_metadata.dtype.itemsize == 1 or codec_metadata.configuration.endian is not None - ), "The `endian` configuration needs to be specified for multi-byte data types." 
- return cls( - array_metadata=array_metadata, - configuration=codec_metadata.configuration, - ) - - def _get_byteorder(self, array: np.ndarray) -> Literal["big", "little"]: - if array.dtype.byteorder == "<": - return "little" - elif array.dtype.byteorder == ">": - return "big" - else: - import sys - - return sys.byteorder - - async def decode( - self, - chunk_bytes: BytesLike, - ) -> np.ndarray: - if self.array_metadata.dtype.itemsize > 0: - if self.configuration.endian == "little": - prefix = "<" - else: - prefix = ">" - dtype = np.dtype(f"{prefix}{self.array_metadata.data_type.to_numpy_shortname()}") - else: - dtype = np.dtype(f"|{self.array_metadata.data_type.to_numpy_shortname()}") - chunk_array = np.frombuffer(chunk_bytes, dtype) - - # ensure correct chunk shape - if chunk_array.shape != self.array_metadata.chunk_shape: - chunk_array = chunk_array.reshape( - self.array_metadata.chunk_shape, - ) - return chunk_array - - async def encode( - self, - chunk_array: np.ndarray, - ) -> Optional[BytesLike]: - if chunk_array.dtype.itemsize > 1: - byteorder = self._get_byteorder(chunk_array) - if self.configuration.endian != byteorder: - new_dtype = chunk_array.dtype.newbyteorder(self.configuration.endian) - chunk_array = chunk_array.astype(new_dtype) - return chunk_array.tobytes() - - def compute_encoded_size(self, input_byte_length: int) -> int: - return input_byte_length - - -@frozen -class TransposeCodec(ArrayArrayCodec): - array_metadata: CoreArrayMetadata - order: Tuple[int, ...] 
- is_fixed_size = True - - @classmethod - def from_metadata( - cls, codec_metadata: TransposeCodecMetadata, array_metadata: CoreArrayMetadata - ) -> TransposeCodec: - configuration = codec_metadata.configuration - if configuration.order == "F": - order = tuple(array_metadata.ndim - x - 1 for x in range(array_metadata.ndim)) - - elif configuration.order == "C": - order = tuple(range(array_metadata.ndim)) - - else: - assert len(configuration.order) == array_metadata.ndim, ( - "The `order` tuple needs have as many entries as " - + f"there are dimensions in the array. Got: {configuration.order}" - ) - assert len(configuration.order) == len(set(configuration.order)), ( - "There must not be duplicates in the `order` tuple. " - + f"Got: {configuration.order}" - ) - assert all(0 <= x < array_metadata.ndim for x in configuration.order), ( - "All entries in the `order` tuple must be between 0 and " - + f"the number of dimensions in the array. Got: {configuration.order}" - ) - order = tuple(configuration.order) - - return cls( - array_metadata=array_metadata, - order=order, - ) - - def resolve_metadata(self) -> CoreArrayMetadata: - from zarr.v3.metadata import CoreArrayMetadata - - return CoreArrayMetadata( - shape=tuple( - self.array_metadata.shape[self.order[i]] for i in range(self.array_metadata.ndim) - ), - chunk_shape=tuple( - self.array_metadata.chunk_shape[self.order[i]] - for i in range(self.array_metadata.ndim) - ), - data_type=self.array_metadata.data_type, - fill_value=self.array_metadata.fill_value, - runtime_configuration=self.array_metadata.runtime_configuration, - ) - - async def decode( - self, - chunk_array: np.ndarray, - ) -> np.ndarray: - inverse_order = [0 for _ in range(self.array_metadata.ndim)] - for x, i in enumerate(self.order): - inverse_order[x] = i - chunk_array = chunk_array.transpose(inverse_order) - return chunk_array - - async def encode( - self, - chunk_array: np.ndarray, - ) -> Optional[np.ndarray]: - chunk_array = 
chunk_array.transpose(self.order) - return chunk_array - - def compute_encoded_size(self, input_byte_length: int) -> int: - return input_byte_length - - -@frozen -class GzipCodec(BytesBytesCodec): - array_metadata: CoreArrayMetadata - configuration: GzipCodecConfigurationMetadata - is_fixed_size = True - - @classmethod - def from_metadata( - cls, codec_metadata: GzipCodecMetadata, array_metadata: CoreArrayMetadata - ) -> GzipCodec: - return cls( - array_metadata=array_metadata, - configuration=codec_metadata.configuration, - ) - - async def decode( - self, - chunk_bytes: bytes, - ) -> BytesLike: - return await to_thread(GZip(self.configuration.level).decode, chunk_bytes) - - async def encode( - self, - chunk_bytes: bytes, - ) -> Optional[BytesLike]: - return await to_thread(GZip(self.configuration.level).encode, chunk_bytes) - - def compute_encoded_size(self, _input_byte_length: int) -> int: - raise NotImplementedError - - -@frozen -class ZstdCodec(BytesBytesCodec): - array_metadata: CoreArrayMetadata - configuration: ZstdCodecConfigurationMetadata - is_fixed_size = True - - @classmethod - def from_metadata( - cls, codec_metadata: ZstdCodecMetadata, array_metadata: CoreArrayMetadata - ) -> ZstdCodec: - return cls( - array_metadata=array_metadata, - configuration=codec_metadata.configuration, - ) - - def _compress(self, data: bytes) -> bytes: - ctx = ZstdCompressor( - level=self.configuration.level, write_checksum=self.configuration.checksum - ) - return ctx.compress(data) - - def _decompress(self, data: bytes) -> bytes: - ctx = ZstdDecompressor() - return ctx.decompress(data) - - async def decode( - self, - chunk_bytes: bytes, - ) -> BytesLike: - return await to_thread(self._decompress, chunk_bytes) - - async def encode( - self, - chunk_bytes: bytes, - ) -> Optional[BytesLike]: - return await to_thread(self._compress, chunk_bytes) - - def compute_encoded_size(self, _input_byte_length: int) -> int: - raise NotImplementedError - - -@frozen -class 
Crc32cCodec(BytesBytesCodec): - array_metadata: CoreArrayMetadata - is_fixed_size = True - - @classmethod - def from_metadata( - cls, codec_metadata: Crc32cCodecMetadata, array_metadata: CoreArrayMetadata - ) -> Crc32cCodec: - return cls(array_metadata=array_metadata) - - async def decode( - self, - chunk_bytes: bytes, - ) -> BytesLike: - crc32_bytes = chunk_bytes[-4:] - inner_bytes = chunk_bytes[:-4] - - assert np.uint32(crc32c(inner_bytes)).tobytes() == bytes(crc32_bytes) - return inner_bytes - - async def encode( - self, - chunk_bytes: bytes, - ) -> Optional[BytesLike]: - return chunk_bytes + np.uint32(crc32c(chunk_bytes)).tobytes() - - def compute_encoded_size(self, input_byte_length: int) -> int: - return input_byte_length + 4 - - -def blosc_codec( - typesize: int, - cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd", - clevel: int = 5, - shuffle: Literal["noshuffle", "shuffle", "bitshuffle"] = "noshuffle", - blocksize: int = 0, -) -> BloscCodecMetadata: - return BloscCodecMetadata( - configuration=BloscCodecConfigurationMetadata( - cname=cname, - clevel=clevel, - shuffle=shuffle, - blocksize=blocksize, - typesize=typesize, - ) - ) - - -def bytes_codec(endian: Optional[Literal["big", "little"]] = "little") -> BytesCodecMetadata: - return BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian)) - - -def transpose_codec(order: Union[Tuple[int, ...], Literal["C", "F"]]) -> TransposeCodecMetadata: - return TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order)) - - -def gzip_codec(level: int = 5) -> GzipCodecMetadata: - return GzipCodecMetadata(configuration=GzipCodecConfigurationMetadata(level)) - - -def zstd_codec(level: int = 0, checksum: bool = False) -> ZstdCodecMetadata: - return ZstdCodecMetadata(configuration=ZstdCodecConfigurationMetadata(level, checksum)) - - -def crc32c_codec() -> Crc32cCodecMetadata: - return Crc32cCodecMetadata() - - -def sharding_codec( - chunk_shape: Tuple[int, 
...], - codecs: Optional[List[CodecMetadata]] = None, - index_codecs: Optional[List[CodecMetadata]] = None, -) -> ShardingCodecMetadata: - codecs = codecs or [bytes_codec()] - index_codecs = index_codecs or [bytes_codec(), crc32c_codec()] - return ShardingCodecMetadata( - configuration=ShardingCodecConfigurationMetadata(chunk_shape, codecs, index_codecs) - ) diff --git a/zarr/v3/codecs/__init__.py b/zarr/v3/codecs/__init__.py new file mode 100644 index 0000000000..30a42c8ad5 --- /dev/null +++ b/zarr/v3/codecs/__init__.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +from functools import reduce +from typing import ( + TYPE_CHECKING, + Iterable, + List, + Literal, + Optional, + Tuple, + Union, +) +from warnings import warn + +import numpy as np +from attr import frozen + +from zarr.v3.abc.codec import Codec, ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec +from zarr.v3.common import BytesLike +from zarr.v3.metadata import CodecMetadata, ShardingCodecIndexLocation +from zarr.v3.codecs.registry import get_codec_class + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + from zarr.v3.codecs.sharding import ShardingCodecMetadata + from zarr.v3.codecs.blosc import BloscCodecMetadata + from zarr.v3.codecs.bytes import BytesCodecMetadata + from zarr.v3.codecs.transpose import TransposeCodecMetadata + from zarr.v3.codecs.gzip import GzipCodecMetadata + from zarr.v3.codecs.zstd import ZstdCodecMetadata + from zarr.v3.codecs.crc32c_ import Crc32cCodecMetadata + + +@frozen +class CodecPipeline: + codecs: List[Codec] + + @classmethod + def from_metadata( + cls, + codecs_metadata: Iterable[CodecMetadata], + array_metadata: CoreArrayMetadata, + ) -> CodecPipeline: + out: List[Codec] = [] + for codec_metadata in codecs_metadata or []: + codec_cls = get_codec_class(codec_metadata.name) + codec = codec_cls.from_metadata(codec_metadata, array_metadata) + out.append(codec) + array_metadata = codec.resolve_metadata() + CodecPipeline._validate_codecs(out, 
array_metadata) + return cls(out) + + @staticmethod + def _validate_codecs(codecs: List[Codec], array_metadata: CoreArrayMetadata) -> None: + from zarr.v3.codecs.sharding import ShardingCodec + + assert any( + isinstance(codec, ArrayBytesCodec) for codec in codecs + ), "Exactly one array-to-bytes codec is required." + + prev_codec: Optional[Codec] = None + for codec in codecs: + if prev_codec is not None: + assert not isinstance(codec, ArrayBytesCodec) or not isinstance( + prev_codec, ArrayBytesCodec + ), ( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}' because exactly " + + "1 ArrayBytesCodec is allowed." + ) + assert not isinstance(codec, ArrayBytesCodec) or not isinstance( + prev_codec, BytesBytesCodec + ), ( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + assert not isinstance(codec, ArrayArrayCodec) or not isinstance( + prev_codec, ArrayBytesCodec + ), ( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}'." + ) + assert not isinstance(codec, ArrayArrayCodec) or not isinstance( + prev_codec, BytesBytesCodec + ), ( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + + if isinstance(codec, ShardingCodec): + assert len(codec.configuration.chunk_shape) == len(array_metadata.shape), ( + "The shard's `chunk_shape` and array's `shape` need to have the " + + "same number of dimensions." + ) + assert all( + s % c == 0 + for s, c in zip( + array_metadata.chunk_shape, + codec.configuration.chunk_shape, + ) + ), ( + "The array's `chunk_shape` needs to be divisible by the " + + "shard's inner `chunk_shape`." + ) + prev_codec = codec + + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: + warn( + "Combining a `sharding_indexed` codec disables partial reads and " + + "writes, which may lead to inefficient performance." 
+ ) + + def _array_array_codecs(self) -> List[ArrayArrayCodec]: + return [codec for codec in self.codecs if isinstance(codec, ArrayArrayCodec)] + + def _array_bytes_codec(self) -> ArrayBytesCodec: + return next(codec for codec in self.codecs if isinstance(codec, ArrayBytesCodec)) + + def _bytes_bytes_codecs(self) -> List[BytesBytesCodec]: + return [codec for codec in self.codecs if isinstance(codec, BytesBytesCodec)] + + async def decode(self, chunk_bytes: BytesLike) -> np.ndarray: + for bb_codec in self._bytes_bytes_codecs()[::-1]: + chunk_bytes = await bb_codec.decode(chunk_bytes) + + chunk_array = await self._array_bytes_codec().decode(chunk_bytes) + + for aa_codec in self._array_array_codecs()[::-1]: + chunk_array = await aa_codec.decode(chunk_array) + + return chunk_array + + async def encode(self, chunk_array: np.ndarray) -> Optional[BytesLike]: + for aa_codec in self._array_array_codecs(): + chunk_array_maybe = await aa_codec.encode(chunk_array) + if chunk_array_maybe is None: + return None + chunk_array = chunk_array_maybe + + chunk_bytes_maybe = await self._array_bytes_codec().encode(chunk_array) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + for bb_codec in self._bytes_bytes_codecs(): + chunk_bytes_maybe = await bb_codec.encode(chunk_bytes) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + return chunk_bytes + + def compute_encoded_size(self, byte_length: int) -> int: + return reduce(lambda acc, codec: codec.compute_encoded_size(acc), self.codecs, byte_length) + + +def blosc_codec( + typesize: int, + cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd", + clevel: int = 5, + shuffle: Literal["noshuffle", "shuffle", "bitshuffle"] = "noshuffle", + blocksize: int = 0, +) -> "BloscCodecMetadata": + from zarr.v3.codecs.blosc import BloscCodecMetadata, BloscCodecConfigurationMetadata + + return BloscCodecMetadata( + configuration=BloscCodecConfigurationMetadata( + 
cname=cname, + clevel=clevel, + shuffle=shuffle, + blocksize=blocksize, + typesize=typesize, + ) + ) + + +def bytes_codec(endian: Optional[Literal["big", "little"]] = "little") -> "BytesCodecMetadata": + from zarr.v3.codecs.bytes import BytesCodecMetadata, BytesCodecConfigurationMetadata + + return BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian)) + + +def transpose_codec( + order: Union[Tuple[int, ...], Literal["C", "F"]], ndim: Optional[int] = None +) -> "TransposeCodecMetadata": + from zarr.v3.codecs.transpose import TransposeCodecMetadata, TransposeCodecConfigurationMetadata + + if order == "C" or order == "F": + assert ( + isinstance(ndim, int) and ndim > 0 + ), 'When using "C" or "F" the `ndim` argument needs to be provided.' + if order == "C": + order = tuple(range(ndim)) + if order == "F": + order = tuple(ndim - i - 1 for i in range(ndim)) + + return TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order)) + + +def gzip_codec(level: int = 5) -> "GzipCodecMetadata": + from zarr.v3.codecs.gzip import GzipCodecMetadata, GzipCodecConfigurationMetadata + + return GzipCodecMetadata(configuration=GzipCodecConfigurationMetadata(level)) + + +def zstd_codec(level: int = 0, checksum: bool = False) -> "ZstdCodecMetadata": + from zarr.v3.codecs.zstd import ZstdCodecMetadata, ZstdCodecConfigurationMetadata + + return ZstdCodecMetadata(configuration=ZstdCodecConfigurationMetadata(level, checksum)) + + +def crc32c_codec() -> "Crc32cCodecMetadata": + from zarr.v3.codecs.crc32c_ import Crc32cCodecMetadata + + return Crc32cCodecMetadata() + + +def sharding_codec( + chunk_shape: Tuple[int, ...], + codecs: Optional[List[CodecMetadata]] = None, + index_codecs: Optional[List[CodecMetadata]] = None, + index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end, +) -> "ShardingCodecMetadata": + from zarr.v3.codecs.sharding import ShardingCodecMetadata, ShardingCodecConfigurationMetadata + + codecs = codecs or 
[bytes_codec()] + index_codecs = index_codecs or [bytes_codec(), crc32c_codec()] + return ShardingCodecMetadata( + configuration=ShardingCodecConfigurationMetadata( + chunk_shape, codecs, index_codecs, index_location + ) + ) diff --git a/zarr/v3/codecs/blosc.py b/zarr/v3/codecs/blosc.py new file mode 100644 index 0000000000..8fb32faaa7 --- /dev/null +++ b/zarr/v3/codecs/blosc.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Dict, + Literal, + Optional, + Type, +) + +import numcodecs +import numpy as np +from attr import asdict, evolve, frozen, field +from numcodecs.blosc import Blosc + +from zarr.v3.abc.codec import BytesBytesCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.common import BytesLike, to_thread +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +BloscShuffle = Literal["noshuffle", "shuffle", "bitshuffle"] + +# See https://zarr.readthedocs.io/en/stable/tutorial.html#configuring-blosc +numcodecs.blosc.use_threads = False + + +@frozen +class BloscCodecConfigurationMetadata: + typesize: int + cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd" + clevel: int = 5 + shuffle: BloscShuffle = "noshuffle" + blocksize: int = 0 + + +blosc_shuffle_int_to_str: Dict[int, BloscShuffle] = { + 0: "noshuffle", + 1: "shuffle", + 2: "bitshuffle", +} + + +@frozen +class BloscCodecMetadata: + configuration: BloscCodecConfigurationMetadata + name: Literal["blosc"] = field(default="blosc", init=False) + + +@frozen +class BloscCodec(BytesBytesCodec): + array_metadata: CoreArrayMetadata + configuration: BloscCodecConfigurationMetadata + blosc_codec: Blosc + is_fixed_size = False + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> BloscCodec: + assert isinstance(codec_metadata, BloscCodecMetadata) + configuration = codec_metadata.configuration + if 
configuration.typesize == 0: + configuration = evolve(configuration, typesize=array_metadata.data_type.byte_count) + config_dict = asdict(codec_metadata.configuration) + config_dict.pop("typesize", None) + map_shuffle_str_to_int = {"noshuffle": 0, "shuffle": 1, "bitshuffle": 2} + config_dict["shuffle"] = map_shuffle_str_to_int[config_dict["shuffle"]] + return cls( + array_metadata=array_metadata, + configuration=configuration, + blosc_codec=Blosc.from_config(config_dict), + ) + + @classmethod + def get_metadata_class(cls) -> Type[BloscCodecMetadata]: + return BloscCodecMetadata + + async def decode( + self, + chunk_bytes: bytes, + ) -> BytesLike: + return await to_thread(self.blosc_codec.decode, chunk_bytes) + + async def encode( + self, + chunk_bytes: bytes, + ) -> Optional[BytesLike]: + chunk_array = np.frombuffer(chunk_bytes, dtype=self.array_metadata.dtype) + return await to_thread(self.blosc_codec.encode, chunk_array) + + def compute_encoded_size(self, _input_byte_length: int) -> int: + raise NotImplementedError + + +register_codec("blosc", BloscCodec) diff --git a/zarr/v3/codecs/bytes.py b/zarr/v3/codecs/bytes.py new file mode 100644 index 0000000000..80a3f155d0 --- /dev/null +++ b/zarr/v3/codecs/bytes.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Literal, + Optional, + Type, +) + +import numpy as np +from attr import frozen, field + +from zarr.v3.abc.codec import ArrayBytesCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.common import BytesLike +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +@frozen +class BytesCodecConfigurationMetadata: + endian: Optional[Literal["big", "little"]] = "little" + + +@frozen +class BytesCodecMetadata: + configuration: BytesCodecConfigurationMetadata + name: Literal["bytes"] = field(default="bytes", init=False) + + +@frozen +class BytesCodec(ArrayBytesCodec): + array_metadata: 
CoreArrayMetadata + configuration: BytesCodecConfigurationMetadata + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> BytesCodec: + assert isinstance(codec_metadata, BytesCodecMetadata) + assert ( + array_metadata.dtype.itemsize == 1 or codec_metadata.configuration.endian is not None + ), "The `endian` configuration needs to be specified for multi-byte data types." + return cls( + array_metadata=array_metadata, + configuration=codec_metadata.configuration, + ) + + @classmethod + def get_metadata_class(cls) -> Type[BytesCodecMetadata]: + return BytesCodecMetadata + + def _get_byteorder(self, array: np.ndarray) -> Literal["big", "little"]: + if array.dtype.byteorder == "<": + return "little" + elif array.dtype.byteorder == ">": + return "big" + else: + import sys + + return sys.byteorder + + async def decode( + self, + chunk_bytes: BytesLike, + ) -> np.ndarray: + if self.array_metadata.dtype.itemsize > 0: + if self.configuration.endian == "little": + prefix = "<" + else: + prefix = ">" + dtype = np.dtype(f"{prefix}{self.array_metadata.data_type.to_numpy_shortname()}") + else: + dtype = np.dtype(f"|{self.array_metadata.data_type.to_numpy_shortname()}") + chunk_array = np.frombuffer(chunk_bytes, dtype) + + # ensure correct chunk shape + if chunk_array.shape != self.array_metadata.chunk_shape: + chunk_array = chunk_array.reshape( + self.array_metadata.chunk_shape, + ) + return chunk_array + + async def encode( + self, + chunk_array: np.ndarray, + ) -> Optional[BytesLike]: + if chunk_array.dtype.itemsize > 1: + byteorder = self._get_byteorder(chunk_array) + if self.configuration.endian != byteorder: + new_dtype = chunk_array.dtype.newbyteorder(self.configuration.endian) + chunk_array = chunk_array.astype(new_dtype) + return chunk_array.tobytes() + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + + +register_codec("bytes", BytesCodec) + +# 
compatibility with earlier versions of ZEP1 +register_codec("endian", BytesCodec) diff --git a/zarr/v3/codecs/crc32c_.py b/zarr/v3/codecs/crc32c_.py new file mode 100644 index 0000000000..c4fab3c9b9 --- /dev/null +++ b/zarr/v3/codecs/crc32c_.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Literal, + Optional, + Type, +) + +import numpy as np +from attr import frozen, field +from crc32c import crc32c + +from zarr.v3.abc.codec import BytesBytesCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.common import BytesLike +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +@frozen +class Crc32cCodecMetadata: + name: Literal["crc32c"] = field(default="crc32c", init=False) + + +@frozen +class Crc32cCodec(BytesBytesCodec): + array_metadata: CoreArrayMetadata + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> Crc32cCodec: + assert isinstance(codec_metadata, Crc32cCodecMetadata) + return cls(array_metadata=array_metadata) + + @classmethod + def get_metadata_class(cls) -> Type[Crc32cCodecMetadata]: + return Crc32cCodecMetadata + + async def decode( + self, + chunk_bytes: bytes, + ) -> BytesLike: + crc32_bytes = chunk_bytes[-4:] + inner_bytes = chunk_bytes[:-4] + + assert np.uint32(crc32c(inner_bytes)).tobytes() == bytes(crc32_bytes) + return inner_bytes + + async def encode( + self, + chunk_bytes: bytes, + ) -> Optional[BytesLike]: + return chunk_bytes + np.uint32(crc32c(chunk_bytes)).tobytes() + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + 4 + + +register_codec("crc32c", Crc32cCodec) diff --git a/zarr/v3/codecs/gzip.py b/zarr/v3/codecs/gzip.py new file mode 100644 index 0000000000..be1ebcdc9f --- /dev/null +++ b/zarr/v3/codecs/gzip.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from typing 
import ( + TYPE_CHECKING, + Literal, + Optional, + Type, +) + +from attr import frozen, field +from numcodecs.gzip import GZip + +from zarr.v3.abc.codec import BytesBytesCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.common import BytesLike, to_thread +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +@frozen +class GzipCodecConfigurationMetadata: + level: int = 5 + + +@frozen +class GzipCodecMetadata: + configuration: GzipCodecConfigurationMetadata + name: Literal["gzip"] = field(default="gzip", init=False) + + +@frozen +class GzipCodec(BytesBytesCodec): + array_metadata: CoreArrayMetadata + configuration: GzipCodecConfigurationMetadata + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> GzipCodec: + assert isinstance(codec_metadata, GzipCodecMetadata) + + return cls( + array_metadata=array_metadata, + configuration=codec_metadata.configuration, + ) + + @classmethod + def get_metadata_class(cls) -> Type[GzipCodecMetadata]: + return GzipCodecMetadata + + async def decode( + self, + chunk_bytes: bytes, + ) -> BytesLike: + return await to_thread(GZip(self.configuration.level).decode, chunk_bytes) + + async def encode( + self, + chunk_bytes: bytes, + ) -> Optional[BytesLike]: + return await to_thread(GZip(self.configuration.level).encode, chunk_bytes) + + def compute_encoded_size(self, _input_byte_length: int) -> int: + raise NotImplementedError + + +register_codec("gzip", GzipCodec) diff --git a/zarr/v3/codecs/registry.py b/zarr/v3/codecs/registry.py new file mode 100644 index 0000000000..642c0feebb --- /dev/null +++ b/zarr/v3/codecs/registry.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from typing import Dict, NamedTuple, Type +from importlib.metadata import EntryPoint, entry_points as get_entry_points + +from zarr.v3.abc.codec import Codec +from zarr.v3.metadata import 
CodecMetadata + + +class CodecRegistryItem(NamedTuple): + codec_cls: Type[Codec] + codec_metadata_cls: Type[CodecMetadata] + + +__codec_registry: Dict[str, CodecRegistryItem] = {} +__lazy_load_codecs: Dict[str, EntryPoint] = {} + + +def _collect_entrypoints() -> None: + entry_points = get_entry_points() + if hasattr(entry_points, "select"): + # If entry_points() has a select method, use that. Python 3.10+ + for e in entry_points.select(group="zarr.codecs"): + __lazy_load_codecs[e.name] = e + else: + # Otherwise, fallback to using get + for e in entry_points.get("zarr.codecs", []): + __lazy_load_codecs[e.name] = e + + +def register_codec(key: str, codec_cls: Type[Codec]) -> None: + __codec_registry[key] = CodecRegistryItem(codec_cls, codec_cls.get_metadata_class()) + + +def _get_codec_item(key: str) -> CodecRegistryItem: + item = __codec_registry.get(key) + if item is None: + if key in __lazy_load_codecs: + # logger.debug("Auto loading codec '%s' from entrypoint", codec_id) + cls = __lazy_load_codecs[key].load() + register_codec(key, cls) + item = __codec_registry.get(key) + if item: + return item + raise KeyError(key) + + +def get_codec_metadata_class(key: str) -> Type[CodecMetadata]: + return _get_codec_item(key).codec_metadata_cls + + +def get_codec_class(key: str) -> Type[Codec]: + return _get_codec_item(key).codec_cls + + +_collect_entrypoints() diff --git a/zarr/v3/sharding.py b/zarr/v3/codecs/sharding.py similarity index 85% rename from zarr/v3/sharding.py rename to zarr/v3/codecs/sharding.py index 3c5b4bd12d..edbe327a6b 100644 --- a/zarr/v3/sharding.py +++ b/zarr/v3/codecs/sharding.py @@ -1,11 +1,29 @@ from __future__ import annotations -from typing import Iterator, List, Mapping, NamedTuple, Optional, Set, Tuple +from typing import ( + Awaitable, + Callable, + Iterator, + List, + Literal, + Mapping, + NamedTuple, + Optional, + Set, + Tuple, + Type, +) +from attr import field, frozen import numpy as np -from attrs import frozen +from zarr.v3.abc.codec import 
( + ArrayBytesCodec, + ArrayBytesCodecPartialDecodeMixin, + ArrayBytesCodecPartialEncodeMixin, +) -from zarr.v3.codecs import ArrayBytesCodec, CodecPipeline +from zarr.v3.codecs import CodecPipeline +from zarr.v3.codecs.registry import register_codec from zarr.v3.common import ( BytesLike, ChunkCoords, @@ -22,14 +40,28 @@ from zarr.v3.metadata import ( CoreArrayMetadata, DataType, - ShardingCodecConfigurationMetadata, - ShardingCodecMetadata, + CodecMetadata, + ShardingCodecIndexLocation, ) from zarr.v3.store import StorePath MAX_UINT_64 = 2**64 - 1 +@frozen +class ShardingCodecConfigurationMetadata: + chunk_shape: ChunkCoords + codecs: List["CodecMetadata"] + index_codecs: List["CodecMetadata"] + index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end + + +@frozen +class ShardingCodecMetadata: + configuration: ShardingCodecConfigurationMetadata + name: Literal["sharding_indexed"] = field(default="sharding_indexed", init=False) + + class _ShardIndex(NamedTuple): # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) offsets_and_lengths: np.ndarray @@ -49,7 +81,7 @@ def get_chunk_slice(self, chunk_coords: ChunkCoords) -> Optional[Tuple[int, int] if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): return None else: - return (int(chunk_start), int(chunk_start + chunk_len)) + return (int(chunk_start), int(chunk_start) + int(chunk_len)) def set_chunk_slice(self, chunk_coords: ChunkCoords, chunk_slice: Optional[slice]) -> None: localized_chunk = self._localize_chunk(chunk_coords) @@ -95,9 +127,15 @@ class _ShardProxy(Mapping): @classmethod async def from_bytes(cls, buf: BytesLike, codec: ShardingCodec) -> _ShardProxy: + shard_index_size = codec._shard_index_size() obj = cls() obj.buf = memoryview(buf) - obj.index = await codec._decode_shard_index(obj.buf[-codec._shard_index_size() :]) + if codec.configuration.index_location == ShardingCodecIndexLocation.start: + shard_index_bytes = obj.buf[:shard_index_size] + else: + 
shard_index_bytes = obj.buf[-shard_index_size:] + + obj.index = await codec._decode_shard_index(shard_index_bytes) return obj @classmethod @@ -156,13 +194,27 @@ def append(self, chunk_coords: ChunkCoords, value: BytesLike): self.buf.extend(value) self.index.set_chunk_slice(chunk_coords, slice(chunk_start, chunk_start + chunk_length)) - def finalize(self, index_bytes: BytesLike) -> BytesLike: - self.buf.extend(index_bytes) - return self.buf + async def finalize( + self, + index_location: ShardingCodecIndexLocation, + index_encoder: Callable[[_ShardIndex], Awaitable[BytesLike]], + ) -> BytesLike: + index_bytes = await index_encoder(self.index) + if index_location == ShardingCodecIndexLocation.start: + self.index.offsets_and_lengths[..., 0] += len(index_bytes) + index_bytes = await index_encoder(self.index) # encode again with corrected offsets + out_buf = bytearray(index_bytes) + out_buf.extend(self.buf) + else: + out_buf = self.buf + out_buf.extend(index_bytes) + return out_buf @frozen -class ShardingCodec(ArrayBytesCodec): +class ShardingCodec( + ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin +): array_metadata: CoreArrayMetadata configuration: ShardingCodecConfigurationMetadata codec_pipeline: CodecPipeline @@ -172,9 +224,11 @@ class ShardingCodec(ArrayBytesCodec): @classmethod def from_metadata( cls, - codec_metadata: ShardingCodecMetadata, + codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata, ) -> ShardingCodec: + assert isinstance(codec_metadata, ShardingCodecMetadata) + chunks_per_shard = tuple( s // c for s, c in zip( @@ -211,6 +265,10 @@ def from_metadata( chunks_per_shard=chunks_per_shard, ) + @classmethod + def get_metadata_class(cls) -> Type[ShardingCodecMetadata]: + return ShardingCodecMetadata + async def decode( self, shard_bytes: BytesLike, @@ -260,7 +318,6 @@ async def decode_partial( store_path: StorePath, selection: SliceSelection, ) -> Optional[np.ndarray]: - # print("decode_partial") 
shard_shape = self.array_metadata.chunk_shape chunk_shape = self.configuration.chunk_shape @@ -390,7 +447,9 @@ async def _write_chunk( if chunk_bytes is not None: shard_builder.append(chunk_coords, chunk_bytes) - return shard_builder.finalize(await self._encode_shard_index(shard_builder.index)) + return await shard_builder.finalize( + self.configuration.index_location, self._encode_shard_index + ) async def encode_partial( self, @@ -477,7 +536,10 @@ async def _write_chunk( await store_path.delete_async() else: await store_path.set_async( - shard_builder.finalize(await self._encode_shard_index(shard_builder.index)) + await shard_builder.finalize( + self.configuration.index_location, + self._encode_shard_index, + ) ) def _is_total_shard(self, all_chunk_coords: Set[ChunkCoords]) -> bool: @@ -497,7 +559,11 @@ def _shard_index_size(self) -> int: return self.index_codec_pipeline.compute_encoded_size(16 * product(self.chunks_per_shard)) async def _load_shard_index_maybe(self, store_path: StorePath) -> Optional[_ShardIndex]: - index_bytes = await store_path.get_async((-self._shard_index_size(), None)) + shard_index_size = self._shard_index_size() + if self.configuration.index_location == ShardingCodecIndexLocation.start: + index_bytes = await store_path.get_async((0, shard_index_size)) + else: + index_bytes = await store_path.get_async((-shard_index_size, None)) if index_bytes is not None: return await self._decode_shard_index(index_bytes) return None @@ -514,3 +580,6 @@ async def _load_full_shard_maybe(self, store_path: StorePath) -> Optional[_Shard def compute_encoded_size(self, input_byte_length: int) -> int: return input_byte_length + self._shard_index_size() + + +register_codec("sharding_indexed", ShardingCodec) diff --git a/zarr/v3/codecs/transpose.py b/zarr/v3/codecs/transpose.py new file mode 100644 index 0000000000..d160f2a88d --- /dev/null +++ b/zarr/v3/codecs/transpose.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from typing import ( + 
TYPE_CHECKING, + Literal, + Optional, + Tuple, + Type, +) + +import numpy as np +from attr import frozen, field + +from zarr.v3.abc.codec import ArrayArrayCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +@frozen +class TransposeCodecConfigurationMetadata: + order: Tuple[int, ...] + + +@frozen +class TransposeCodecMetadata: + configuration: TransposeCodecConfigurationMetadata + name: Literal["transpose"] = field(default="transpose", init=False) + + +@frozen +class TransposeCodec(ArrayArrayCodec): + array_metadata: CoreArrayMetadata + order: Tuple[int, ...] + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> TransposeCodec: + assert isinstance(codec_metadata, TransposeCodecMetadata) + + configuration = codec_metadata.configuration + # Compatibility with older version of ZEP1 + if configuration.order == "F": # type: ignore + order = tuple(array_metadata.ndim - x - 1 for x in range(array_metadata.ndim)) + + elif configuration.order == "C": # type: ignore + order = tuple(range(array_metadata.ndim)) + + else: + assert len(configuration.order) == array_metadata.ndim, ( + "The `order` tuple needs have as many entries as " + + f"there are dimensions in the array. Got: {configuration.order}" + ) + assert len(configuration.order) == len(set(configuration.order)), ( + "There must not be duplicates in the `order` tuple. " + + f"Got: {configuration.order}" + ) + assert all(0 <= x < array_metadata.ndim for x in configuration.order), ( + "All entries in the `order` tuple must be between 0 and " + + f"the number of dimensions in the array. 
Got: {configuration.order}" + ) + order = tuple(configuration.order) + + return cls( + array_metadata=array_metadata, + order=order, + ) + + @classmethod + def get_metadata_class(cls) -> Type[TransposeCodecMetadata]: + return TransposeCodecMetadata + + def resolve_metadata(self) -> CoreArrayMetadata: + from zarr.v3.metadata import CoreArrayMetadata + + return CoreArrayMetadata( + shape=tuple( + self.array_metadata.shape[self.order[i]] for i in range(self.array_metadata.ndim) + ), + chunk_shape=tuple( + self.array_metadata.chunk_shape[self.order[i]] + for i in range(self.array_metadata.ndim) + ), + data_type=self.array_metadata.data_type, + fill_value=self.array_metadata.fill_value, + runtime_configuration=self.array_metadata.runtime_configuration, + ) + + async def decode( + self, + chunk_array: np.ndarray, + ) -> np.ndarray: + inverse_order = [0 for _ in range(self.array_metadata.ndim)] + for x, i in enumerate(self.order): + inverse_order[i] = x  # invert the permutation: encoded axis x holds decoded axis i + chunk_array = chunk_array.transpose(inverse_order) + return chunk_array + + async def encode( + self, + chunk_array: np.ndarray, + ) -> Optional[np.ndarray]: + chunk_array = chunk_array.transpose(self.order) + return chunk_array + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + + +register_codec("transpose", TransposeCodec) diff --git a/zarr/v3/codecs/zstd.py b/zarr/v3/codecs/zstd.py new file mode 100644 index 0000000000..e66d9e0700 --- /dev/null +++ b/zarr/v3/codecs/zstd.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Literal, + Optional, + Type, +) + +from attr import frozen, field +from zstandard import ZstdCompressor, ZstdDecompressor + +from zarr.v3.abc.codec import BytesBytesCodec +from zarr.v3.codecs.registry import register_codec +from zarr.v3.common import BytesLike, to_thread +from zarr.v3.metadata import CodecMetadata + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +@frozen +class 
ZstdCodecConfigurationMetadata: + level: int = 0 + checksum: bool = False + + +@frozen +class ZstdCodecMetadata: + configuration: ZstdCodecConfigurationMetadata + name: Literal["zstd"] = field(default="zstd", init=False) + + +@frozen +class ZstdCodec(BytesBytesCodec): + array_metadata: CoreArrayMetadata + configuration: ZstdCodecConfigurationMetadata + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: CodecMetadata, array_metadata: CoreArrayMetadata + ) -> ZstdCodec: + assert isinstance(codec_metadata, ZstdCodecMetadata) + return cls( + array_metadata=array_metadata, + configuration=codec_metadata.configuration, + ) + + @classmethod + def get_metadata_class(cls) -> Type[ZstdCodecMetadata]: + return ZstdCodecMetadata + + def _compress(self, data: bytes) -> bytes: + ctx = ZstdCompressor( + level=self.configuration.level, write_checksum=self.configuration.checksum + ) + return ctx.compress(data) + + def _decompress(self, data: bytes) -> bytes: + ctx = ZstdDecompressor() + return ctx.decompress(data) + + async def decode( + self, + chunk_bytes: bytes, + ) -> BytesLike: + return await to_thread(self._decompress, chunk_bytes) + + async def encode( + self, + chunk_bytes: bytes, + ) -> Optional[BytesLike]: + return await to_thread(self._compress, chunk_bytes) + + def compute_encoded_size(self, _input_byte_length: int) -> int: + raise NotImplementedError + + +register_codec("zstd", ZstdCodec) diff --git a/zarr/v3/common.py b/zarr/v3/common.py index 0e55a7c1fd..e91356c4e2 100644 --- a/zarr/v3/common.py +++ b/zarr/v3/common.py @@ -32,18 +32,12 @@ def make_cattr(): from zarr.v3.metadata import ( - BloscCodecMetadata, - BytesCodecMetadata, ChunkKeyEncodingMetadata, CodecMetadata, - Crc32cCodecMetadata, DefaultChunkKeyEncodingMetadata, - GzipCodecMetadata, - ShardingCodecMetadata, - TransposeCodecMetadata, V2ChunkKeyEncodingMetadata, - ZstdCodecMetadata, ) + from zarr.v3.codecs.registry import get_codec_metadata_class converter = Converter() @@ 
-59,24 +53,8 @@ def _structure_chunk_key_encoding_metadata(d: Dict[str, Any], _t) -> ChunkKeyEnc ) def _structure_codec_metadata(d: Dict[str, Any], _t=None) -> CodecMetadata: - if d["name"] == "endian": - d["name"] = "bytes" - - if d["name"] == "blosc": - return converter.structure(d, BloscCodecMetadata) - if d["name"] == "bytes": - return converter.structure(d, BytesCodecMetadata) - if d["name"] == "transpose": - return converter.structure(d, TransposeCodecMetadata) - if d["name"] == "gzip": - return converter.structure(d, GzipCodecMetadata) - if d["name"] == "zstd": - return converter.structure(d, ZstdCodecMetadata) - if d["name"] == "sharding_indexed": - return converter.structure(d, ShardingCodecMetadata) - if d["name"] == "crc32c": - return converter.structure(d, Crc32cCodecMetadata) - raise KeyError + codec_metadata_cls = get_codec_metadata_class(d["name"]) + return converter.structure(d, codec_metadata_cls) converter.register_structure_hook(CodecMetadata, _structure_codec_metadata) diff --git a/zarr/v3/metadata.py b/zarr/v3/metadata.py index 1fc43b19f0..53b300d3f8 100644 --- a/zarr/v3/metadata.py +++ b/zarr/v3/metadata.py @@ -3,7 +3,7 @@ import json from asyncio import AbstractEventLoop from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Protocol, Tuple, Union import numpy as np from attr import asdict, field, frozen @@ -142,103 +142,15 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: ChunkKeyEncodingMetadata = Union[DefaultChunkKeyEncodingMetadata, V2ChunkKeyEncodingMetadata] -BloscShuffle = Literal["noshuffle", "shuffle", "bitshuffle"] - - -@frozen -class BloscCodecConfigurationMetadata: - typesize: int - cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd" - clevel: int = 5 - shuffle: BloscShuffle = "noshuffle" - blocksize: int = 0 - - -blosc_shuffle_int_to_str: Dict[int, BloscShuffle] = { - 0: "noshuffle", - 1: "shuffle", 
- 2: "bitshuffle", -} - - -@frozen -class BloscCodecMetadata: - configuration: BloscCodecConfigurationMetadata - name: Literal["blosc"] = "blosc" - - -@frozen -class BytesCodecConfigurationMetadata: - endian: Optional[Literal["big", "little"]] = "little" - - -@frozen -class BytesCodecMetadata: - configuration: BytesCodecConfigurationMetadata - name: Literal["bytes"] = "bytes" - - -@frozen -class TransposeCodecConfigurationMetadata: - order: Union[Literal["C", "F"], Tuple[int, ...]] = "C" - - -@frozen -class TransposeCodecMetadata: - configuration: TransposeCodecConfigurationMetadata - name: Literal["transpose"] = "transpose" - - -@frozen -class GzipCodecConfigurationMetadata: - level: int = 5 - - -@frozen -class GzipCodecMetadata: - configuration: GzipCodecConfigurationMetadata - name: Literal["gzip"] = "gzip" - - -@frozen -class ZstdCodecConfigurationMetadata: - level: int = 0 - checksum: bool = False - - -@frozen -class ZstdCodecMetadata: - configuration: ZstdCodecConfigurationMetadata - name: Literal["zstd"] = "zstd" - - -@frozen -class Crc32cCodecMetadata: - name: Literal["crc32c"] = "crc32c" - - -@frozen -class ShardingCodecConfigurationMetadata: - chunk_shape: ChunkCoords - codecs: List["CodecMetadata"] - index_codecs: List["CodecMetadata"] - - -@frozen -class ShardingCodecMetadata: - configuration: ShardingCodecConfigurationMetadata - name: Literal["sharding_indexed"] = "sharding_indexed" +class CodecMetadata(Protocol): + @property + def name(self) -> str: + pass -CodecMetadata = Union[ - BloscCodecMetadata, - BytesCodecMetadata, - TransposeCodecMetadata, - GzipCodecMetadata, - ZstdCodecMetadata, - ShardingCodecMetadata, - Crc32cCodecMetadata, -] +class ShardingCodecIndexLocation(Enum): + start = "start" + end = "end" @frozen @@ -290,7 +202,7 @@ def get_core_metadata(self, runtime_configuration: RuntimeConfiguration) -> Core def to_bytes(self) -> bytes: def _json_convert(o): - if isinstance(o, DataType): + if isinstance(o, Enum): return o.name raise 
TypeError diff --git a/zarr/v3/store.py b/zarr/v3/store.py index f7472c68d2..b6c20be41f 100644 --- a/zarr/v3/store.py +++ b/zarr/v3/store.py @@ -10,7 +10,7 @@ import asyncio import io from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Tuple, Union from zarr.v3.common import BytesLike, to_thread @@ -284,6 +284,53 @@ def __repr__(self) -> str: return f"RemoteStore({repr(str(self))})" +class MemoryStore(Store): + supports_partial_writes = True + store_dict: MutableMapping[str, bytes] + + def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None): + self.store_dict = store_dict if store_dict is not None else {} + + async def get_async( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + try: + value = self.store_dict[key] + if byte_range is not None: + value = value[byte_range[0] : byte_range[1]] + return value + except KeyError: + return None + + async def set_async( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + + if byte_range is not None: + buf = bytearray(self.store_dict[key]) + buf[byte_range[0] : byte_range[1]] = value + self.store_dict[key] = buf + else: + self.store_dict[key] = value + + async def delete_async(self, key: str) -> None: + try: + del self.store_dict[key] + except KeyError: + pass + + async def exists_async(self, key: str) -> bool: + return key in self.store_dict + + def __str__(self) -> str: + return f"memory://{id(self.store_dict)}" + + def __repr__(self) -> str: + return f"MemoryStore({repr(str(self))})" + + StoreLike = Union[Store, StorePath, Path, str]