From cc6187e44827d18c8442d56d44fcd8ec8da92cdb Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 4 Jun 2026 17:07:52 -0700 Subject: [PATCH 1/2] Reproduce #810 Claude was able to diagnose and properly create a reproduction of #810. This is its hypothesis and a script to explain what the root cause is. The implication is that blosc is doing the correct thing here, but there might be something else we could do in the Zarr-Python ecosystem to make Zarr robust to this class of error. --- tests/blosc810_findings.md | 32 ++++++ tests/reproduce_blosc810.py | 219 ++++++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 tests/blosc810_findings.md create mode 100644 tests/reproduce_blosc810.py diff --git a/tests/blosc810_findings.md b/tests/blosc810_findings.md new file mode 100644 index 00000000..18be2780 --- /dev/null +++ b/tests/blosc810_findings.md @@ -0,0 +1,32 @@ +# numcodecs#810 investigation findings + +Env: numcodecs 0.15.1, blosc clib 1.21.6, zarr 2.18.2, xarray 2026.4.0, dask 2026.3.0. +Source codec: `Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)`, float32, chunk (1080,1440). + +## Error +`RuntimeError: error during blosc decompression: -1` (numcodecs/blosc.pyx:414), +intermittent, during distributed (coiled) reads of a blosc-compressed zarr on S3. +Reported mitigations BLOSC_NTHREADS=1 / BLOSC_NOLOCK=1 / NUMEXPR_NUM_THREADS=1 did not help. + +## What did NOT reproduce it +Standalone `Blosc.decode()` hammered concurrently (no S3, no dask, no zarr): +- 48 threads x 3000 iters x 8 chunks = 144k concurrent decodes, blosc nthreads=1: clean. +- 32 threads x 2000 iters, blosc internal nthreads=4 and =8: clean. +=> Concurrent in-memory decode of intact buffers is thread-safe here. The race +theory (numcodecs/blosc thread-safety) is not supported by these runs. 
+ +## What DID reproduce the exact error (deterministic) +Feeding blosc a **truncated** compressed buffer reproduces the identical error: + enc = codec.encode(float32 (1080,1440)) # len 4,944,670 bytes + codec.decode(enc[:int(len*0.99)]) -> RuntimeError: error during blosc decompression: -1 + codec.decode(enc[:int(len*0.50)]) -> same + codec.decode(enc[:int(len*0.999)]) -> same + +## Hypothesis +The `-1` is blosc correctly rejecting an **incomplete/truncated input buffer**. The bug +is therefore upstream of the codec: under high read concurrency, the chunk bytes handed +to blosc are occasionally short (a partial / mis-retried S3 range request, or a +connection-pool race in s3fs/fsspec/zarr). This explains: + - BLOSC_NTHREADS=1 not helping (not a blosc thread bug), + - failures clustering near completion of large jobs (more chunk reads = more chances), + - the absence of any pure-codec reproducer. \ No newline at end of file diff --git a/tests/reproduce_blosc810.py b/tests/reproduce_blosc810.py new file mode 100644 index 00000000..5f6a1b4d --- /dev/null +++ b/tests/reproduce_blosc810.py @@ -0,0 +1,219 @@ +"""Reproducer for numcodecs#810: ``RuntimeError: error during blosc decompression: -1``. + +Context +------- +The error was first seen intermittently while reading a blosc-compressed Zarr store +from S3 across many Dask/Coiled workers, near the end of large jobs. The reported +mitigations (BLOSC_NTHREADS=1, BLOSC_NOLOCK=1, NUMEXPR_NUM_THREADS=1) did not help. + +Findings (see the three parts below) +------------------------------------ +1. DETERMINISTIC: feeding blosc a *truncated/incomplete* compressed buffer raises the + exact ``-1`` error. So ``-1`` is blosc correctly rejecting bad input bytes. +2. NEGATIVE CONTROL: hammering ``Blosc.decode()`` concurrently on *intact* buffers + (tens of thousands of decodes, with and without blosc's internal threadpool) does + NOT reproduce it. The codec itself is not the culprit. +3. 
FIELD REPRO (opt-in, needs S3): fetch raw chunk bytes concurrently and compare each + buffer's length to the size declared in its own blosc header (``cbytes``). A + mismatch proves the storage/transport layer (s3fs/fsspec/zarr) occasionally hands + blosc a short read -- which then surfaces as the ``-1`` from part 1. + +Conclusion: this is most likely a truncated-read bug upstream of numcodecs, not a +codec thread-safety race. Parts 1+2 are runnable anywhere with just numpy+numcodecs. + +Run +--- + python reproduce_blosc810.py # parts 1 and 2 (deterministic) + BLOSC810_S3_URL=s3://bucket/path/OM4.zarr \ + BLOSC810_S3_ENDPOINT=https://nyu1.osn.mghpcc.org/ \ + AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=... \ + python reproduce_blosc810.py # also runs part 3 (field repro) + +Also importable as pytest tests (the test_* functions). + +Env: numcodecs 0.15.1, c-blosc 1.21.6, zarr 2.18.2 (but only numcodecs+numpy are +required for parts 1 and 2). +""" + +from __future__ import annotations + +import os +import struct +import sys +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +import numpy as np +from numcodecs import Blosc +from numcodecs import blosc as ncb + +# Match the codec on the OM4 source arrays exactly. +CODEC = Blosc(cname="lz4", clevel=5, shuffle=Blosc.SHUFFLE) +SHAPE = (1080, 1440) # a real chunk: float32 ~ 6.2 MB +MINUS_ONE = "error during blosc decompression: -1" + +# Blosc1 header is 16 bytes; the compressed total size (incl. header) is a +# little-endian uint32 at offset 12. 
https://github.com/Blosc/c-blosc +_BLOSC_HEADER = struct.Struct("<BBBBIII") + + +def _sample_chunk(seed: int = 0) -> bytes: + rng = np.random.default_rng(seed) + # smooth-ish, semi-compressible float32, like an ocean field + arr = np.cumsum(rng.standard_normal(SHAPE).astype("float32"), axis=1) + return CODEC.encode(np.ascontiguousarray(arr)) + + +def blosc_cbytes(buf: bytes) -> int: + """Compressed total size (including header) that the blosc header *claims*.""" + return _BLOSC_HEADER.unpack_from(buf, 0)[6] + + +# --------------------------------------------------------------------------- # +# Part 1 -- DETERMINISTIC reproduction: a truncated buffer yields exactly -1. +# --------------------------------------------------------------------------- # +def test_truncated_buffer_reproduces_minus_one() -> None: + enc = _sample_chunk() + # sanity: the full buffer decodes + CODEC.decode(enc) + declared = blosc_cbytes(enc) + assert declared == len(enc), (declared, len(enc)) + + reproduced = 0 + for frac in (0.999, 0.99, 0.5, 0.1): + truncated = enc[: int(len(enc) * frac)] + try: + CODEC.decode(truncated) + except RuntimeError as e: + assert MINUS_ONE in str(e), str(e) + reproduced += 1 + assert reproduced == 4 + return reproduced + + +# --------------------------------------------------------------------------- # +# Part 2 -- NEGATIVE CONTROL: concurrent decode of intact buffers is clean. 
+# --------------------------------------------------------------------------- # +def test_concurrent_decode_of_intact_buffers_is_clean( + n_threads: int = 32, n_iters: int = 2000, n_chunks: int = 8, blosc_nthreads: int = 1 +) -> int: + ncb.set_nthreads(blosc_nthreads) + chunks = [_sample_chunk(i) for i in range(n_chunks)] + sizes = [len(CODEC.decode(c)) for c in chunks] + errors: list[tuple] = [] + lock = threading.Lock() + + def worker(tid: int) -> None: + for j in range(n_iters): + idx = (tid + j) % n_chunks + try: + out = CODEC.decode(chunks[idx]) + except Exception as e: # noqa: BLE001 - we want any failure + with lock: + errors.append((tid, j, type(e).__name__, str(e))) + return + if len(out) != sizes[idx]: + with lock: + errors.append((tid, j, "ShortDecode", f"{len(out)} != {sizes[idx]}")) + return + + with ThreadPoolExecutor(max_workers=n_threads) as ex: + for f in as_completed([ex.submit(worker, t) for t in range(n_threads)]): + f.result() + + assert not errors, errors[:5] + return n_threads * n_iters + + +# --------------------------------------------------------------------------- # +# Part 3 -- FIELD REPRO (opt-in): catch a truncated chunk read from S3. +# --------------------------------------------------------------------------- # +def field_repro_s3(url: str, endpoint: str | None, rounds: int = 50, n_threads: int = 64) -> None: + """Concurrently fetch raw chunk bytes for one array and verify length vs header. + + For every chunk object under ``<root>/<array>/`` we read the raw (still-compressed) + bytes and compare ``len(bytes)`` to the ``cbytes`` declared by that chunk's own + blosc header. A mismatch is a short read from the storage layer -- the upstream + cause of the ``-1`` decode error. We also attempt ``CODEC.decode`` and record any + ``-1``. Repeating ``rounds`` times amplifies the intermittent failure. 
+ """ + import s3fs # lazy: only needed for the S3 path + + client_kwargs = {"endpoint_url": endpoint} if endpoint else {} + fs = s3fs.S3FileSystem(client_kwargs=client_kwargs) + root = url[len("s3://"):] if url.startswith("s3://") else url + + # Pick a blosc-compressed array directory with many chunks. + array_dir = None + for name in ("thetao", "so", "uo", "vo", "zos"): + cand = f"{root}/{name}" + if fs.exists(cand): + array_dir = cand + break + if array_dir is None: + raise SystemExit(f"no candidate array under {root}") + + chunk_keys = [ + k for k in fs.find(array_dir) + if not k.rsplit("/", 1)[-1].startswith(".") # skip .zarray/.zattrs + ] + print(f"[field] array={array_dir} chunks={len(chunk_keys)} " + f"rounds={rounds} threads={n_threads}") + + short_reads: list[tuple] = [] + decode_errors: list[tuple] = [] + lock = threading.Lock() + + def check(key: str, rnd: int) -> None: + try: + buf = fs.cat_file(key) + except Exception as e: # noqa: BLE001 + with lock: + short_reads.append((rnd, key, "FetchError", str(e))) + return + declared = blosc_cbytes(buf) + if len(buf) != declared: + with lock: + short_reads.append((rnd, key, "LengthMismatch", f"{len(buf)} != {declared}")) + try: + CODEC.decode(buf) + except RuntimeError as e: + if MINUS_ONE in str(e): + with lock: + decode_errors.append((rnd, key, str(e))) + + with ThreadPoolExecutor(max_workers=n_threads) as ex: + futs = [] + for rnd in range(rounds): + for key in chunk_keys: + futs.append(ex.submit(check, key, rnd)) + for f in as_completed(futs): + f.result() + + n = rounds * len(chunk_keys) + print(f"[field] {n} concurrent chunk fetches") + if short_reads or decode_errors: + print(f"[field] REPRODUCED upstream truncation: " + f"{len(short_reads)} short reads, {len(decode_errors)} decode -1") + for r in (short_reads + decode_errors)[:10]: + print(" ", r) + sys.exit(1) + print("[field] no short read this run (intermittent -- raise rounds/threads, rerun)") + + +def main() -> None: + print(f"numcodecs blosc clib 
{ncb.__version__} ({getattr(ncb, 'VERSION_STRING', '?')})") + n = test_truncated_buffer_reproduces_minus_one() + print(f"[part1] DETERMINISTIC: truncated buffer -> '{MINUS_ONE}' ({n}/4 truncations)") + total = test_concurrent_decode_of_intact_buffers_is_clean() + print(f"[part2] negative control: {total} concurrent intact decodes, no error") + + url = os.environ.get("BLOSC810_S3_URL") + if url: + field_repro_s3(url, os.environ.get("BLOSC810_S3_ENDPOINT")) + else: + print("[part3] skipped (set BLOSC810_S3_URL + creds to run the S3 field repro)") + + +if __name__ == "__main__": + main() From 8aa51362c4764fcd13a7e6ec382a25ea35374640 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 4 Jun 2026 17:17:18 -0700 Subject: [PATCH 2/2] Recent repro in our pipeline, documented by Claude. --- tests/blosc810_findings.md | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/tests/blosc810_findings.md b/tests/blosc810_findings.md index 18be2780..1c1d0001 100644 --- a/tests/blosc810_findings.md +++ b/tests/blosc810_findings.md @@ -29,4 +29,26 @@ to blosc are occasionally short (a partial / mis-retried S3 range request, or a connection-pool race in s3fs/fsspec/zarr). This explains: - BLOSC_NTHREADS=1 not helping (not a blosc thread bug), - failures clustering near completion of large jobs (more chunk reads = more chances), - - the absence of any pure-codec reproducer. \ No newline at end of file + - the absence of any pure-codec reproducer. + +## Real-world traceback captured + +Confirms the failure is on the READ/decode path during a distributed write +(`to_zarr` reading & decompressing source chunks from S3), exactly matching the +truncation finding above: + +``` + File ".../dask/array/core.py", line 1220, in store + dask.compute(arrays, **kwargs) + ... 
+ File ".../zarr/core.py", line 2187, in _chunk_getitems + File ".../zarr/core.py", line 2100, in _process_chunk + File ".../zarr/core.py", line 2356, in _decode_chunk + File "numcodecs/blosc.pyx", line 585, in numcodecs.blosc.Blosc.decode + File "numcodecs/blosc.pyx", line 414, in numcodecs.blosc.decompress +RuntimeError: error during blosc decompression: -1 +``` + +The error is raised inside `_decode_chunk` (a read) -- blosc was handed the bytes of +a chunk just fetched from the store -> consistent with a truncated read from the +storage/transport layer (s3fs/fsspec/zarr), not a codec thread-safety race.