@dataclass(slots=True, kw_only=True)
class ChunkTransform:
    """A synchronous codec chain bound to an ArraySpec.

    Provides ``encode`` and ``decode`` for pure-compute codec
    operations (no IO, no threading, no batching).

    ``shape`` and ``dtype`` reflect the representation **after** all
    ArrayArrayCodec transforms — i.e. the spec that feeds the
    ArrayBytesCodec.

    All codecs must implement ``SupportsSyncCodec``. Construction will
    raise ``TypeError`` if any codec does not.
    """

    codecs: tuple[Codec, ...]
    array_spec: ArraySpec

    # (ArrayArrayCodec, input_spec) pairs in pipeline order.
    _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field(
        init=False, repr=False, compare=False
    )
    # The single array<->bytes codec, and the spec it sees (after all AA transforms).
    _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False)
    _ab_spec: ArraySpec = field(init=False, repr=False, compare=False)
    _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        # Reject the whole chain up front if any codec cannot run synchronously,
        # so encode/decode never fail halfway through a chunk.
        non_sync = [c for c in self.codecs if not isinstance(c, SupportsSyncCodec)]
        if non_sync:
            names = ", ".join(type(c).__name__ for c in non_sync)
            raise TypeError(
                f"All codecs must implement SupportsSyncCodec. The following do not: {names}"
            )

        aa, ab, bb = codecs_from_list(list(self.codecs))

        # Thread the ArraySpec through each AA codec: pair every codec with the
        # spec of its *input*, and carry the resolved spec forward.
        aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = ()
        spec = self.array_spec
        for aa_codec in aa:
            aa_codecs = (*aa_codecs, (aa_codec, spec))
            spec = aa_codec.resolve_metadata(spec)

        self._aa_codecs = aa_codecs
        self._ab_codec = ab
        self._ab_spec = spec
        self._bb_codecs = bb

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape after all ArrayArrayCodec transforms (input to the ArrayBytesCodec)."""
        return self._ab_spec.shape

    @property
    def dtype(self) -> ZDType[TBaseDType, TBaseScalar]:
        """Dtype after all ArrayArrayCodec transforms (input to the ArrayBytesCodec)."""
        return self._ab_spec.dtype

    def decode(
        self,
        chunk_bytes: Buffer,
    ) -> NDBuffer:
        """Decode a single chunk through the full codec chain, synchronously.

        Runs the BytesBytes codecs in reverse pipeline order, then the
        ArrayBytes codec, then the ArrayArray codecs in reverse order.
        Pure compute -- no IO.
        """
        bb_out: Any = chunk_bytes
        for bb_codec in reversed(self._bb_codecs):
            bb_out = bb_codec._decode_sync(bb_out, self._ab_spec)  # type: ignore[attr-defined]

        ab_out: Any = self._ab_codec._decode_sync(bb_out, self._ab_spec)  # type: ignore[attr-defined]

        # Each AA codec decodes against the spec of its original *input*.
        for aa_codec, spec in reversed(self._aa_codecs):
            ab_out = aa_codec._decode_sync(ab_out, spec)  # type: ignore[attr-defined]

        return ab_out  # type: ignore[no-any-return]

    def encode(
        self,
        chunk_array: NDBuffer,
    ) -> Buffer | None:
        """Encode a single chunk through the full codec chain, synchronously.

        Returns ``None`` as soon as any codec returns ``None`` (the codec
        signal for "do not store this chunk"). Pure compute -- no IO.
        """
        aa_out: Any = chunk_array

        for aa_codec, spec in self._aa_codecs:
            if aa_out is None:
                return None
            aa_out = aa_codec._encode_sync(aa_out, spec)  # type: ignore[attr-defined]

        if aa_out is None:
            return None
        bb_out: Any = self._ab_codec._encode_sync(aa_out, self._ab_spec)  # type: ignore[attr-defined]

        for bb_codec in self._bb_codecs:
            if bb_out is None:
                return None
            bb_out = bb_codec._encode_sync(bb_out, self._ab_spec)  # type: ignore[attr-defined]

        return bb_out  # type: ignore[no-any-return]

    def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int:
        """Return the encoded byte length, folding each codec's size transform in order."""
        for codec in self.codecs:
            byte_length = codec.compute_encoded_size(byte_length, array_spec)
            array_spec = codec.resolve_metadata(array_spec)
        return byte_length
zarr.core.codec_pipeline import ChunkTransform +from zarr.core.dtype import get_data_type_from_native_dtype + + +def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: + zdtype = get_data_type_from_native_dtype(dtype) + return ArraySpec( + shape=shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + + +def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: + return default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + +class TestChunkTransform: + def test_construction_bytes_only(self) -> None: + # Construction succeeds when all codecs implement SupportsSyncCodec. + spec = _make_array_spec((100,), np.dtype("float64")) + ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) + + def test_construction_with_compression(self) -> None: + # AB + BB codec chain where both implement SupportsSyncCodec. + spec = _make_array_spec((100,), np.dtype("float64")) + ChunkTransform(codecs=(BytesCodec(), GzipCodec()), array_spec=spec) + + def test_construction_full_chain(self) -> None: + # All three codec types (AA + AB + BB), all implementing SupportsSyncCodec. + spec = _make_array_spec((3, 4), np.dtype("float64")) + ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), array_spec=spec + ) + + def test_encode_decode_roundtrip_bytes_only(self) -> None: + # Minimal round-trip: BytesCodec serializes the array to bytes and back. + # No compression, no AA transform. 
+ arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) + nd_buf = _make_nd_buffer(arr) + + encoded = chain.encode(nd_buf) + assert encoded is not None + decoded = chain.decode(encoded) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_shape_dtype_no_aa_codecs(self) -> None: + # Without AA codecs, shape and dtype should match the input ArraySpec + # (no transforms applied before the AB codec). + spec = _make_array_spec((100,), np.dtype("float64")) + chunk = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) + assert chunk.shape == (100,) + assert chunk.dtype == spec.dtype + + def test_shape_dtype_with_transpose(self) -> None: + # TransposeCodec(order=(1,0)) on a (3, 4) array produces (4, 3). + # shape/dtype reflect what the AB codec sees after all AA transforms. + spec = _make_array_spec((3, 4), np.dtype("float64")) + chunk = ChunkTransform(codecs=(TransposeCodec(order=(1, 0)), BytesCodec()), array_spec=spec) + assert chunk.shape == (4, 3) + assert chunk.dtype == spec.dtype + + def test_encode_decode_roundtrip_with_compression(self) -> None: + # Round-trip with a BB codec (GzipCodec) to verify that bytes-bytes + # compression/decompression is wired correctly. + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain = ChunkTransform(codecs=(BytesCodec(), GzipCodec(level=1)), array_spec=spec) + nd_buf = _make_nd_buffer(arr) + + encoded = chain.encode(nd_buf) + assert encoded is not None + decoded = chain.decode(encoded) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_with_transpose(self) -> None: + # Full AA + AB + BB chain round-trip. Transpose permutes axes on encode, + # then BytesCodec serializes, then ZstdCodec compresses. Decode reverses + # all three stages. Verifies the full pipeline works end to end. 
+ arr = np.arange(12, dtype="float64").reshape(3, 4) + spec = _make_array_spec(arr.shape, arr.dtype) + chain = ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), + array_spec=spec, + ) + nd_buf = _make_nd_buffer(arr) + + encoded = chain.encode(nd_buf) + assert encoded is not None + decoded = chain.decode(encoded) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_rejects_non_sync_codec(self) -> None: + # Construction must raise TypeError when a codec lacks SupportsSyncCodec. + + class AsyncOnlyCodec(ArrayBytesCodec): + is_fixed_size = True + + async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: + raise NotImplementedError # pragma: no cover + + async def _encode_single( + self, chunk_array: NDBuffer, chunk_spec: ArraySpec + ) -> Buffer | None: + raise NotImplementedError # pragma: no cover + + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: + return input_byte_length # pragma: no cover + + spec = _make_array_spec((100,), np.dtype("float64")) + with pytest.raises(TypeError, match="AsyncOnlyCodec"): + ChunkTransform(codecs=(AsyncOnlyCodec(),), array_spec=spec) + + def test_rejects_mixed_sync_and_non_sync(self) -> None: + # Even if some codecs support sync, a single non-sync codec should + # cause construction to fail. 
+ + class AsyncOnlyCodec(ArrayBytesCodec): + is_fixed_size = True + + async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: + raise NotImplementedError # pragma: no cover + + async def _encode_single( + self, chunk_array: NDBuffer, chunk_spec: ArraySpec + ) -> Buffer | None: + raise NotImplementedError # pragma: no cover + + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: + return input_byte_length # pragma: no cover + + spec = _make_array_spec((3, 4), np.dtype("float64")) + with pytest.raises(TypeError, match="AsyncOnlyCodec"): + ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), AsyncOnlyCodec()), + array_spec=spec, + ) + + def test_compute_encoded_size_bytes_only(self) -> None: + # BytesCodec is size-preserving: encoded size == input size. + spec = _make_array_spec((100,), np.dtype("float64")) + chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) + assert chain.compute_encoded_size(800, spec) == 800 + + def test_compute_encoded_size_with_crc32c(self) -> None: + # Crc32cCodec appends a 4-byte checksum, so encoded size = input + 4. + spec = _make_array_spec((100,), np.dtype("float64")) + chain = ChunkTransform(codecs=(BytesCodec(), Crc32cCodec()), array_spec=spec) + assert chain.compute_encoded_size(800, spec) == 804 + + def test_compute_encoded_size_with_transpose(self) -> None: + # TransposeCodec reorders axes but doesn't change the byte count. + # Verifies that compute_encoded_size walks through AA codecs correctly. + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain = ChunkTransform(codecs=(TransposeCodec(order=(1, 0)), BytesCodec()), array_spec=spec) + assert chain.compute_encoded_size(96, spec) == 96 + + def test_encode_returns_none_propagation(self) -> None: + # When an AA codec returns None (signaling "this chunk is the fill value, + # don't store it"), encode must short-circuit and return None + # instead of passing None into the next codec. 
+ + class NoneReturningAACodec(TransposeCodec): + """An ArrayArrayCodec that always returns None from encode.""" + + def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None: + return None + + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain = ChunkTransform( + codecs=(NoneReturningAACodec(order=(1, 0)), BytesCodec()), + array_spec=spec, + ) + arr = np.arange(12, dtype="float64").reshape(3, 4) + nd_buf = _make_nd_buffer(arr) + assert chain.encode(nd_buf) is None + + def test_encode_decode_roundtrip_with_crc32c(self) -> None: + # Round-trip through BytesCodec + Crc32cCodec. Crc32c appends a checksum + # on encode and verifies it on decode, so this tests that the BB codec + # pipeline runs correctly in both directions. + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain = ChunkTransform(codecs=(BytesCodec(), Crc32cCodec()), array_spec=spec) + nd_buf = _make_nd_buffer(arr) + + encoded = chain.encode(nd_buf) + assert encoded is not None + decoded = chain.decode(encoded) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_int32(self) -> None: + # Round-trip with int32 data to verify that the codec chain is not + # float-specific. Exercises a different dtype path through BytesCodec. + arr = np.arange(50, dtype="int32") + spec = _make_array_spec(arr.shape, arr.dtype) + chain = ChunkTransform(codecs=(BytesCodec(), ZstdCodec(level=1)), array_spec=spec) + nd_buf = _make_nd_buffer(arr) + + encoded = chain.encode(nd_buf) + assert encoded is not None + decoded = chain.decode(encoded) + np.testing.assert_array_equal(arr, decoded.as_numpy_array())