diff --git a/tests/blosc810_findings.md b/tests/blosc810_findings.md new file mode 100644 index 00000000..1c1d0001 --- /dev/null +++ b/tests/blosc810_findings.md @@ -0,0 +1,54 @@ +# numcodecs#810 investigation findings + +Env: numcodecs 0.15.1, blosc clib 1.21.6, zarr 2.18.2, xarray 2026.4.0, dask 2026.3.0. +Source codec: `Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)`, float32, chunk (1080,1440). + +## Error +`RuntimeError: error during blosc decompression: -1` (numcodecs/blosc.pyx:414), +intermittent, during distributed (coiled) reads of a blosc-compressed zarr on S3. +Reported mitigations BLOSC_NTHREADS=1 / BLOSC_NOLOCK=1 / NUMEXPR_NUM_THREADS=1 did not help. + +## What did NOT reproduce it +Standalone `Blosc.decode()` hammered concurrently (no S3, no dask, no zarr): +- 48 threads x 3000 iters x 8 chunks = 144k concurrent decodes, blosc nthreads=1: clean. +- 32 threads x 2000 iters, blosc internal nthreads=4 and =8: clean. +=> Concurrent in-memory decode of intact buffers is thread-safe here. The race +theory (numcodecs/blosc thread-safety) is not supported by these runs. + +## What DID reproduce the exact error (deterministic) +Feeding blosc a **truncated** compressed buffer reproduces the identical error: + enc = codec.encode(float32 (1080,1440)) # len 4,944,670 bytes + codec.decode(enc[:int(len*0.99)]) -> RuntimeError: error during blosc decompression: -1 + codec.decode(enc[:int(len*0.50)]) -> same + codec.decode(enc[:int(len*0.999)]) -> same + +## Hypothesis +The `-1` is blosc correctly rejecting an **incomplete/truncated input buffer**. The bug +is therefore upstream of the codec: under high read concurrency, the chunk bytes handed +to blosc are occasionally short (a partial / mis-retried S3 range request, or a +connection-pool race in s3fs/fsspec/zarr). 
This explains: + - BLOSC_NTHREADS=1 not helping (not a blosc thread bug), + - failures clustering near completion of large jobs (more chunk reads = more chances), + - the absence of any pure-codec reproducer. + +## Real-world traceback captured + +Confirms the failure is on the READ/decode path during a distributed write +(`to_zarr` reading & decompressing source chunks from S3), exactly matching the +truncation finding above: + +``` + File ".../dask/array/core.py", line 1220, in store + dask.compute(arrays, **kwargs) + ... + File ".../zarr/core.py", line 2187, in _chunk_getitems + File ".../zarr/core.py", line 2100, in _process_chunk + File ".../zarr/core.py", line 2356, in _decode_chunk + File "numcodecs/blosc.pyx", line 585, in numcodecs.blosc.Blosc.decode + File "numcodecs/blosc.pyx", line 414, in numcodecs.blosc.decompress +RuntimeError: error during blosc decompression: -1 +``` + +The error is raised inside `_decode_chunk` (a read) -- blosc was handed the bytes of +a chunk just fetched from the store -> consistent with a truncated read from the +storage/transport layer (s3fs/fsspec/zarr), not a codec thread-safety race. diff --git a/tests/reproduce_blosc810.py b/tests/reproduce_blosc810.py new file mode 100644 index 00000000..5f6a1b4d --- /dev/null +++ b/tests/reproduce_blosc810.py @@ -0,0 +1,219 @@ +"""Reproducer for numcodecs#810: ``RuntimeError: error during blosc decompression: -1``. + +Context +------- +The error was first seen intermittently while reading a blosc-compressed Zarr store +from S3 across many Dask/Coiled workers, near the end of large jobs. The reported +mitigations (BLOSC_NTHREADS=1, BLOSC_NOLOCK=1, NUMEXPR_NUM_THREADS=1) did not help. + +Findings (see the three parts below) +------------------------------------ +1. DETERMINISTIC: feeding blosc a *truncated/incomplete* compressed buffer raises the + exact ``-1`` error. So ``-1`` is blosc correctly rejecting bad input bytes. +2. 
NEGATIVE CONTROL: hammering ``Blosc.decode()`` concurrently on *intact* buffers + (tens of thousands of decodes, with and without blosc's internal threadpool) does + NOT reproduce it. The codec itself is not the culprit. +3. FIELD REPRO (opt-in, needs S3): fetch raw chunk bytes concurrently and compare each + buffer's length to the size declared in its own blosc header (``cbytes``). A + mismatch proves the storage/transport layer (s3fs/fsspec/zarr) occasionally hands + blosc a short read -- which then surfaces as the ``-1`` from part 1. + +Conclusion: this is most likely a truncated-read bug upstream of numcodecs, not a +codec thread-safety race. Parts 1+2 are runnable anywhere with just numpy+numcodecs. + +Run +--- + python reproduce_blosc810.py # parts 1 and 2 (deterministic) + BLOSC810_S3_URL=s3://bucket/path/OM4.zarr \ + BLOSC810_S3_ENDPOINT=https://nyu1.osn.mghpcc.org/ \ + AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=... \ + python reproduce_blosc810.py # also runs part 3 (field repro) + +Also importable as pytest tests (the test_* functions). + +Env: numcodecs 0.15.1, c-blosc 1.21.6, zarr 2.18.2 (but only numcodecs+numpy are +required for parts 1 and 2). +""" + +from __future__ import annotations + +import os +import struct +import sys +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +import numpy as np +from numcodecs import Blosc +from numcodecs import blosc as ncb + +# Match the codec on the OM4 source arrays exactly. +CODEC = Blosc(cname="lz4", clevel=5, shuffle=Blosc.SHUFFLE) +SHAPE = (1080, 1440) # a real chunk: float32 ~ 6.2 MB +MINUS_ONE = "error during blosc decompression: -1" + +# Blosc1 header is 16 bytes; the compressed total size (incl. header) is a +# little-endian uint32 at offset 12. 
https://github.com/Blosc/c-blosc +_BLOSC_HEADER = struct.Struct(" bytes: + rng = np.random.default_rng(seed) + # smooth-ish, semi-compressible float32, like an ocean field + arr = np.cumsum(rng.standard_normal(SHAPE).astype("float32"), axis=1) + return CODEC.encode(np.ascontiguousarray(arr)) + + +def blosc_cbytes(buf: bytes) -> int: + """Compressed total size (including header) that the blosc header *claims*.""" + return _BLOSC_HEADER.unpack_from(buf, 0)[6] + + +# --------------------------------------------------------------------------- # +# Part 1 -- DETERMINISTIC reproduction: a truncated buffer yields exactly -1. +# --------------------------------------------------------------------------- # +def test_truncated_buffer_reproduces_minus_one() -> None: + enc = _sample_chunk() + # sanity: the full buffer decodes + CODEC.decode(enc) + declared = blosc_cbytes(enc) + assert declared == len(enc), (declared, len(enc)) + + reproduced = 0 + for frac in (0.999, 0.99, 0.5, 0.1): + truncated = enc[: int(len(enc) * frac)] + try: + CODEC.decode(truncated) + except RuntimeError as e: + assert MINUS_ONE in str(e), str(e) + reproduced += 1 + assert reproduced == 4 + return reproduced + + +# --------------------------------------------------------------------------- # +# Part 2 -- NEGATIVE CONTROL: concurrent decode of intact buffers is clean. 
# --------------------------------------------------------------------------- #
def test_concurrent_decode_of_intact_buffers_is_clean(
    n_threads: int = 32, n_iters: int = 2000, n_chunks: int = 8, blosc_nthreads: int = 1
) -> int:
    """Decode intact chunks from many threads at once and assert nothing fails.

    Parameters
    ----------
    n_threads
        Number of Python worker threads decoding concurrently.
    n_iters
        Decodes performed by each worker.
    n_chunks
        Number of distinct sample chunks the workers cycle through.
    blosc_nthreads
        Value passed to ``ncb.set_nthreads`` for the duration of the run.

    Returns
    -------
    ``n_threads * n_iters``, the total number of decodes attempted; ``main()``
    prints it (pytest may warn about a non-``None`` return).
    """
    errors: list[tuple] = []
    lock = threading.Lock()

    # set_nthreads changes process-wide blosc state, so put the old value back
    # when done instead of leaking it into whatever runs next.
    # NOTE(review): assumes set_nthreads returns the previous thread count, as
    # c-blosc's blosc_set_nthreads does -- confirm for the installed numcodecs.
    previous_nthreads = ncb.set_nthreads(blosc_nthreads)
    try:
        chunks = [_sample_chunk(i) for i in range(n_chunks)]
        sizes = [len(CODEC.decode(c)) for c in chunks]

        def worker(tid: int) -> None:
            for j in range(n_iters):
                idx = (tid + j) % n_chunks
                try:
                    out = CODEC.decode(chunks[idx])
                except Exception as e:  # noqa: BLE001 - we want any failure
                    with lock:
                        errors.append((tid, j, type(e).__name__, str(e)))
                    return
                if len(out) != sizes[idx]:
                    with lock:
                        errors.append((tid, j, "ShortDecode", f"{len(out)} != {sizes[idx]}"))
                    return

        with ThreadPoolExecutor(max_workers=n_threads) as ex:
            for f in as_completed([ex.submit(worker, t) for t in range(n_threads)]):
                f.result()
    finally:
        if previous_nthreads:
            ncb.set_nthreads(previous_nthreads)

    assert not errors, errors[:5]
    return n_threads * n_iters


# --------------------------------------------------------------------------- #
# Part 3 -- FIELD REPRO (opt-in): catch a truncated chunk read from S3.
# --------------------------------------------------------------------------- #
def field_repro_s3(url: str, endpoint: str | None, rounds: int = 50, n_threads: int = 64) -> None:
    """Concurrently fetch raw chunk bytes for one array and verify length vs header.

    For every chunk object under the selected array directory
    (``<root>/<name>/``) we read the raw (still-compressed) bytes and compare
    ``len(bytes)`` to the ``cbytes`` declared by that chunk's own blosc header.
    A mismatch is a short read from the storage layer -- the upstream cause of
    the ``-1`` decode error. A buffer too short to even hold the header is
    recorded as a short read as well. We also attempt ``CODEC.decode`` and record
    any ``-1``. Repeating ``rounds`` times amplifies the intermittent failure.

    Parameters
    ----------
    url
        Store location, with or without the ``s3://`` prefix.
    endpoint
        Optional S3 endpoint URL, forwarded as ``client_kwargs["endpoint_url"]``.
    rounds
        How many times every chunk is fetched.
    n_threads
        Size of the thread pool issuing the fetches.

    Raises
    ------
    SystemExit
        If no candidate array or no chunk object is found, or with status 1
        (via ``sys.exit``) when a short read or ``-1`` decode error is observed.
    """
    import s3fs  # lazy: only needed for the S3 path

    client_kwargs = {"endpoint_url": endpoint} if endpoint else {}
    fs = s3fs.S3FileSystem(client_kwargs=client_kwargs)
    root = url[len("s3://"):] if url.startswith("s3://") else url

    # Pick a blosc-compressed array directory with many chunks.
    array_dir = None
    for name in ("thetao", "so", "uo", "vo", "zos"):
        cand = f"{root}/{name}"
        if fs.exists(cand):
            array_dir = cand
            break
    if array_dir is None:
        raise SystemExit(f"no candidate array under {root}")

    chunk_keys = [
        k for k in fs.find(array_dir)
        if not k.rsplit("/", 1)[-1].startswith(".")  # skip .zarray/.zattrs
    ]
    if not chunk_keys:
        # Without chunks the run below would report "no short read" after
        # checking nothing at all.
        raise SystemExit(f"no chunk objects under {array_dir}")
    print(f"[field] array={array_dir} chunks={len(chunk_keys)} "
          f"rounds={rounds} threads={n_threads}")

    short_reads: list[tuple] = []
    decode_errors: list[tuple] = []
    lock = threading.Lock()

    def check(key: str, rnd: int) -> None:
        try:
            buf = fs.cat_file(key)
        except Exception as e:  # noqa: BLE001
            with lock:
                short_reads.append((rnd, key, "FetchError", str(e)))
            return
        try:
            declared = blosc_cbytes(buf)
        except struct.error as e:
            # Fewer bytes than the blosc header itself: the most severe short
            # read. Record it rather than letting it abort the whole run.
            with lock:
                short_reads.append((rnd, key, "HeaderTruncated", f"{len(buf)} bytes: {e}"))
            return
        if len(buf) != declared:
            with lock:
                short_reads.append((rnd, key, "LengthMismatch", f"{len(buf)} != {declared}"))
        try:
            CODEC.decode(buf)
        except RuntimeError as e:
            if MINUS_ONE in str(e):
                with lock:
                    decode_errors.append((rnd, key, str(e)))

    with ThreadPoolExecutor(max_workers=n_threads) as ex:
        futs = [
            ex.submit(check, key, rnd)
            for rnd in range(rounds)
            for key in chunk_keys
        ]
        for f in as_completed(futs):
            f.result()

    n = rounds * len(chunk_keys)
    print(f"[field] {n} concurrent chunk fetches")
    if short_reads or decode_errors:
        print(f"[field] REPRODUCED upstream truncation: "
              f"{len(short_reads)} short reads, {len(decode_errors)} decode -1")
        for r in (short_reads + decode_errors)[:10]:
            print("   ", r)
        sys.exit(1)
    print("[field] no short read this run (intermittent -- raise rounds/threads, rerun)")


def main() -> None:
    """Run parts 1 and 2, then part 3 when ``BLOSC810_S3_URL`` is set."""
    print(f"numcodecs blosc clib {ncb.__version__} ({getattr(ncb, 'VERSION_STRING', '?')})")
    n = test_truncated_buffer_reproduces_minus_one()
    print(f"[part1] DETERMINISTIC: truncated buffer -> '{MINUS_ONE}' ({n}/4 truncations)")
    total = test_concurrent_decode_of_intact_buffers_is_clean()
    print(f"[part2] negative control: {total} concurrent intact decodes, no error")

    url = os.environ.get("BLOSC810_S3_URL")
    if url:
        field_repro_s3(url, os.environ.get("BLOSC810_S3_ENDPOINT"))
    else:
        print("[part3] skipped (set BLOSC810_S3_URL + creds to run the S3 field repro)")


if __name__ == "__main__":
    main()