diff --git a/CLAUDE.md b/CLAUDE.md index ead38632a..4edfbc2ae 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -109,6 +109,7 @@ go test -race ./package/... - Wrap errors with context using `fmt.Errorf` - Return errors early - Use custom error types for domain-specific errors +- Never start an error message with "failed to" ### Logging diff --git a/apps/evm/go.mod b/apps/evm/go.mod index 0d314a78d..d452f52ac 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -91,7 +91,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 4561fbfb6..77f487c7e 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -714,8 +714,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod 
h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/apps/grpc/go.mod b/apps/grpc/go.mod index 19277f2e5..4da4eeb62 100644 --- a/apps/grpc/go.mod +++ b/apps/grpc/go.mod @@ -73,7 +73,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index 3baa6646d..1344f1f24 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index f44f42f89..6fbd00b10 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -72,7 +72,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp 
v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index 3baa6646d..1344f1f24 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 3185410cd..866484164 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -12,6 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/da" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -87,9 +88,36 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace } } + 
// Select compression level based on backlog pressure: + // large batch = high backlog = prioritize speed; + // small batch = low backlog = prioritize ratio. + compLevel := da.LevelBest + switch { + case len(data) > 10: + compLevel = da.LevelFastest + case len(data) > 3: + compLevel = da.LevelDefault + } + blobs := make([]*blobrpc.Blob, len(data)) for i, raw := range data { - if uint64(len(raw)) > common.DefaultMaxBlobSize { + compressed, compErr := da.Compress(raw, compLevel) + if compErr != nil { + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: fmt.Sprintf("compress blob %d: %v", i, compErr), + }, + } + } + c.logger.Debug(). + Int("original_size", len(raw)). + Int("compressed_size", len(compressed)). + Float64("ratio", float64(len(compressed))/float64(max(len(raw), 1))). + Int("level", int(compLevel)). + Msg("compressed blob for DA submission") + + if uint64(len(compressed)) > common.DefaultMaxBlobSize { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusTooBig, @@ -97,7 +125,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace }, } } - blobs[i], err = blobrpc.NewBlobV0(ns, raw) + blobs[i], err = blobrpc.NewBlobV0(ns, compressed) if err != nil { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ @@ -278,12 +306,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } - // Extract IDs and data from the blobs. - ids := make([]datypes.ID, len(blobs)) - data := make([]datypes.Blob, len(blobs)) + // Extract IDs and data from the blobs, decompressing if needed. + // Malicious or corrupt blobs that fail decompression are logged and skipped. 
+ ids := make([]datypes.ID, 0, len(blobs)) + data := make([]datypes.Blob, 0, len(blobs)) for i, b := range blobs { - ids[i] = blobrpc.MakeID(height, b.Commitment) - data[i] = b.Data() + decompressed, decompErr := da.Decompress(ctx, b.Data()) + if decompErr != nil { + c.logger.Warn(). + Err(decompErr). + Uint64("height", height). + Int("blob_index", i). + Int("blob_size", len(b.Data())). + Msg("skipping malicious or corrupt DA blob") + continue + } + ids = append(ids, blobrpc.MakeID(height, b.Commitment)) + data = append(data, decompressed) } c.logger.Debug().Int("num_blobs", len(blobs)).Msg("retrieved blobs") @@ -361,7 +400,16 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([ if b == nil { continue } - res = append(res, b.Data()) + decompressed, decompErr := da.Decompress(ctx, b.Data()) + if decompErr != nil { + c.logger.Warn(). + Err(decompErr). + Uint64("height", height). + Int("blob_size", len(b.Data())). + Msg("skipping malicious or corrupt DA blob") + continue + } + res = append(res, decompressed) } return res, nil diff --git a/execution/evm/go.sum b/execution/evm/go.sum index 72fc3d49f..c422f17e2 100644 --- a/execution/evm/go.sum +++ b/execution/evm/go.sum @@ -186,8 +186,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 
h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/koron/go-ssdp v0.0.6 h1:Jb0h04599eq/CY7rB5YEqPS83HmRfHP2azkxMN2rFtU= diff --git a/execution/evm/test/go.mod b/execution/evm/test/go.mod index 72d713909..673dd322e 100644 --- a/execution/evm/test/go.mod +++ b/execution/evm/test/go.mod @@ -109,7 +109,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.6.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/execution/evm/test/go.sum b/execution/evm/test/go.sum index 721ac1280..ceda6185b 100644 --- a/execution/evm/test/go.sum +++ b/execution/evm/test/go.sum @@ -404,8 +404,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git 
a/go.mod b/go.mod index 41e0a93a0..653abdcb7 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/hashicorp/raft-boltdb v0.0.0-20251103221153-05f9dd7a5148 github.com/ipfs/go-datastore v0.9.1 github.com/ipfs/go-ds-badger4 v0.1.8 + github.com/klauspost/compress v1.18.4 github.com/libp2p/go-libp2p v0.47.0 github.com/libp2p/go-libp2p-kad-dht v0.38.0 github.com/libp2p/go-libp2p-pubsub v0.15.0 @@ -89,7 +90,6 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/go.sum b/go.sum index 3baa6646d..1344f1f24 100644 --- a/go.sum +++ b/go.sum @@ -646,8 +646,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/node/execution_test.go b/node/execution_test.go index 8ea13c3fe..637b10a68 100644 --- 
a/node/execution_test.go +++ b/node/execution_test.go @@ -90,22 +90,12 @@ func waitForNodeInitialization(node *FullNode) error { func getExecutorFromNode(t *testing.T, node *FullNode) coreexecutor.Executor { le := node.leaderElection sle, ok := le.(*singleRoleElector) - if !ok { - t.Fatal("Leader election is not singleRoleElector") - } + require.True(t, ok, "Leader election is not singleRoleElector") state := sle.state() - if state == nil { - t.Fatal("failoverState is nil") - } - bc := state.bc - if bc == nil { - t.Fatal("blockComponents is nil") - } - if bc.Executor != nil { - return bc.Executor.GetCoreExecutor() - } - t.Fatal("Executor not found in block components") - return nil + require.NotNil(t, state) + require.NotNil(t, state.bc) + require.NotNil(t, state.bc.Executor) + return state.bc.Executor.GetCoreExecutor() } func getTransactions(t *testing.T, executor coreexecutor.Executor, ctx context.Context) [][]byte { diff --git a/pkg/da/compression.go b/pkg/da/compression.go new file mode 100644 index 000000000..434eeae0c --- /dev/null +++ b/pkg/da/compression.go @@ -0,0 +1,149 @@ +package da + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/klauspost/compress/zstd" +) + +// magic is the 4-byte prefix prepended to all compressed blobs. +// ASCII "ZSTD" = 0x5A 0x53 0x54 0x44. +var magic = []byte{0x5A, 0x53, 0x54, 0x44} + +// maxDecompressedSize is the maximum allowed decompressed output size. +// Matches the WithDecoderMaxMemory cap and provides early rejection +// by inspecting the zstd frame header before allocating anything. +const maxDecompressedSize = 7 * 1024 * 1024 // 7 MiB + +// decompressTimeout is the hard wall-clock cap on a single decompression. +// This guards against CPU-based decompression bombs (crafted inputs that +// are slow to decode) independently of the caller's context deadline. 
+const decompressTimeout = 500 * time.Millisecond + +// ErrDecompressedSizeExceeded is returned when the zstd frame header +// declares a decompressed size that exceeds the allowed limit. +var ErrDecompressedSizeExceeded = errors.New("compression: declared decompressed size exceeds limit") + +// CompressionLevel controls the speed/ratio trade-off for blob compression. +type CompressionLevel int + +const ( + // LevelFastest prioritizes speed over compression ratio. + // Use when backlog is high and throughput matters most. + LevelFastest CompressionLevel = iota + // LevelDefault balances speed and compression ratio. + LevelDefault + // LevelBest prioritizes compression ratio over speed. + // Use when backlog is low to save bandwidth and storage. + LevelBest +) + +// encoders holds one zstd encoder per compression level. Each is safe for +// concurrent use per the klauspost/compress documentation. +var encoders [3]*zstd.Encoder + +// decoder is a package-level singleton, safe for concurrent use. +var decoder *zstd.Decoder + +func init() { + levels := [3]zstd.EncoderLevel{ + zstd.SpeedFastest, + zstd.SpeedDefault, + zstd.SpeedBestCompression, + } + for i, lvl := range levels { + enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(lvl)) + if err != nil { + panic(fmt.Sprintf("compression: create zstd encoder (level %d): %v", i, err)) + } + encoders[i] = enc + } + const maxDecoderMemory = 7 * 1024 * 1024 // 7 MiB cap + var err error + decoder, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecoderMemory)) + if err != nil { + panic(fmt.Sprintf("compression: create zstd decoder: %v", err)) + } +} + +// Compress compresses data using zstd at the given level and prepends the magic prefix. +// Returns the original data unchanged if it is empty. 
+func Compress(data []byte, level CompressionLevel) ([]byte, error) { + if len(data) == 0 { + return data, nil + } + + if level < LevelFastest || level > LevelBest { + level = LevelDefault + } + + compressed := encoders[level].EncodeAll(data, nil) + + // Prepend magic prefix + result := make([]byte, len(magic)+len(compressed)) + copy(result, magic) + copy(result[len(magic):], compressed) + + return result, nil +} + +// decodeResult carries the output of a DecodeAll goroutine. +type decodeResult struct { + data []byte + err error +} + +// Decompress decompresses data that was compressed with Compress. +// If the data does not have the magic prefix, it is returned as-is +// (backward-compatible passthrough for uncompressed blobs). +func Decompress(ctx context.Context, data []byte) ([]byte, error) { + if !IsCompressed(data) { + return data, nil + } + + payload := data[len(magic):] + + // Layer 1: Parse frame header to check declared decompressed size + // before allocating anything. This is a zero-cost upfront rejection. + var hdr zstd.Header + if err := hdr.Decode(payload); err == nil && hdr.HasFCS { + if hdr.FrameContentSize > maxDecompressedSize { + return nil, fmt.Errorf("%w: %d bytes declared, %d allowed", + ErrDecompressedSizeExceeded, hdr.FrameContentSize, maxDecompressedSize) + } + } + + // Layer 2 is the decoder's WithDecoderMaxMemory cap, configured in init. + // Layer 3: Apply the shorter of caller deadline and our hard cap. + ctx, cancel := context.WithTimeout(ctx, decompressTimeout) + defer cancel() + + ch := make(chan decodeResult, 1) + go func() { + out, err := decoder.DecodeAll(payload, nil) + ch <- decodeResult{data: out, err: err} + }() + + select { + case res := <-ch: + if res.err != nil { + return nil, fmt.Errorf("zstd decompress: %w", res.err) + } + return res.data, nil + case <-ctx.Done(): + return nil, fmt.Errorf("zstd decompress timeout: %w", ctx.Err()) + } +} + +// IsCompressed reports whether data starts with the compression magic prefix.
+func IsCompressed(data []byte) bool { + if len(data) < len(magic) { + return false + } + return data[0] == magic[0] && + data[1] == magic[1] && + data[2] == magic[2] && + data[3] == magic[3] +} diff --git a/pkg/da/compression_test.go b/pkg/da/compression_test.go new file mode 100644 index 000000000..0f3b69349 --- /dev/null +++ b/pkg/da/compression_test.go @@ -0,0 +1,224 @@ +package da + +import ( + "bytes" + "context" + "crypto/rand" + "testing" + + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompressDecompress_RoundTrip(t *testing.T) { + tests := []struct { + name string + data []byte + }{ + { + name: "small payload", + data: []byte("hello world, this is a test payload for compression"), + }, + { + name: "protobuf-like repeated data", + data: bytes.Repeat([]byte{0x0a, 0x10, 0x08, 0x01, 0x12, 0x0c}, 1000), + }, + { + name: "single byte", + data: []byte{0xFF}, + }, + { + name: "1MB payload", + data: bytes.Repeat([]byte("rollkit blob compression test data "), 30000), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + compressed, err := Compress(tt.data, LevelDefault) + require.NoError(t, err) + + assert.True(t, IsCompressed(compressed), "compressed data should have magic prefix") + + decompressed, err := Decompress(context.Background(), compressed) + require.NoError(t, err) + + assert.Equal(t, tt.data, decompressed, "round-trip should preserve data") + }) + } +} + +func TestCompress_AllLevelsRoundTrip(t *testing.T) { + data := bytes.Repeat([]byte("adaptive compression level test "), 5000) + levels := []struct { + name string + level CompressionLevel + }{ + {"fastest", LevelFastest}, + {"default", LevelDefault}, + {"best", LevelBest}, + } + + var sizes []int + for _, lvl := range levels { + t.Run(lvl.name, func(t *testing.T) { + compressed, err := Compress(data, lvl.level) + require.NoError(t, err) + assert.True(t, IsCompressed(compressed)) + + sizes = 
append(sizes, len(compressed)) + + decompressed, err := Decompress(context.Background(), compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) + + t.Logf("level=%s compressed=%d ratio=%.4f", lvl.name, len(compressed), float64(len(compressed))/float64(len(data))) + }) + } + + // Best should produce equal or smaller output than Fastest + if len(sizes) == 3 { + assert.LessOrEqual(t, sizes[2], sizes[0], "LevelBest should compress at least as well as LevelFastest") + } +} + +func TestCompress_Empty(t *testing.T) { + compressed, err := Compress(nil, LevelDefault) + require.NoError(t, err) + assert.Nil(t, compressed) + + compressed, err = Compress([]byte{}, LevelDefault) + require.NoError(t, err) + assert.Empty(t, compressed) +} + +func TestDecompress_UncompressedPassthrough(t *testing.T) { + // Data without magic prefix should pass through unchanged + raw := []byte("this is uncompressed protobuf data") + result, err := Decompress(context.Background(), raw) + require.NoError(t, err) + assert.Equal(t, raw, result) +} + +func TestDecompress_Empty(t *testing.T) { + result, err := Decompress(context.Background(), nil) + require.NoError(t, err) + assert.Nil(t, result) + + result, err = Decompress(context.Background(), []byte{}) + require.NoError(t, err) + assert.Empty(t, result) +} + +func TestDecompress_ShortData(t *testing.T) { + // Data shorter than magic prefix should pass through + result, err := Decompress(context.Background(), []byte{0x5A, 0x53}) + require.NoError(t, err) + assert.Equal(t, []byte{0x5A, 0x53}, result) +} + +func TestDecompress_CorruptCompressedData(t *testing.T) { + // Magic prefix followed by invalid zstd data + corrupt := append([]byte{0x5A, 0x53, 0x54, 0x44}, []byte("not valid zstd")...) 
+ _, err := Decompress(context.Background(), corrupt) + assert.Error(t, err, "should fail on corrupt compressed data") +} + +func TestIsCompressed(t *testing.T) { + tests := []struct { + name string + data []byte + expected bool + }{ + {name: "nil", data: nil, expected: false}, + {name: "empty", data: []byte{}, expected: false}, + {name: "short", data: []byte{0x5A}, expected: false}, + {name: "magic prefix only", data: []byte{0x5A, 0x53, 0x54, 0x44}, expected: true}, + {name: "magic with data", data: []byte{0x5A, 0x53, 0x54, 0x44, 0x01, 0x02}, expected: true}, + {name: "wrong prefix", data: []byte{0x00, 0x53, 0x54, 0x44}, expected: false}, + {name: "protobuf data", data: []byte{0x0a, 0x10, 0x08, 0x01}, expected: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, IsCompressed(tt.data)) + }) + } +} + +func TestCompress_AchievesCompression(t *testing.T) { + data := bytes.Repeat([]byte("rollkit block data with repeated content "), 10000) + compressed, err := Compress(data, LevelDefault) + require.NoError(t, err) + + ratio := float64(len(compressed)) / float64(len(data)) + t.Logf("compression ratio: %.4f (original: %d, compressed: %d)", ratio, len(data), len(compressed)) + assert.Less(t, ratio, 0.1, "highly repetitive data should compress to <10%% of original size") +} + +func TestCompress_RandomDataStillWorks(t *testing.T) { + data := make([]byte, 4096) + _, err := rand.Read(data) + require.NoError(t, err) + + compressed, err := Compress(data, LevelFastest) + require.NoError(t, err) + + decompressed, err := Decompress(context.Background(), compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) +} + +func TestDecompress_DataStartingWithMagicButUncompressed(t *testing.T) { + fakeCompressed := append([]byte{0x5A, 0x53, 0x54, 0x44}, bytes.Repeat([]byte{0x00}, 100)...) 
+ _, err := Decompress(context.Background(), fakeCompressed) + assert.Error(t, err, "data starting with magic but containing invalid zstd should error") +} + +func TestCompress_InvalidLevel(t *testing.T) { + // Out-of-range level should fall back to LevelDefault + data := []byte("test data for invalid level") + compressed, err := Compress(data, CompressionLevel(99)) + require.NoError(t, err) + + decompressed, err := Decompress(context.Background(), compressed) + require.NoError(t, err) + assert.Equal(t, data, decompressed) +} + +func TestDecompress_ContextCanceled(t *testing.T) { + data := []byte("test data for context cancellation") + compressed, err := Compress(data, LevelDefault) + require.NoError(t, err) + + // Pre-canceled context should cause Decompress to return an error. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = Decompress(ctx, compressed) + assert.Error(t, err, "decompress with canceled context should fail") + assert.ErrorIs(t, err, context.Canceled) +} + +func TestDecompress_OversizedFrameRejected(t *testing.T) { + // Build a fake zstd frame header that declares 100 MiB decompressed size. + // This should be rejected by the frame header pre-check before any + // decompression occurs. 
+ hdr := zstd.Header{ + SingleSegment: true, + HasFCS: true, + FrameContentSize: 100 * 1024 * 1024, // 100 MiB — way over the 7 MiB limit + } + frame, err := hdr.AppendTo(nil) + require.NoError(t, err) + + // Prepend our custom magic prefix + blob := make([]byte, len(magic)+len(frame)) + copy(blob, magic) + copy(blob[len(magic):], frame) + + _, err = Decompress(context.Background(), blob) + assert.Error(t, err, "should reject blob declaring oversized decompressed output") + assert.ErrorIs(t, err, ErrDecompressedSizeExceeded) +} diff --git a/pkg/da/types/namespace_test.go b/pkg/da/types/namespace_test.go index 1c068c8f9..6abbaeb7c 100644 --- a/pkg/da/types/namespace_test.go +++ b/pkg/da/types/namespace_test.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/hex" "testing" + + "github.com/stretchr/testify/require" ) func TestNamespaceV0Creation(t *testing.T) { @@ -22,7 +24,6 @@ func TestNamespaceV0Creation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ns, err := NewNamespaceV0(tt.data) - if tt.expectError { if err == nil { t.Errorf("%s: expected error but got nil", tt.description) @@ -33,13 +34,8 @@ func TestNamespaceV0Creation(t *testing.T) { return } - if err != nil { - t.Fatalf("%s: unexpected error: %v", tt.description, err) - } - if ns == nil { - t.Fatal("expected non-nil namespace but got nil") - } - + require.NoError(t, err) + require.NotNil(t, ns) if ns.Version != NamespaceVersionZero { t.Errorf("Version should be 0, got %d", ns.Version) } @@ -114,12 +110,8 @@ func TestNamespaceFromBytes(t *testing.T) { return } - if err != nil { - t.Fatalf("%s: unexpected error: %v", tt.description, err) - } - if ns == nil { - t.Fatal("expected non-nil namespace but got nil") - } + require.NoError(t, err, tt.description) + require.NotNil(t, ns, "expected non-nil namespace but got nil") if !bytes.Equal(tt.input, ns.Bytes()) { t.Errorf("Should round-trip correctly, expected %v, got %v", tt.input, ns.Bytes()) } diff --git 
a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 905b280cf..44d5d8cc9 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -32,6 +32,7 @@ import ( "github.com/evstack/ev-node/execution/evm" evmtest "github.com/evstack/ev-node/execution/evm/test" + da "github.com/evstack/ev-node/pkg/da" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" coreda "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/rpc/client" @@ -612,6 +613,14 @@ func extractBlockHeight(t *testing.T, blob []byte) (uint64, types.Hash, string) t.Log("empty blob, skipping") return 0, nil, "" } + + // Decompress if the blob was compressed before DA submission. + if da.IsCompressed(blob) { + var err error + blob, err = da.Decompress(t.Context(), blob) + require.NoError(t, err, "decompress blob") + } + var headerPb pb.SignedHeader if err := proto.Unmarshal(blob, &headerPb); err == nil { var signedHeader types.SignedHeader diff --git a/test/e2e/go.mod b/test/e2e/go.mod index 46710aa54..87a8ba3e1 100644 --- a/test/e2e/go.mod +++ b/test/e2e/go.mod @@ -172,7 +172,7 @@ require ( github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.6 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/test/e2e/go.sum b/test/e2e/go.sum index 782994eb2..979c62d77 100644 --- a/test/e2e/go.sum +++ b/test/e2e/go.sum @@ -632,8 +632,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.7/go.mod
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=