Skip to content

feat: Implement ZSTD compression for DA blobs#3104

Open
alpe wants to merge 11 commits intomainfrom
alex/blob-compression
Open

feat: Implement ZSTD compression for DA blobs#3104
alpe wants to merge 11 commits intomainfrom
alex/blob-compression

Conversation

@alpe
Copy link
Contributor

@alpe alpe commented Feb 24, 2026

Resolves #2532

Overview

  • Compress via ZSTD
  • Dynamic compression level selection by load.

Ref:
Benchmarks Zstd
ethereum-optimism/optimistic-specs#174

Arbitrum uses Brotli with dynamic compaction levels: Level 11 for low traffic; Level 0-1 for spikes.
https://docs.arbitrum.io/how-arbitrum-works/inside-arbitrum-nitro#:~:text=The%20data%20then%20undergoes%20compression,toward%20speed%20during%20heavy%20backlogs.

Summary by CodeRabbit

  • New Features

    • Added transparent compression for data submissions with automatic level selection, decompression on retrieval, compression metrics, and skipping/logging of corrupted items.
  • Documentation

    • Added guideline: error messages must not start with "failed to".
  • Chores

    • Bumped klauspost/compress dependency to v1.18.4 across modules.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 24, 2026

📝 Walkthrough

Walkthrough

Adds zstd-based compression support and integration: new compression package (pkg/da), DA client now compresses blobs before Submit and decompresses on retrieval, dependency bumps for klauspost/compress, tests and small doc guideline change added.

Changes

Cohort / File(s) Summary
Documentation & Conventions
CLAUDE.md
Added error-handling guideline: do not start error messages with "failed to".
Dependency updates
go.mod, apps/evm/go.mod, apps/grpc/go.mod, apps/testapp/go.mod, execution/evm/test/go.mod, test/e2e/go.mod
Bumped github.com/klauspost/compress from v1.18.0 → v1.18.4 (moved to direct require in root; submodules updated).
Compression implementation
pkg/da/compression.go
New zstd-based compression API: CompressionLevel enum, Compress/Decompress/IsCompressed, magic-prefix, size limits, timeout-limited decompression, and ErrDecompressedSizeExceeded.
Compression tests
pkg/da/compression_test.go
Extensive unit tests covering round-trips, levels, edge cases, corruption, context cancellation, oversized-frame detection, and IsCompressed behavior.
DA client integration
block/internal/da/client.go
Submit signature extended with options param; per-blob dynamic compression level selection, compress-before-submit, logging of compression metrics, enforce compressed-size limits, decompression on retrieval with corrupted items skipped.
Test refactors / usage
node/execution_test.go, pkg/da/types/namespace_test.go, test/e2e/failover_e2e_test.go
Replaced manual checks with testify/require in tests; added pre-unmarshal decompression (da.IsCompressed/da.Decompress) when extracting block data.

Sequence Diagram

sequenceDiagram
    participant Client as DA Client
    participant Compress as pkg/da (Compress/IsCompressed)
    participant Submitter as DA Submit/Storage
    participant Retriever as DA Retrieve
    participant Decompress as pkg/da (Decompress)

    Client->>Client: determine compression level (batch size)
    Client->>Compress: Compress(blob, level)
    Compress-->>Client: compressed blob (magic prefix)
    Client->>Submitter: Submit(compressed blob, options)
    Submitter-->>Client: submission result

    Retriever->>Submitter: fetch stored blob(s)
    Submitter-->>Retriever: blob bytes
    Retriever->>Compress: IsCompressed(blob)
    alt compressed
        Retriever->>Decompress: Decompress(blob)
        Decompress-->>Retriever: original data
    else not compressed
        Compress-->>Retriever: blob unchanged
    end
    Retriever-->>Client: returned blob data (or skipped on corruption)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

T:dependencies, T:testing

Suggested reviewers

  • tac0turtle
  • tzdybal
  • gupadhyaya
  • tuxcanfly
  • randygrok

Poem

🐰 I zip and tuck each tiny blob,
A magic prefix, snug as a bob.
Fewer bytes to hop and send,
Faster trails from end to end.
Hooray — compressed, then decompressed with love.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 17.39% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and directly summarizes the main change: implementing ZSTD compression for DA blobs, which aligns perfectly with the core objectives.
Description check ✅ Passed The description adequately covers the Overview section with clear explanation of compression implementation and dynamic level selection, includes issue reference, and provides relevant benchmarking context.
Linked Issues check ✅ Passed The PR successfully implements core requirements from issue #2532: ZSTD compression algorithm, dynamic compression level selection by load, backward compatibility via magic header, and compression handling integrated into DA client.
Out of Scope Changes check ✅ Passed All changes are within scope: compression implementation (pkg/da/compression.go), DA client integration (block/internal/da/client.go), dependency updates for klauspost/compress, guideline documentation, and test refactoring for better maintainability.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch alex/blob-compression

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link
Contributor

claude bot commented Feb 24, 2026

Claude finished @alpe's task in 2m 11s —— View job


Code Review: feat: Implement ZSTD compression for DA blobs

  • Read changed files
  • Review compression implementation
  • Review DA client integration
  • Review tests
  • Post comprehensive feedback

Overall this is a well-structured addition. The layered defense approach (frame header pre-check → decoder memory cap → timeout) is solid, test coverage is good, and backward compatibility via the magic prefix is correctly handled. I have a few issues worth addressing before merge.


pkg/da/compression.go

Duplicate constant — maxDecoderMemory vs maxDecompressedSize (init(), line 64)

maxDecoderMemory is a local constant inside init() that duplicates maxDecompressedSize at the package level. julienrbrt raised this directly. Since the decoder memory cap is derived from the same limit, they should share the constant:

// in init():
decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecompressedSize))

Fix this →

maxDecompressedSize (7MB) is inconsistent with common.DefaultMaxBlobSize (5MB)

block/internal/common/consts.go defines DefaultMaxBlobSize = 5 * 1024 * 1024, which is what the DA client uses to reject oversized compressed blobs before submission (line 120 of client.go). But the decompressor allows up to 7MB. This means a node could successfully receive a blob from the DA layer that decompresses to up to 7MB even though the submitter would never send one larger than 5MB. The comment says "Matches the … DA max blob size" but the values diverge. This should either be unified or explicitly documented.

Goroutine leak on timeout (line 124–127)

go func() {
    out, err := decoder.DecodeAll(payload, nil)
    ch <- decodeResult{data: out, err: err}
}()

When ctx.Done() fires, Decompress returns but the goroutine continues running DecodeAll. The buffered channel (size 1) means the goroutine won't block forever, but under repeated timeouts it can pile up background CPU work. The fix is to make the goroutine's send also respect the context:

go func() {
    out, err := decoder.DecodeAll(payload, nil)
    select {
    case ch <- decodeResult{data: out, err: err}:
    case <-ctx.Done():
    }
}()

Fix this →

decompressTimeout (500ms) is unrelated to blob size (line 24)

As julienrbrt noted, a fixed 500ms cap unrelated to maxDecompressedSize will become stale when the blob size limit increases. Consider expressing it as a function of the max size (e.g., maxDecompressedSize / (bytesPerMs)) or at least document the derivation with a comment that mentions maxDecompressedSize.

Missing "Layer 2" comment (line 119)

The comment jumps from "Layer 1" (frame header check) to "Layer 3" (timeout). The decoder memory cap in init() is presumably "Layer 2" — the comment structure is confusing.

IsCompressed — prefer bytes.HasPrefix (lines 145–148)

The manual 4-byte comparison is correct but non-idiomatic. bytes.HasPrefix(data, magic) is cleaner and requires only adding "bytes" to imports. This is minor but the Gemini review also called it out.


block/internal/da/client.go

"failed to" error message on line 132

Message: fmt.Sprintf("failed to build blob %d: %v", i, err),

CLAUDE.md explicitly states: "Never start an error message with 'failed to'". This was added in this PR and should be updated, e.g., "building blob %d: %v".

Fix this →

blobSize tracks uncompressed size (lines 76–79)

blobSize is set from the original data slices, but the actual bytes submitted to DA are the compressed blobs. The value ends up in BaseResult.BlobSize on both success and error paths — any caller using it for cost/quota estimation will see the wrong number. At minimum the existing BlobSize field should be documented as the uncompressed size, or a separate field added for the compressed size.

Magic numbers for compression level selection (lines 96–100)

case len(data) > 10:
    compLevel = da.LevelFastest
case len(data) > 3:
    compLevel = da.LevelDefault

These thresholds encode policy that operators may want to tune. As both CodeRabbit and julienrbrt noted, extract them as named constants (or config) so the decision criteria are self-documenting.

Silent blob skipping on decompression failure (lines 315–323, 404–411)

Skipping corrupt blobs with a Warn log means Retrieve can return StatusSuccess with a partial (or empty) data set when decompression fails for every blob. Callers have no way to distinguish "no blobs at this height" from "all blobs were corrupt". Consider returning an error when all blobs fail decompression, or at least surfacing partial failure clearly in the result code. This was flagged by both CodeRabbit and julienrbrt.


Performance Regression

The benchmark bot reported ~1.9x overhead on DASubmitter.SubmitData (931µs → 1761µs). This is expected for compression on the hot path, but worth acknowledging in the PR. Are there latency budgets or SLOs for DA submission that this change might violate? At minimum, document the expected trade-off.


Tests

TestCompress_AllLevelsRoundTrip — shared sizes slice (lines 63–83)

sizes is appended inside t.Run closures and read after the loop. This is a data race if any subtest ever calls t.Parallel(). The CodeRabbit suggestion to preallocate by index is the right fix.

TestDecompress_ContextCanceled — potential flakiness

The test uses a pre-cancelled context on []byte("test data for context cancellation") (34 bytes). For such small inputs, decoder.DecodeAll may complete before the select checks ctx.Done(), causing the success path to win. This test might pass intermittently even with a bug. Consider using a synthetic large input or an instrumented decoder to make this deterministic.


Summary

Severity Issue
⚠️ Major maxDecompressedSize (7MB) inconsistent with common.DefaultMaxBlobSize (5MB)
⚠️ Major Goroutine can outlive ctx timeout (CPU waste under load)
⚠️ Major Silent blob skip on decompression failure masks data loss
🔶 Moderate maxDecoderMemory duplicates maxDecompressedSize
🔶 Moderate decompressTimeout not tied to maxDecompressedSize
🔶 Moderate blobSize reports uncompressed size
🔶 Moderate Magic number thresholds (10, 3) for compression levels
🔵 Minor "failed to" error message on client.go:132 (CLAUDE.md violation)
🔵 Minor Missing "Layer 2" comment
🔵 Minor bytes.HasPrefix for IsCompressed
🔵 Minor Shared sizes slice in test

@github-actions
Copy link
Contributor

github-actions bot commented Feb 24, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

BuildFormatLintBreakingUpdated (UTC)
✅ passed⏩ skipped✅ passed✅ passedFeb 26, 2026, 10:00 AM

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @alpe, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates ZSTD compression for Data Availability (DA) blobs, enhancing the efficiency of data handling by reducing the size of blobs transmitted and stored. The changes involve updating the klauspost/compress library, introducing a dedicated compression utility package, and modifying the DA client to transparently apply compression during submission and decompression during retrieval. This optimization aims to decrease operational costs and improve performance without altering the external interface for blob operations.

Highlights

  • ZSTD Compression for DA Blobs: Implemented ZSTD compression for Data Availability (DA) blobs, significantly reducing their size before submission and after retrieval to optimize bandwidth and storage.
  • Dependency Update: Updated the klauspost/compress Go module dependency across various applications to version v1.18.4 to support the new compression features.
  • New Compression Package: Introduced a new pkg/da/compression package containing utility functions for ZSTD compression, decompression, and a custom magic prefix check for identifying compressed data.
  • DA Client Integration: Modified the DA client (block/internal/da/client.go) to automatically compress blobs before submission and decompress them upon retrieval, including logging compression ratios for monitoring.
  • Comprehensive Testing: Added comprehensive unit tests for the new compression and decompression logic to ensure data integrity, round-trip correctness, and efficiency across various data types and sizes.
  • Documentation Update: Updated development guidelines in CLAUDE.md to include a new rule against starting error messages with 'failed to'.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • CLAUDE.md
    • Added a new guideline for error message formatting.
  • apps/evm/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/evm/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • apps/grpc/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/grpc/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • apps/testapp/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • apps/testapp/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • block/internal/da/client.go
    • Integrated ZSTD compression for outgoing blobs and decompression for incoming blobs.
    • Added error handling for compression and decompression failures.
    • Included logging for compression ratios during blob submission.
  • execution/evm/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • execution/evm/test/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • execution/evm/test/go.sum
    • Updated checksums for github.com/klauspost/compress.
  • go.mod
    • Added github.com/klauspost/compress v1.18.4 as a direct dependency.
    • Removed github.com/klauspost/compress v1.18.0 as an indirect dependency.
  • go.sum
    • Updated checksums for github.com/klauspost/compress.
  • pkg/da/compression.go
    • Added a new file defining ZSTD compression and decompression functions.
    • Implemented a custom 4-byte magic prefix ('ZSTD') for compressed data identification.
    • Provided IsCompressed function to check for the magic prefix.
  • pkg/da/compression_test.go
    • Added unit tests to verify the correctness and efficiency of the ZSTD compression and decompression logic.
    • Included tests for edge cases like empty data, uncompressed passthrough, and corrupt data.
  • test/e2e/go.mod
    • Updated github.com/klauspost/compress dependency to v1.18.4.
  • test/e2e/go.sum
    • Updated checksums for github.com/klauspost/compress.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codecov
Copy link

codecov bot commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 69.38776% with 30 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.94%. Comparing base (212ac08) to head (2a37443).

Files with missing lines Patch % Lines
block/internal/da/client.go 39.53% 22 Missing and 4 partials ⚠️
pkg/da/compression.go 92.72% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3104      +/-   ##
==========================================
- Coverage   60.95%   60.94%   -0.02%     
==========================================
  Files         113      114       +1     
  Lines       11617    11708      +91     
==========================================
+ Hits         7081     7135      +54     
- Misses       3738     3768      +30     
- Partials      798      805       +7     
Flag Coverage Δ
combined 60.94% <69.38%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces ZSTD compression for DA blobs, which is a great feature for reducing bandwidth and storage costs. The implementation is well-contained within a new pkg/da package and includes a comprehensive set of tests. The changes in block/internal/da/client.go correctly integrate the compression and decompression logic. I have a few suggestions to improve the efficiency, readability, and style of the new compression package.

Comment on lines 39 to 46
compressed := encoder.EncodeAll(data, nil)

// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)

return result, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of Compress can be made more memory-efficient. It currently involves at least two allocations. You can reduce this by creating a destination buffer with the magic prefix and then having EncodeAll append the compressed data to it.

Suggested change
compressed := encoder.EncodeAll(data, nil)
// Prepend magic prefix
result := make([]byte, len(magic)+len(compressed))
copy(result, magic)
copy(result[len(magic):], compressed)
return result, nil
dst := make([]byte, len(magic))
copy(dst, magic)
return encoder.EncodeAll(data, dst), nil

// Strip magic prefix and decompress
decompressed, err := decoder.DecodeAll(data[len(magic):], nil)
if err != nil {
return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new error handling guideline in CLAUDE.md suggests avoiding "failed to" in error messages. While this message doesn't use that exact phrase, "zstd decompress failed" can be improved to be more direct. The err != nil check already implies failure. Consider rephrasing to state the operation and wrap the underlying error.

Suggested change
return nil, fmt.Errorf("compression: zstd decompress failed: %w", err)
return nil, fmt.Errorf("compression: zstd decompression: %w", err)
References
  1. The project's coding guidelines in CLAUDE.md state to 'Never start an error message with "failed to"'. (link)

Comment on lines +71 to +74
return data[0] == magic[0] &&
data[1] == magic[1] &&
data[2] == magic[2] &&
data[3] == magic[3]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The manual byte-by-byte comparison to check for the magic prefix is correct, but using bytes.HasPrefix is more idiomatic and readable. You'll need to add import "bytes" to the file.

Suggested change
return data[0] == magic[0] &&
data[1] == magic[1] &&
data[2] == magic[2] &&
data[3] == magic[3]
return bytes.HasPrefix(data, magic)

alpe added 2 commits February 24, 2026 12:04
…on based on batch size.

(cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835)
@alpe alpe marked this pull request as ready for review February 24, 2026 15:28
@alpe alpe marked this pull request as draft February 24, 2026 15:29
@alpe
Copy link
Contributor Author

alpe commented Feb 24, 2026

E2e tests failed. I am not sure how this relates to the changes, yet.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (3)
block/internal/da/client.go (2)

74-79: BlobSize reports uncompressed size — document this or add a compressed-size field.

blobSize is calculated from the original data (pre-compression), but the bytes actually submitted to the DA layer are the compressed blobs. For operators relying on BlobSize for cost estimation or quota tracking, this mismatch can be misleading. Consider either documenting that BlobSize is the uncompressed size, or adding a separate field (e.g., CompressedBlobSize) so callers can distinguish.

Also applies to: 200-210

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 74 - 79, The current client.Submit
implementation computes blobSize from the original data slices (variable
blobSize) which reflects the uncompressed size while DA submissions use
compressed blobs; update Submit to either record and return both sizes by adding
a new CompressedBlobSize field to the datypes.ResultSubmit (in addition to the
existing BlobSize) and populate it with the length of the compressed payloads
before sending, or explicitly document in the Submit comment and datatypes that
BlobSize is the uncompressed size; change references in Submit and any callers
of datypes.ResultSubmit to use the new CompressedBlobSize when estimating
costs/quota.

91-100: Consider extracting compression-level thresholds as named constants.

The magic numbers 10 and 3 silently encode the backlog-pressure policy. Named constants (or a small config struct) would make the policy self-documenting and easier to tune without reading the switch logic.

Example
+const (
+	// highBacklogThreshold is the batch-size above which we use fastest compression.
+	highBacklogThreshold = 10
+	// moderateBacklogThreshold is the batch-size above which we use default compression.
+	moderateBacklogThreshold = 3
+)
+
 	compLevel := da.LevelBest
 	switch {
-	case len(data) > 10:
+	case len(data) > highBacklogThreshold:
 		compLevel = da.LevelFastest
-	case len(data) > 3:
+	case len(data) > moderateBacklogThreshold:
 		compLevel = da.LevelDefault
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 91 - 100, Extract the magic numeric
thresholds used in the compression selection switch into named constants (e.g.,
MinFastBatchSize = 10, MinDefaultBatchSize = 3) or into a small config struct,
then replace the literal `10` and `3` in the switch that sets `compLevel` (which
currently uses `da.LevelBest`, `da.LevelFastest`, and `da.LevelDefault`) with
those constants/fields so the backlog-pressure policy is self-documenting and
easily tunable without changing control flow.
pkg/da/compression_test.go (1)

61-81: Shared sizes slice across subtests is fragile if parallelized.

sizes is appended inside t.Run callbacks and then read after the loop. This works today because subtests run sequentially, but adding t.Parallel() in the future would introduce a data race. Consider collecting sizes outside the subtest or adding a brief comment warning against parallelizing these subtests.

One alternative: move the cross-level assertion into its own subtest
-	var sizes []int
 	for _, lvl := range levels {
+		lvl := lvl
 		t.Run(lvl.name, func(t *testing.T) {
 			compressed, err := Compress(data, lvl.level)
 			require.NoError(t, err)
 			assert.True(t, IsCompressed(compressed))
 
-			sizes = append(sizes, len(compressed))
-
 			decompressed, err := Decompress(compressed)
 			require.NoError(t, err)
 			assert.Equal(t, data, decompressed)
 
 			t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data)))
 		})
 	}
 
-	// Best should produce equal or smaller output than Fastest
-	if len(sizes) == 3 {
-		assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest")
-	}
+	t.Run("best_leq_fastest", func(t *testing.T) {
+		fastest, err := Compress(data, LevelFastest)
+		require.NoError(t, err)
+		best, err := Compress(data, LevelBest)
+		require.NoError(t, err)
+		assert.LessOrEqual(t, len(best), len(fastest),
+			"LevelBest should compress at least as well as LevelFastest")
+	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/da/compression_test.go` around lines 61 - 81, The test currently appends
to a shared sizes slice inside each t.Run which is unsafe if subtests are
parallelized; change to preallocate sizes := make([]int, len(levels)) before the
loop, capture the loop index (e.g., idx := i) inside the t.Run closure, and
assign sizes[idx] = len(compressed) instead of appending; keep the cross-level
assertion after the loop (or move it into its own t.Run) and add a short comment
on not calling t.Parallel() here to avoid races if someone enables parallel
subtests; refer to the t.Run loop, levels, Compress and Decompress calls and the
sizes slice when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/da/client.go`:
- Around line 314-323: The decompression-error branch in Retrieve (where
da.Decompress returns decompErr) returns a ResultRetrieve without setting
BaseResult.Timestamp; update that error return to set Timestamp to the existing
block time (use the blockTime variable if available, otherwise time.Now()) so
the ResultRetrieve mirrors other error branches; modify the return constructing
datypes.ResultRetrieve / datypes.BaseResult in that decompErr branch to include
Timestamp: blockTime (or time.Now()) to ensure non-zero timestamps for callers.
- Around line 113-118: The log computes a compression ratio using
float64(len(compressed))/float64(len(raw)) which yields NaN for zero-length raw;
fix by computing the ratio before logging and guarding when len(raw) == 0 (e.g.,
set ratio := 0.0 or omit the field), then call
c.logger.Debug().Int("original_size", len(raw)).Int("compressed_size",
len(compressed)).Float64("ratio", ratio).Int("level",
int(compLevel)).Msg("compressed blob for DA submission"); this ensures no
division by zero produces NaN in the logs while keeping the same c.logger.Debug
call and the raw/compressed/compLevel identifiers.

In `@pkg/da/compression.go`:
- Around line 47-51: The zstd decoder is created without a max-memory cap
(decoder, err = zstd.NewReader(nil)), which risks OOM on untrusted DA blobs;
update the decoder initialization in pkg/da/compression.go to pass
zstd.WithDecoderMaxMemory(...) (e.g., 7*1024*1024 for a 7MB cap matching the DA
max blob size) so the decoder enforces a safe decompression memory limit.

---

Nitpick comments:
In `@block/internal/da/client.go`:
- Around line 74-79: The current client.Submit implementation computes blobSize
from the original data slices (variable blobSize) which reflects the
uncompressed size while DA submissions use compressed blobs; update Submit to
either record and return both sizes by adding a new CompressedBlobSize field to
the datypes.ResultSubmit (in addition to the existing BlobSize) and populate it
with the length of the compressed payloads before sending, or explicitly
document in the Submit comment and datatypes that BlobSize is the uncompressed
size; change references in Submit and any callers of datypes.ResultSubmit to use
the new CompressedBlobSize when estimating costs/quota.
- Around line 91-100: Extract the magic numeric thresholds used in the
compression selection switch into named constants (e.g., MinFastBatchSize = 10,
MinDefaultBatchSize = 3) or into a small config struct, then replace the literal
`10` and `3` in the switch that sets `compLevel` (which currently uses
`da.LevelBest`, `da.LevelFastest`, and `da.LevelDefault`) with those
constants/fields so the backlog-pressure policy is self-documenting and easily
tunable without changing control flow.

In `@pkg/da/compression_test.go`:
- Around line 61-81: The test currently appends to a shared sizes slice inside
each t.Run which is unsafe if subtests are parallelized; change to preallocate
sizes := make([]int, len(levels)) before the loop, capture the loop index (e.g.,
idx := i) inside the t.Run closure, and assign sizes[idx] = len(compressed)
instead of appending; keep the cross-level assertion after the loop (or move it
into its own t.Run) and add a short comment on not calling t.Parallel() here to
avoid races if someone enables parallel subtests; refer to the t.Run loop,
levels, Compress and Decompress calls and the sizes slice when making the
change.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 67e18bd and c875c0b.

⛔ Files ignored due to path filters (7)
  • apps/evm/go.sum is excluded by !**/*.sum
  • apps/grpc/go.sum is excluded by !**/*.sum
  • apps/testapp/go.sum is excluded by !**/*.sum
  • execution/evm/go.sum is excluded by !**/*.sum
  • execution/evm/test/go.sum is excluded by !**/*.sum
  • go.sum is excluded by !**/*.sum
  • test/e2e/go.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
  • CLAUDE.md
  • apps/evm/go.mod
  • apps/grpc/go.mod
  • apps/testapp/go.mod
  • block/internal/da/client.go
  • execution/evm/test/go.mod
  • go.mod
  • pkg/da/compression.go
  • pkg/da/compression_test.go
  • test/e2e/go.mod

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite Current: db89458 Previous: 212ac08 Ratio
SpamoorSmoke - DA.Submit (avg) 1089.3372093023256 us 636.5232558139535 us 1.71
SpamoorSmoke - DASubmitter.SubmitData (avg) 1761.9375 us 931.4242424242424 us 1.89

This comment was automatically generated by workflow using github-action-benchmark.

@alpe alpe marked this pull request as ready for review February 25, 2026 14:09
@alpe alpe requested a review from julienrbrt February 25, 2026 14:09
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
pkg/da/compression.go (1)

19-20: Use a single constant for decoder-memory cap and declared-size cap.

7 MiB is duplicated in two places. Centralizing avoids accidental drift between header checks and decoder limits.

♻️ Proposed refactor
 const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB
+const maxDecoderMemory = maxDecompressedSize
@@
- const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap
  var err error
  decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecoderMemory))

Also applies to: 64-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/da/compression.go` around lines 19 - 20, The duplicated 7 MiB value
should be centralized: introduce a single exported or package-level constant
(e.g., maxDecompressedSize) and use it for both the header declared-size cap and
the decoder memory cap instead of repeating the literal; update all occurrences
(including the existing const maxDecompressedSize and the checks in the
decoder/reader code paths referenced around the functions that validate header
size and set decoder limits) to reference that constant so both checks stay in
sync.
block/internal/da/client.go (1)

91-100: Externalize compression-level thresholds instead of hard-coding batch cutoffs.

The >3 / >10 cutoffs are policy knobs. Moving them into config (or constants wired from config) will make tuning easier under real traffic without code changes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 91 - 100, The compression-level
batch cutoffs are hard-coded; replace the literals used in the switch (the
len(data) > 10 and len(data) > 3 checks that set compLevel to
da.LevelFastest/da.LevelDefault/da.LevelBest) with configurable thresholds
(e.g., constants or values read from the existing config object) so operators
can tune them without code changes; update the code that computes compLevel to
read two named thresholds (e.g., fastBatchThreshold and defaultBatchThreshold)
from config or constants and use those in the switch, and add
validation/defaults for those thresholds where configuration is loaded.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/da/client.go`:
- Around line 314-323: In Retrieve/Get in client.go, don't swallow decompression
failures: when da.Decompress(ctx, b.Data()) returns an error (the branch that
currently logs via c.logger.Warn() and continues), return an error immediately
(or propagate a wrapped error) instead of skipping that blob and returning a
successful but incomplete payload; update the code paths around da.Decompress,
c.logger.Warn(), and the calling functions Retrieve/Get to propagate the
decompression error (or explicit partial-result sentinel) so callers can detect
corrupted/missing data.

In `@pkg/da/compression.go`:
- Around line 123-137: The goroutine that calls decoder.DecodeAll(payload, nil)
can continue running after Decompress returns on ctx.Done(), causing goroutine
leaks; modify the goroutine (the closure that currently writes to ch) to select
when sending the decodeResult so it respects ctx cancellation — i.e., capture
ctx in the closure, run out, err := decoder.DecodeAll(...), then use select {
case ch <- decodeResult{data: out, err: err}: case <-ctx.Done(): } so the worker
exits without blocking if the caller timed out; keep the existing select in the
caller unchanged.

---

Nitpick comments:
In `@block/internal/da/client.go`:
- Around line 91-100: The compression-level batch cutoffs are hard-coded;
replace the literals used in the switch (the len(data) > 10 and len(data) > 3
checks that set compLevel to da.LevelFastest/da.LevelDefault/da.LevelBest) with
configurable thresholds (e.g., constants or values read from the existing config
object) so operators can tune them without code changes; update the code that
computes compLevel to read two named thresholds (e.g., fastBatchThreshold and
defaultBatchThreshold) from config or constants and use those in the switch, and
add validation/defaults for those thresholds where configuration is loaded.

In `@pkg/da/compression.go`:
- Around line 19-20: The duplicated 7 MiB value should be centralized: introduce
a single exported or package-level constant (e.g., maxDecompressedSize) and use
it for both the header declared-size cap and the decoder memory cap instead of
repeating the literal; update all occurrences (including the existing const
maxDecompressedSize and the checks in the decoder/reader code paths referenced
around the functions that validate header size and set decoder limits) to
reference that constant so both checks stay in sync.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c875c0b and db89458.

📒 Files selected for processing (6)
  • block/internal/da/client.go
  • node/execution_test.go
  • pkg/da/compression.go
  • pkg/da/compression_test.go
  • pkg/da/types/namespace_test.go
  • test/e2e/failover_e2e_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/da/compression_test.go

Comment on lines +314 to +323
decompressed, decompErr := da.Decompress(ctx, b.Data())
if decompErr != nil {
c.logger.Warn().
Err(decompErr).
Uint64("height", height).
Int("blob_index", i).
Int("blob_size", len(b.Data())).
Msg("skipping malicious or corrupt DA blob")
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not return success when decompression dropped all requested data.

Retrieve/Get currently swallow decompression errors and can return a “successful” empty/partial payload with no explicit failure signal. That can mask corruption and break caller assumptions on completeness.

💡 Proposed fix
  ids := make([]datypes.ID, 0, len(blobs))
  data := make([]datypes.Blob, 0, len(blobs))
+ failedDecompress := 0

  for i, b := range blobs {
    decompressed, decompErr := da.Decompress(ctx, b.Data())
    if decompErr != nil {
+     failedDecompress++
      c.logger.Warn().
        Err(decompErr).
        Uint64("height", height).
        Int("blob_index", i).
        Int("blob_size", len(b.Data())).
        Msg("skipping malicious or corrupt DA blob")
      continue
    }
    ids = append(ids, blobrpc.MakeID(height, b.Commitment))
    data = append(data, decompressed)
  }

+ if len(data) == 0 && len(blobs) > 0 {
+   return datypes.ResultRetrieve{
+     BaseResult: datypes.BaseResult{
+       Code:      datypes.StatusError,
+       Message:   fmt.Sprintf("all blobs failed decompression (%d/%d)", failedDecompress, len(blobs)),
+       Height:    height,
+       Timestamp: blockTime,
+     },
+   }
+ }

Apply the same principle in Get: if any requested ID fails decompression, return an error (or explicitly surface partial-result semantics).

As per coding guidelines, "Return errors early".

Also applies to: 330-338, 403-413

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/client.go` around lines 314 - 323, In Retrieve/Get in
client.go, don't swallow decompression failures: when da.Decompress(ctx,
b.Data()) returns an error (the branch that currently logs via c.logger.Warn()
and continues), return an error immediately (or propagate a wrapped error)
instead of skipping that blob and returning a successful but incomplete payload;
update the code paths around da.Decompress, c.logger.Warn(), and the calling
functions Retrieve/Get to propagate the decompression error (or explicit
partial-result sentinel) so callers can detect corrupted/missing data.

Comment on lines +123 to +137
ch := make(chan decodeResult, 1)
go func() {
out, err := decoder.DecodeAll(payload, nil)
ch <- decodeResult{data: out, err: err}
}()

select {
case res := <-ch:
if res.err != nil {
return nil, fmt.Errorf("zstd decompress: %w", res.err)
}
return res.data, nil
case <-ctx.Done():
return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Timeout does not stop decode work; detached decode goroutines can pile up.

When ctx.Done() wins, Decompress returns, but the decode goroutine continues running. Under repeated timeouts, this can create unbounded background CPU work.

💡 Proposed fix (bounded concurrent decodes)
+const maxConcurrentDecodes = 8
+var decodeSem = make(chan struct{}, maxConcurrentDecodes)

 func Decompress(ctx context.Context, data []byte) ([]byte, error) {
   if !IsCompressed(data) {
     return data, nil
   }
@@
   ctx, cancel := context.WithTimeout(ctx, decompressTimeout)
   defer cancel()

+  select {
+  case decodeSem <- struct{}{}:
+  case <-ctx.Done():
+    return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err())
+  }
+
   ch := make(chan decodeResult, 1)
   go func() {
+    defer func() { <-decodeSem }()
     out, err := decoder.DecodeAll(payload, nil)
     ch <- decodeResult{data: out, err: err}
   }()

As per coding guidelines, "Be mindful of goroutine leaks in concurrent processing code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/da/compression.go` around lines 123 - 137, The goroutine that calls
decoder.DecodeAll(payload, nil) can continue running after Decompress returns on
ctx.Done(), causing goroutine leaks; modify the goroutine (the closure that
currently writes to ch) to select when sending the decodeResult so it respects
ctx cancellation — i.e., capture ctx in the closure, run out, err :=
decoder.DecodeAll(...), then use select { case ch <- decodeResult{data: out,
err: err}: case <-ctx.Done(): } so the worker exits without blocking if the
caller timed out; keep the existing select in the caller unchanged.

Copy link
Member

@julienrbrt julienrbrt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! some questions/nits about some default constants.

// maxDecompressedSize is the maximum allowed decompressed output size.
// Matches the WithDecoderMaxMemory cap and provides early rejection
// by inspecting the zstd frame header before allocating anything.
const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is based on our max blob size. Could we just use that variable instead of having another one to remember here? As when we increase blob size after a celestia upgrade, we may forget to bump this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come the max decompressable size is 7mb? wouldnt it be the max compressed size is 7mb therefore making the decompressed size potentially x size bigger?

}
encoders[i] = enc
}
const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using this instead of maxDecompressedSize?

}

// Layer 3: Apply the shorter of caller deadline and our hard cap.
ctx, cancel := context.WithTimeout(ctx, decompressTimeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i get the idea, but can we maybe factor it by the maxDecompressedSize? so when this one increases, we expect the decrompression time to linearly increase? (or we need to plot decompression size / decompression time, to know the real link and factor it properly)

* main:
  feat: adding spammoor test to benchmark (#3105)
Copy link
Contributor

@tac0turtle tac0turtle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the thinking with compressing everything instead of only where it makes a difference. compressing smaller chunks of data, does the decrease of less than 2x justify the time trade off of compressing instead of submitting uncompressed. What is the expected cost savings for such a feature.

I dont think we understand the tradeoffs here to justify the changes quite yet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Compression to Blobs Before Submission

3 participants