From d5b41c8b355b266f85d0d3fd073ab2e04b8f22a5 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 24 Feb 2026 10:04:42 +0100 Subject: [PATCH 1/9] feat: Implement ZSTD compression for DA blobs, integrate it into the DA client, and update related dependencies. --- CLAUDE.md | 1 + apps/evm/go.mod | 2 +- apps/evm/go.sum | 4 +- apps/grpc/go.mod | 2 +- apps/grpc/go.sum | 4 +- apps/testapp/go.mod | 2 +- apps/testapp/go.sum | 4 +- block/internal/da/client.go | 41 ++++++++-- execution/evm/go.sum | 4 +- execution/evm/test/go.mod | 2 +- execution/evm/test/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- pkg/da/compression.go | 75 +++++++++++++++++++ pkg/da/compression_test.go | 145 ++++++++++++++++++++++++++++++++++++ test/e2e/go.mod | 2 +- test/e2e/go.sum | 4 +- 17 files changed, 277 insertions(+), 25 deletions(-) create mode 100644 pkg/da/compression.go create mode 100644 pkg/da/compression_test.go diff --git a/CLAUDE.md b/CLAUDE.md index ead38632a..4edfbc2ae 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -109,6 +109,7 @@ go test -race ./package/... - Wrap errors with context using `fmt.Errorf` - Return errors early - Use custom error types for domain-specific errors +- Never start an error message with "failed to" ### Logging diff --git a/apps/evm/go.mod b/apps/evm/go.mod index d7fd2c751..995e221cb 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -93,7 +93,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/evm/go.sum b/apps/evm/go.sum index a452ee14a..c50eb268c 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -714,8 +714,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/apps/grpc/go.mod b/apps/grpc/go.mod index 19277f2e5..4da4eeb62 100644 --- a/apps/grpc/go.mod +++ b/apps/grpc/go.mod @@ -73,7 +73,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index 3baa6646d..1344f1f24 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index f44f42f89..6fbd00b10 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -72,7 +72,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index 3baa6646d..1344f1f24 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 3185410cd..9840fdc0f 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -9,6 +9,7 @@ import ( "time" "github.com/celestiaorg/go-square/v3/share" + "github.com/evstack/ev-node/pkg/da" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/common" @@ -89,7 +90,23 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace blobs := make([]*blobrpc.Blob, len(data)) for i, raw := range data { - if uint64(len(raw)) > common.DefaultMaxBlobSize { + // Compress blob data before submission to reduce bandwidth and storage costs + compressed, compErr := da.Compress(raw) + if compErr != nil { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: fmt.Sprintf("compress blob %d: %v", i, compErr), + }, + } + } + c.logger.Debug(). + Int("original_size", len(raw)). + Int("compressed_size", len(compressed)). + Float64("ratio", float64(len(compressed))/float64(len(raw))). + Msg("compressed blob for DA submission") + + if uint64(len(compressed)) > common.DefaultMaxBlobSize { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusTooBig, @@ -97,7 +114,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace }, } } - blobs[i], err = blobrpc.NewBlobV0(ns, raw) + blobs[i], err = blobrpc.NewBlobV0(ns, compressed) if err != nil { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ @@ -278,12 +295,22 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } - // Extract IDs and data from the blobs. + // Extract IDs and data from the blobs, decompressing if needed. ids := make([]datypes.ID, len(blobs)) data := make([]datypes.Blob, len(blobs)) for i, b := range blobs { ids[i] = blobrpc.MakeID(height, b.Commitment) - data[i] = b.Data() + decompressed, decompErr := da.Decompress(b.Data()) + if decompErr != nil { + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: fmt.Sprintf("decompress blob %d at height %d: %v", i, height, decompErr), + Height: height, + }, + } + } + data[i] = decompressed } c.logger.Debug().Int("num_blobs", len(blobs)).Msg("retrieved blobs") @@ -361,7 +388,11 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ if b == nil { continue } - res = append(res, b.Data()) + decompressed, decompErr := da.Decompress(b.Data()) + if decompErr != nil { + return nil, fmt.Errorf("decompress blob: %w", decompErr) + } + res = append(res, decompressed) } return res, nil diff --git a/execution/evm/go.sum b/execution/evm/go.sum index 46d23df03..187e72a47 100644 --- a/execution/evm/go.sum +++ b/execution/evm/go.sum @@ -186,8 +186,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/koron/go-ssdp v0.0.6 h1:Jb0h04599eq/CY7rB5YEqPS83HmRfHP2azkxMN2rFtU= diff --git a/execution/evm/test/go.mod b/execution/evm/test/go.mod index 72d713909..673dd322e 100644 --- a/execution/evm/test/go.mod +++ b/execution/evm/test/go.mod @@ -109,7 +109,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.6.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/execution/evm/test/go.sum b/execution/evm/test/go.sum index 721ac1280..ceda6185b 100644 --- a/execution/evm/test/go.sum +++ b/execution/evm/test/go.sum @@ -404,8 +404,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/go.mod b/go.mod index 41e0a93a0..653abdcb7 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148 github.com/ipfs/go-datastore v0.9.1 github.com/ipfs/go-ds-badger4 v0.1.8 + github.com/klauspost/compress v1.18.4 github.com/libp2p/go-libp2p v0.47.0 github.com/libp2p/go-libp2p-kad-dht v0.38.0 github.com/libp2p/go-libp2p-pubsub v0.15.0 @@ -89,7 +90,6 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/go.sum b/go.sum index 3baa6646d..1344f1f24 100644 --- a/go.sum +++ b/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/pkg/da/compression.go b/pkg/da/compression.go new file mode 100644 index 000000000..4ae1a7f0b --- /dev/null +++ b/pkg/da/compression.go @@ -0,0 +1,75 @@ +package da + +import ( + "fmt" + + "github.com/klauspost/compress/zstd" +) + +// magic is the 4-byte prefix prepended to all compressed blobs. +// ASCII "ZSTD" = 0x5A 0x53 0x54 0x44. +var magic = []byte{0x5A, 0x53, 0x54, 0x44} + +// encoder and decoder are package-level singletons. They are safe for +// concurrent use per the klauspost/compress documentation. +var ( + encoder *zstd.Encoder + decoder *zstd.Decoder +) + +func init() { + var err error + encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if err != nil { + panic(fmt.Sprintf("compression: create zstd encoder: %v", err)) + } + decoder, err = zstd.NewReader(nil) + if err != nil { + panic(fmt.Sprintf("compression: create zstd decoder: %v", err)) + } +} + +// Compress compresses data using zstd and prepends the magic prefix. +// Returns the original data unchanged if it is empty. +func Compress(data []byte) ([]byte, error) { + if len(data) == 0 { + return data, nil + } + + compressed := encoder.EncodeAll(data, nil) + + // Prepend magic prefix + result := make([]byte, len(magic)+len(compressed)) + copy(result, magic) + copy(result[len(magic):], compressed) + + return result, nil +} + +// Decompress decompresses data that was compressed with Compress. +// If the data does not have the magic prefix, it is returned as-is +// (backward-compatible passthrough for uncompressed blobs). +func Decompress(data []byte) ([]byte, error) { + if !IsCompressed(data) { + return data, nil + } + + // Strip magic prefix and decompress + decompressed, err := decoder.DecodeAll(data[len(magic):], nil) + if err != nil { + return nil, fmt.Errorf("compression: zstd decompress failed: %w", err) + } + + return decompressed, nil +} + +// IsCompressed reports whether data starts with the compression magic prefix. +func IsCompressed(data []byte) bool { + if len(data) < len(magic) { + return false + } + return data[0] == magic[0] && + data[1] == magic[1] && + data[2] == magic[2] && + data[3] == magic[3] +} diff --git a/pkg/da/compression_test.go b/pkg/da/compression_test.go new file mode 100644 index 000000000..e5c2b077e --- /dev/null +++ b/pkg/da/compression_test.go @@ -0,0 +1,145 @@ +package da + +import ( + "bytes" + "crypto/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompressDecompress_RoundTrip(t *testing.T) { + tests := []struct { + name string + data []byte + }{ + { + name: "small payload", + data: []byte("hello world, this is a test payload for compression"), + }, + { + name: "protobuf-like repeated data", + data: bytes.Repeat([]byte{0x0a, 0x10, 0x08, 0x01, 0x12, 0x0c}, 1000), + }, + { + name: "single byte", + data: []byte{0xFF}, + }, + { + name: "1MB payload", + data: bytes.Repeat([]byte("rollkit blob compression test data "), 30000), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + compressed, err := Compress(tt.data) + require.NoError(t, err) + + assert.True(t, IsCompressed(compressed), "compressed data should have magic prefix") + + decompressed, err := Decompress(compressed) + require.NoError(t, err) + + assert.Equal(t, tt.data, decompressed, "round-trip should preserve data") + }) + } +} + +func TestCompress_Empty(t *testing.T) { + compressed, err := Compress(nil) + require.NoError(t, err) + assert.Nil(t, compressed) + + compressed, err = Compress([]byte{}) + require.NoError(t, err) + assert.Empty(t, compressed) +} + +func TestDecompress_UncompressedPassthrough(t *testing.T) { + // Data without magic prefix should pass through unchanged + raw := []byte("this is uncompressed protobuf data") + result, err := Decompress(raw) + require.NoError(t, err) + assert.Equal(t, raw, result) +} + +func TestDecompress_Empty(t *testing.T) { + result, err := Decompress(nil) + require.NoError(t, err) + assert.Nil(t, result) + + result, err = Decompress([]byte{}) + require.NoError(t, err) + assert.Empty(t, result) +} + +func TestDecompress_ShortData(t *testing.T) { + // Data shorter than magic prefix should pass through + result, err := Decompress([]byte{0x5A, 0x53}) + require.NoError(t, err) + assert.Equal(t, []byte{0x5A, 0x53}, result) +} + +func TestDecompress_CorruptCompressedData(t *testing.T) { + // Magic prefix followed by invalid zstd data + corrupt := append([]byte{0x5A, 0x53, 0x54, 0x44}, []byte("not valid zstd")...) + _, err := Decompress(corrupt) + assert.Error(t, err, "should fail on corrupt compressed data") +} + +func TestIsCompressed(t *testing.T) { + tests := []struct { + name string + data []byte + expected bool + }{ + {name: "nil", data: nil, expected: false}, + {name: "empty", data: []byte{}, expected: false}, + {name: "short", data: []byte{0x5A}, expected: false}, + {name: "magic prefix only", data: []byte{0x5A, 0x53, 0x54, 0x44}, expected: true}, + {name: "magic with data", data: []byte{0x5A, 0x53, 0x54, 0x44, 0x01, 0x02}, expected: true}, + {name: "wrong prefix", data: []byte{0x00, 0x53, 0x54, 0x44}, expected: false}, + {name: "protobuf data", data: []byte{0x0a, 0x10, 0x08, 0x01}, expected: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, IsCompressed(tt.data)) + }) + } +} + +func TestCompress_AchievesCompression(t *testing.T) { + // Highly compressible data should achieve meaningful compression + data := bytes.Repeat([]byte("rollkit block data with repeated content "), 10000) + compressed, err := Compress(data) + require.NoError(t, err) + + ratio := float64(len(compressed)) / float64(len(data)) + t.Logf("compression ratio: %.4f (original: %d, compressed: %d)", ratio, len(data), len(compressed)) + assert.Less(t, ratio, 0.1, "highly repetitive data should compress to <10%% of original size") +} + +func TestCompress_RandomDataStillWorks(t *testing.T) { + // Random data won't compress well but should still round-trip correctly + data := make([]byte, 4096) + _, err := rand.Read(data) + require.NoError(t, err) + + compressed, err := Compress(data) + require.NoError(t, err) + + decompressed, err := Decompress(compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) +} + +func TestDecompress_DataStartingWithMagicButUncompressed(t *testing.T) { + // Edge case: data that happens to start with the magic bytes but isn't actually compressed. + // This should fail decompression (invalid zstd frame). + fakeCompressed := append([]byte{0x5A, 0x53, 0x54, 0x44}, bytes.Repeat([]byte{0x00}, 100)...) + _, err := Decompress(fakeCompressed) + assert.Error(t, err, "data starting with magic but containing invalid zstd should error") +} diff --git a/test/e2e/go.mod b/test/e2e/go.mod index 8d04cfcc0..440fa3556 100644 --- a/test/e2e/go.mod +++ b/test/e2e/go.mod @@ -171,7 +171,7 @@ require ( github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/test/e2e/go.sum b/test/e2e/go.sum index 464179e01..99a65e54d 100644 --- a/test/e2e/go.sum +++ b/test/e2e/go.sum @@ -632,8 +632,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= From cc68320d161e351fbead025e1b47537717b2bf2b Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 24 Feb 2026 10:32:54 +0100 Subject: [PATCH 2/9] Linter --- block/internal/da/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 9840fdc0f..d8ac95888 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -9,10 +9,10 @@ import ( "time" "github.com/celestiaorg/go-square/v3/share" - "github.com/evstack/ev-node/pkg/da" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/da" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) From 58e9b18e18265556f824932b7bd8d38b7708fa3e Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 24 Feb 2026 12:04:01 +0100 Subject: [PATCH 3/9] feat: Implement adaptive ZSTD compression levels for DA blob submission based on batch size. (cherry picked from commit 3ff6211ec69d482437de79a617b5b39a88c91835) --- block/internal/da/client.go | 15 ++++++++-- pkg/da/compression.go | 52 ++++++++++++++++++++++++-------- pkg/da/compression_test.go | 59 +++++++++++++++++++++++++++++++------ 3 files changed, 102 insertions(+), 24 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d8ac95888..d541987c5 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -88,10 +88,20 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace } } + // Select compression level based on backlog pressure: + // large batch = high backlog = prioritize speed; + // small batch = low backlog = prioritize ratio. + compLevel := da.LevelBest + switch { + case len(data) > 10: + compLevel = da.LevelFastest + case len(data) > 3: + compLevel = da.LevelDefault + } + blobs := make([]*blobrpc.Blob, len(data)) for i, raw := range data { - // Compress blob data before submission to reduce bandwidth and storage costs - compressed, compErr := da.Compress(raw) + compressed, compErr := da.Compress(raw, compLevel) if compErr != nil { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ @@ -104,6 +114,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace Int("original_size", len(raw)). Int("compressed_size", len(compressed)). Float64("ratio", float64(len(compressed))/float64(len(raw))). + Int("level", int(compLevel)). Msg("compressed blob for DA submission") if uint64(len(compressed)) > common.DefaultMaxBlobSize { diff --git a/pkg/da/compression.go b/pkg/da/compression.go index 4ae1a7f0b..f299f99da 100644 --- a/pkg/da/compression.go +++ b/pkg/da/compression.go @@ -10,33 +10,59 @@ import ( // ASCII "ZSTD" = 0x5A 0x53 0x54 0x44. var magic = []byte{0x5A, 0x53, 0x54, 0x44} -// encoder and decoder are package-level singletons. They are safe for -// concurrent use per the klauspost/compress documentation. -var ( - encoder *zstd.Encoder - decoder *zstd.Decoder +// CompressionLevel controls the speed/ratio trade-off for blob compression. +type CompressionLevel int + +const ( + // LevelFastest prioritizes speed over compression ratio. + // Use when backlog is high and throughput matters most. + LevelFastest CompressionLevel = iota + // LevelDefault balances speed and compression ratio. + LevelDefault + // LevelBest prioritizes compression ratio over speed. + // Use when backlog is low to save bandwidth and storage. + LevelBest ) +// encoders holds one zstd encoder per compression level. Each is safe for +// concurrent use per the klauspost/compress documentation. +var encoders [3]*zstd.Encoder + +// decoder is a package-level singleton, safe for concurrent use. +var decoder *zstd.Decoder + func init() { - var err error - encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) - if err != nil { - panic(fmt.Sprintf("compression: create zstd encoder: %v", err)) + levels := [3]zstd.EncoderLevel{ + zstd.SpeedFastest, + zstd.SpeedDefault, + zstd.SpeedBestCompression, + } + for i, lvl := range levels { + enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(lvl)) + if err != nil { + panic(fmt.Sprintf("compression: create zstd encoder (level %d): %v", i, err)) + } + encoders[i] = enc } + var err error decoder, err = zstd.NewReader(nil) if err != nil { panic(fmt.Sprintf("compression: create zstd decoder: %v", err)) } } -// Compress compresses data using zstd and prepends the magic prefix. +// Compress compresses data using zstd at the given level and prepends the magic prefix. // Returns the original data unchanged if it is empty. -func Compress(data []byte) ([]byte, error) { +func Compress(data []byte, level CompressionLevel) ([]byte, error) { if len(data) == 0 { return data, nil } - compressed := encoder.EncodeAll(data, nil) + if level < LevelFastest || level > LevelBest { + level = LevelDefault + } + + compressed := encoders[level].EncodeAll(data, nil) // Prepend magic prefix result := make([]byte, len(magic)+len(compressed)) @@ -57,7 +83,7 @@ func Decompress(data []byte) ([]byte, error) { // Strip magic prefix and decompress decompressed, err := decoder.DecodeAll(data[len(magic):], nil) if err != nil { - return nil, fmt.Errorf("compression: zstd decompress failed: %w", err) + return nil, fmt.Errorf("compression: zstd decompress: %w", err) } return decompressed, nil diff --git a/pkg/da/compression_test.go b/pkg/da/compression_test.go index e5c2b077e..d7c21fe41 100644 --- a/pkg/da/compression_test.go +++ b/pkg/da/compression_test.go @@ -34,7 +34,7 @@ func TestCompressDecompress_RoundTrip(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - compressed, err := Compress(tt.data) + compressed, err := Compress(tt.data, LevelDefault) require.NoError(t, err) assert.True(t, IsCompressed(compressed), "compressed data should have magic prefix") @@ -47,12 +47,46 @@ func TestCompressDecompress_RoundTrip(t *testing.T) { } } +func TestCompress_AllLevelsRoundTrip(t *testing.T) { + data := bytes.Repeat([]byte("adaptive compression level test "), 5000) + levels := []struct { + name string + level CompressionLevel + }{ + {"fastest", LevelFastest}, + {"default", LevelDefault}, + {"best", LevelBest}, + } + + var sizes []int + for _, lvl := range levels { + t.Run(lvl.name, func(t *testing.T) { + compressed, err := Compress(data, lvl.level) + require.NoError(t, err) + assert.True(t, IsCompressed(compressed)) + + sizes = append(sizes, len(compressed)) + + decompressed, err := Decompress(compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) + + t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data))) + }) + } + + // Best should produce equal or smaller output than Fastest + if len(sizes) == 3 { + assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest") + } +} + func TestCompress_Empty(t *testing.T) { - compressed, err := Compress(nil) + compressed, err := Compress(nil, LevelDefault) require.NoError(t, err) assert.Nil(t, compressed) - compressed, err = Compress([]byte{}) + compressed, err = Compress([]byte{}, LevelDefault) require.NoError(t, err) assert.Empty(t, compressed) } @@ -112,9 +146,8 @@ func TestIsCompressed(t *testing.T) { } func TestCompress_AchievesCompression(t *testing.T) { - // Highly compressible data should achieve meaningful compression data := bytes.Repeat([]byte("rollkit block data with repeated content "), 10000) - compressed, err := Compress(data) + compressed, err := Compress(data, LevelDefault) require.NoError(t, err) ratio := float64(len(compressed)) / float64(len(data)) @@ -123,12 +156,11 @@ func TestCompress_AchievesCompression(t *testing.T) { } func TestCompress_RandomDataStillWorks(t *testing.T) { - // Random data won't compress well but should still round-trip correctly data := make([]byte, 4096) _, err := rand.Read(data) require.NoError(t, err) - compressed, err := Compress(data) + compressed, err := Compress(data, LevelFastest) require.NoError(t, err) decompressed, err := Decompress(compressed) @@ -137,9 +169,18 @@ func TestCompress_RandomDataStillWorks(t *testing.T) { } func TestDecompress_DataStartingWithMagicButUncompressed(t *testing.T) { - // Edge case: data that happens to start with the magic bytes but isn't actually compressed. - // This should fail decompression (invalid zstd frame). fakeCompressed := append([]byte{0x5A, 0x53, 0x54, 0x44}, bytes.Repeat([]byte{0x00}, 100)...) _, err := Decompress(fakeCompressed) assert.Error(t, err, "data starting with magic but containing invalid zstd should error") } + +func TestCompress_InvalidLevel(t *testing.T) { + // Out-of-range level should fall back to LevelDefault + data := []byte("test data for invalid level") + compressed, err := Compress(data, CompressionLevel(99)) + require.NoError(t, err) + + decompressed, err := Decompress(compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) +} From c76c2459189e251b45b8f7bdbd5adabcbfda20bf Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 10:17:53 +0100 Subject: [PATCH 4/9] Review feedback --- block/internal/da/client.go | 13 ++++---- pkg/da/compression.go | 62 ++++++++++++++++++++++++++++++++----- pkg/da/compression_test.go | 58 ++++++++++++++++++++++++++++------ 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d541987c5..82304511e 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -113,7 +113,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace c.logger.Debug(). Int("original_size", len(raw)). Int("compressed_size", len(compressed)). - Float64("ratio", float64(len(compressed))/float64(len(raw))). + Float64("ratio", float64(len(compressed))/float64(max(len(raw), 1))). Int("level", int(compLevel)). Msg("compressed blob for DA submission") @@ -311,13 +311,14 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) data := make([]datypes.Blob, len(blobs)) for i, b := range blobs { ids[i] = blobrpc.MakeID(height, b.Commitment) - decompressed, decompErr := da.Decompress(b.Data()) + decompressed, decompErr := da.Decompress(ctx, b.Data()) if decompErr != nil { return datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{ - Code: datypes.StatusError, - Message: fmt.Sprintf("decompress blob %d at height %d: %v", i, height, decompErr), - Height: height, + Code: datypes.StatusError, + Message: fmt.Sprintf("decompress blob %d at height %d: %v", i, height, decompErr), + Height: height, + Timestamp: blockTime, }, } } @@ -399,7 +400,7 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ if b == nil { continue } - decompressed, decompErr := da.Decompress(b.Data()) + decompressed, decompErr := da.Decompress(ctx, b.Data()) if decompErr != nil { return nil, fmt.Errorf("decompress blob: %w", decompErr) } diff --git a/pkg/da/compression.go b/pkg/da/compression.go index f299f99da..434eeae0c 100644 --- a/pkg/da/compression.go +++ b/pkg/da/compression.go @@ -1,7 +1,10 @@ package da import ( + "context" + "errors" "fmt" + "time" "github.com/klauspost/compress/zstd" ) @@ -10,6 +13,20 @@ import ( // ASCII "ZSTD" = 0x5A 0x53 0x54 0x44. var magic = []byte{0x5A, 0x53, 0x54, 0x44} +// maxDecompressedSize is the maximum allowed decompressed output size. +// Matches the WithDecoderMaxMemory cap and provides early rejection +// by inspecting the zstd frame header before allocating anything. +const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB + +// decompressTimeout is the hard wall-clock cap on a single decompression. +// This guards against CPU-based decompression bombs (crafted inputs that +// are slow to decode) independently of the caller's context deadline. +const decompressTimeout = 500 * time.Millisecond + +// ErrDecompressedSizeExceeded is returned when the zstd frame header +// declares a decompressed size that exceeds the allowed limit. +var ErrDecompressedSizeExceeded = errors.New("compression: declared decompressed size exceeds limit") + // CompressionLevel controls the speed/ratio trade-off for blob compression. type CompressionLevel int @@ -44,8 +61,9 @@ func init() { } encoders[i] = enc } + const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap var err error - decoder, err = zstd.NewReader(nil) + decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecoderMemory)) if err != nil { panic(fmt.Sprintf("compression: create zstd decoder: %v", err)) } @@ -72,21 +90,51 @@ func Compress(data []byte, level CompressionLevel) ([]byte, error) { return result, nil } +// decodeResult carries the output of a DecodeAll goroutine. +type decodeResult struct { + data []byte + err error +} + // Decompress decompresses data that was compressed with Compress. // If the data does not have the magic prefix, it is returned as-is // (backward-compatible passthrough for uncompressed blobs). -func Decompress(data []byte) ([]byte, error) { +func Decompress(ctx context.Context, data []byte) ([]byte, error) { if !IsCompressed(data) { return data, nil } - // Strip magic prefix and decompress - decompressed, err := decoder.DecodeAll(data[len(magic):], nil) - if err != nil { - return nil, fmt.Errorf("compression: zstd decompress: %w", err) + payload := data[len(magic):] + + // Layer 1: Parse frame header to check declared decompressed size + // before allocating anything. This is a zero-cost upfront rejection. + var hdr zstd.Header + if err := hdr.Decode(payload); err == nil && hdr.HasFCS { + if hdr.FrameContentSize > maxDecompressedSize { + return nil, fmt.Errorf("%w: %d bytes declared, %d allowed", + ErrDecompressedSizeExceeded, hdr.FrameContentSize, maxDecompressedSize) + } } - return decompressed, nil + // Layer 3: Apply the shorter of caller deadline and our hard cap. + ctx, cancel := context.WithTimeout(ctx, decompressTimeout) + defer cancel() + + ch := make(chan decodeResult, 1) + go func() { + out, err := decoder.DecodeAll(payload, nil) + ch <- decodeResult{data: out, err: err} + }() + + select { + case res := <-ch: + if res.err != nil { + return nil, fmt.Errorf("zstd decompress: %w", res.err) + } + return res.data, nil + case <-ctx.Done(): + return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err()) + } } // IsCompressed reports whether data starts with the compression magic prefix. diff --git a/pkg/da/compression_test.go b/pkg/da/compression_test.go index d7c21fe41..0f3b69349 100644 --- a/pkg/da/compression_test.go +++ b/pkg/da/compression_test.go @@ -2,9 +2,11 @@ package da import ( "bytes" + "context" "crypto/rand" "testing" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,7 +41,7 @@ func TestCompressDecompress_RoundTrip(t *testing.T) { assert.True(t, IsCompressed(compressed), "compressed data should have magic prefix") - decompressed, err := Decompress(compressed) + decompressed, err := Decompress(context.Background(), compressed) require.NoError(t, err) assert.Equal(t, tt.data, decompressed, "round-trip should preserve data") @@ -67,7 +69,7 @@ func TestCompress_AllLevelsRoundTrip(t *testing.T) { sizes = append(sizes, len(compressed)) - decompressed, err := Decompress(compressed) + decompressed, err := Decompress(context.Background(), compressed) require.NoError(t, err) assert.Equal(t, data, decompressed) @@ -94,24 +96,24 @@ func TestCompress_Empty(t *testing.T) { func TestDecompress_UncompressedPassthrough(t *testing.T) { // Data without magic prefix should pass through unchanged raw := []byte("this is uncompressed protobuf data") - result, err := Decompress(raw) + result, err := Decompress(context.Background(), raw) require.NoError(t, err) assert.Equal(t, raw, result) } func TestDecompress_Empty(t *testing.T) { - result, err := Decompress(nil) + result, err := Decompress(context.Background(), nil) require.NoError(t, err) assert.Nil(t, result) - result, err = Decompress([]byte{}) + result, err = Decompress(context.Background(), []byte{}) require.NoError(t, err) assert.Empty(t, result) } func TestDecompress_ShortData(t *testing.T) { // Data shorter than magic prefix should pass through - result, err := Decompress([]byte{0x5A, 0x53}) + result, err := Decompress(context.Background(), []byte{0x5A, 0x53}) require.NoError(t, err) assert.Equal(t, []byte{0x5A, 0x53}, result) } @@ -119,7 +121,7 @@ func TestDecompress_ShortData(t *testing.T) { func TestDecompress_CorruptCompressedData(t *testing.T) { // Magic prefix followed by invalid zstd data corrupt := append([]byte{0x5A, 0x53, 0x54, 0x44}, []byte("not valid zstd")...) - _, err := Decompress(corrupt) + _, err := Decompress(context.Background(), corrupt) assert.Error(t, err, "should fail on corrupt compressed data") } @@ -163,14 +165,14 @@ func TestCompress_RandomDataStillWorks(t *testing.T) { compressed, err := Compress(data, LevelFastest) require.NoError(t, err) - decompressed, err := Decompress(compressed) + decompressed, err := Decompress(context.Background(), compressed) require.NoError(t, err) assert.Equal(t, data, decompressed) } func TestDecompress_DataStartingWithMagicButUncompressed(t *testing.T) { fakeCompressed := append([]byte{0x5A, 0x53, 0x54, 0x44}, bytes.Repeat([]byte{0x00}, 100)...) - _, err := Decompress(fakeCompressed) + _, err := Decompress(context.Background(), fakeCompressed) assert.Error(t, err, "data starting with magic but containing invalid zstd should error") } @@ -180,7 +182,43 @@ func TestCompress_InvalidLevel(t *testing.T) { compressed, err := Compress(data, CompressionLevel(99)) require.NoError(t, err) - decompressed, err := Decompress(compressed) + decompressed, err := Decompress(context.Background(), compressed) require.NoError(t, err) assert.Equal(t, data, decompressed) } + +func TestDecompress_ContextCanceled(t *testing.T) { + data := []byte("test data for context cancellation") + compressed, err := Compress(data, LevelDefault) + require.NoError(t, err) + + // Pre-canceled context should cause Decompress to return an error. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = Decompress(ctx, compressed) + assert.Error(t, err, "decompress with canceled context should fail") + assert.ErrorIs(t, err, context.Canceled) +} + +func TestDecompress_OversizedFrameRejected(t *testing.T) { + // Build a fake zstd frame header that declares 100 MiB decompressed size. + // This should be rejected by the frame header pre-check before any + // decompression occurs. + hdr := zstd.Header{ + SingleSegment: true, + HasFCS: true, + FrameContentSize: 100 * 1024 * 1024, // 100 MiB — way over the 7 MiB limit + } + frame, err := hdr.AppendTo(nil) + require.NoError(t, err) + + // Prepend our custom magic prefix + blob := make([]byte, len(magic)+len(frame)) + copy(blob, magic) + copy(blob[len(magic):], frame) + + _, err = Decompress(context.Background(), blob) + assert.Error(t, err, "should reject blob declaring oversized decompressed output") + assert.ErrorIs(t, err, ErrDecompressedSizeExceeded) +} From dcc03d082c5e18c8f8030888137d025b98a60eff Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 10:45:10 +0100 Subject: [PATCH 5/9] Skip malicious blobs --- block/internal/da/client.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 82304511e..866484164 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -307,22 +307,22 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } // Extract IDs and data from the blobs, decompressing if needed. - ids := make([]datypes.ID, len(blobs)) - data := make([]datypes.Blob, len(blobs)) + // Malicious or corrupt blobs that fail decompression are logged and skipped. + ids := make([]datypes.ID, 0, len(blobs)) + data := make([]datypes.Blob, 0, len(blobs)) for i, b := range blobs { - ids[i] = blobrpc.MakeID(height, b.Commitment) decompressed, decompErr := da.Decompress(ctx, b.Data()) if decompErr != nil { - return datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusError, - Message: fmt.Sprintf("decompress blob %d at height %d: %v", i, height, decompErr), - Height: height, - Timestamp: blockTime, - }, - } + c.logger.Warn(). + Err(decompErr). + Uint64("height", height). + Int("blob_index", i). + Int("blob_size", len(b.Data())). + Msg("skipping malicious or corrupt DA blob") + continue } - data[i] = decompressed + ids = append(ids, blobrpc.MakeID(height, b.Commitment)) + data = append(data, decompressed) } c.logger.Debug().Int("num_blobs", len(blobs)).Msg("retrieved blobs") @@ -402,7 +402,12 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ } decompressed, decompErr := da.Decompress(ctx, b.Data()) if decompErr != nil { - return nil, fmt.Errorf("decompress blob: %w", decompErr) + c.logger.Warn(). + Err(decompErr). + Uint64("height", height). + Int("blob_size", len(b.Data())). + Msg("skipping malicious or corrupt DA blob") + continue } res = append(res, decompressed) } From c52dde53d1fe54677c24035585a7b3b35c3711c4 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 10:50:23 +0100 Subject: [PATCH 6/9] Linter --- node/execution_test.go | 18 ++++-------------- pkg/da/types/namespace_test.go | 12 ++++-------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/node/execution_test.go b/node/execution_test.go index 8ea13c3fe..75d4a1a51 100644 --- a/node/execution_test.go +++ b/node/execution_test.go @@ -90,21 +90,11 @@ func waitForNodeInitialization(node *FullNode) error { func getExecutorFromNode(t *testing.T, node *FullNode) coreexecutor.Executor { le := node.leaderElection sle, ok := le.(*singleRoleElector) - if !ok { - t.Fatal("Leader election is not singleRoleElector") - } + require.True(t, ok, "Leader election is not singleRoleElector") state := sle.state() - if state == nil { - t.Fatal("failoverState is nil") - } - bc := state.bc - if bc == nil { - t.Fatal("blockComponents is nil") - } - if bc.Executor != nil { - return bc.Executor.GetCoreExecutor() - } - t.Fatal("Executor not found in block components") + require.NotNil(t, state) + require.NotNil(t, state.bc) + require.NotNil(t, state.bc.Executor) return nil } diff --git a/pkg/da/types/namespace_test.go b/pkg/da/types/namespace_test.go index 1c068c8f9..1ea0ff32d 100644 --- a/pkg/da/types/namespace_test.go +++ b/pkg/da/types/namespace_test.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/hex" "testing" + + "github.com/stretchr/testify/require" ) func TestNamespaceV0Creation(t *testing.T) { @@ -22,7 +24,6 @@ func TestNamespaceV0Creation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ns, err := NewNamespaceV0(tt.data) - if tt.expectError { if err == nil { t.Errorf("%s: expected error but got nil", tt.description) @@ -33,13 +34,8 @@ func TestNamespaceV0Creation(t *testing.T) { return } - if err != nil { - t.Fatalf("%s: unexpected error: %v", tt.description, err) - } - if ns == nil { - t.Fatal("expected non-nil namespace but got nil") - } - + require.NoError(t, err) + require.NotNil(t, ns) if ns.Version != NamespaceVersionZero { t.Errorf("Version should be 0, got %d", ns.Version) } From 7ab8ce69111c69af32162469b191d5e52e39c16b Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 12:47:56 +0100 Subject: [PATCH 7/9] Fix test --- test/e2e/failover_e2e_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 905b280cf..44d5d8cc9 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -32,6 +32,7 @@ import ( "github.com/evstack/ev-node/execution/evm" evmtest "github.com/evstack/ev-node/execution/evm/test" + da "github.com/evstack/ev-node/pkg/da" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" coreda "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/rpc/client" @@ -612,6 +613,14 @@ func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash, string) t.Log("empty blob, skipping") return 0, nil, "" } + + // Decompress if the blob was compressed before DA submission. + if da.IsCompressed(blob) { + var err error + blob, err = da.Decompress(t.Context(), blob) + require.NoError(t, err, "failed to decompress blob") + } + var headerPb pb.SignedHeader if err := proto.Unmarshal(blob, &headerPb); err == nil { var signedHeader types.SignedHeader From daff28b80db691e765172da3419be54f75f7359f Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 13:42:34 +0100 Subject: [PATCH 8/9] Linter --- pkg/da/types/namespace_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/da/types/namespace_test.go b/pkg/da/types/namespace_test.go index 1ea0ff32d..6abbaeb7c 100644 --- a/pkg/da/types/namespace_test.go +++ b/pkg/da/types/namespace_test.go @@ -110,12 +110,8 @@ func TestNamespaceFromBytes(t *testing.T) { return } - if err != nil { - t.Fatalf("%s: unexpected error: %v", tt.description, err) - } - if ns == nil { - t.Fatal("expected non-nil namespace but got nil") - } + require.NoError(t, err, tt.description) + require.NotNil(t, ns, "expected non-nil namespace but got nil") if !bytes.Equal(tt.input, ns.Bytes()) { t.Errorf("Should round-trip correctly, expected %v, got %v", tt.input, ns.Bytes()) } From db8945871a9c592866980b83c903507f915fbd0f Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 25 Feb 2026 13:50:12 +0100 Subject: [PATCH 9/9] Fix test --- node/execution_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/execution_test.go b/node/execution_test.go index 75d4a1a51..637b10a68 100644 --- a/node/execution_test.go +++ b/node/execution_test.go @@ -95,7 +95,7 @@ func getExecutorFromNode(t *testing.T, node *FullNode) coreexecutor.Executor { require.NotNil(t, state) require.NotNil(t, state.bc) require.NotNil(t, state.bc.Executor) - return nil + return state.bc.Executor.GetCoreExecutor() } func getTransactions(t *testing.T, executor coreexecutor.Executor, ctx context.Context) [][]byte {