From 01ac20d27b385c3b118d3e1c1a1c1bde19a8db48 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 13 Jan 2026 17:18:28 +0100 Subject: [PATCH 01/15] feat(submitting): add posting strategies --- block/internal/common/consts.go | 2 +- .../internal/submitting/batching_strategy.go | 285 +++++++++ .../submitting/batching_strategy_test.go | 593 ++++++++++++++++++ block/internal/submitting/submitter.go | 164 ++++- block/internal/submitting/submitter_test.go | 15 + .../syncing/syncer_forced_inclusion_test.go | 8 +- pkg/config/config.go | 20 +- pkg/config/config_test.go | 2 +- pkg/config/defaults.go | 4 + test/testda/dummy.go | 4 +- tools/local-da/local.go | 2 +- 11 files changed, 1055 insertions(+), 44 deletions(-) create mode 100644 block/internal/submitting/batching_strategy.go create mode 100644 block/internal/submitting/batching_strategy_test.go diff --git a/block/internal/common/consts.go b/block/internal/common/consts.go index 255a5da9a7..bb584e7aef 100644 --- a/block/internal/common/consts.go +++ b/block/internal/common/consts.go @@ -1,3 +1,3 @@ package common -const DefaultMaxBlobSize = 2 * 1024 * 1024 // 2MB fallback blob size limit +const DefaultMaxBlobSize = 8 * 1024 * 1024 // 8MB fallback blob size limit (Celestia's current limit) diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go new file mode 100644 index 0000000000..c461cd6389 --- /dev/null +++ b/block/internal/submitting/batching_strategy.go @@ -0,0 +1,285 @@ +package submitting + +import ( + "fmt" + "time" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" +) + +// BatchingStrategy defines the interface for different batching strategies +type BatchingStrategy interface { + // ShouldSubmit determines if a batch should be submitted based on the strategy + // Returns true if submission should happen now + ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool + + // Name returns the name of the strategy + Name() string +} + +// ImmediateStrategy submits as soon as any items are available +type ImmediateStrategy struct{} + +func (s *ImmediateStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool { + return pendingCount > 0 +} + +func (s *ImmediateStrategy) Name() string { + return "immediate" +} + +// SizeBasedStrategy waits until the batch reaches a certain size threshold +type SizeBasedStrategy struct { + sizeThreshold float64 // fraction of max blob size (0.0 to 1.0) + minItems uint64 +} + +func NewSizeBasedStrategy(sizeThreshold float64, minItems uint64) *SizeBasedStrategy { + if sizeThreshold <= 0 || sizeThreshold > 1.0 { + sizeThreshold = 0.8 // default to 80% + } + if minItems == 0 { + minItems = 1 + } + return &SizeBasedStrategy{ + sizeThreshold: sizeThreshold, + minItems: minItems, + } +} + +func (s *SizeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool { + if pendingCount < s.minItems { + return false + } + + threshold := int(float64(maxBlobSize) * s.sizeThreshold) + return totalSize >= threshold +} + +func (s *SizeBasedStrategy) Name() string { + return "size" +} + +// TimeBasedStrategy submits after a certain time interval +type TimeBasedStrategy struct { + maxDelay time.Duration + minItems uint64 +} + +func NewTimeBasedStrategy(maxDelay time.Duration, minItems uint64) *TimeBasedStrategy { + if maxDelay == 0 { + maxDelay = 6 * time.Second 
// default to DA block time + } + if minItems == 0 { + minItems = 1 + } + return &TimeBasedStrategy{ + maxDelay: maxDelay, + minItems: minItems, + } +} + +func (s *TimeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool { + if pendingCount < s.minItems { + return false + } + + return timeSinceLastSubmit >= s.maxDelay +} + +func (s *TimeBasedStrategy) Name() string { + return "time" +} + +// AdaptiveStrategy balances between size and time constraints +// It submits when either: +// - The batch reaches the size threshold, OR +// - The max delay is reached and we have at least min items +type AdaptiveStrategy struct { + sizeThreshold float64 + maxDelay time.Duration + minItems uint64 +} + +func NewAdaptiveStrategy(sizeThreshold float64, maxDelay time.Duration, minItems uint64) *AdaptiveStrategy { + if sizeThreshold <= 0 || sizeThreshold > 1.0 { + sizeThreshold = 0.8 // default to 80% + } + if maxDelay == 0 { + maxDelay = 6 * time.Second // default to DA block time + } + if minItems == 0 { + minItems = 1 + } + return &AdaptiveStrategy{ + sizeThreshold: sizeThreshold, + maxDelay: maxDelay, + minItems: minItems, + } +} + +func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool { + if pendingCount < s.minItems { + return false + } + + // Submit if we've reached the size threshold + threshold := int(float64(maxBlobSize) * s.sizeThreshold) + if totalSize >= threshold { + return true + } + + // Submit if max delay has been reached + if timeSinceLastSubmit >= s.maxDelay { + return true + } + + return false +} + +func (s *AdaptiveStrategy) Name() string { + return "adaptive" +} + +// BatchingStrategyFactory creates a batching strategy based on configuration +func BatchingStrategyFactory(cfg config.DAConfig) (BatchingStrategy, error) { + switch cfg.BatchingStrategy { + case "immediate": + return &ImmediateStrategy{}, nil + case "size": + return NewSizeBasedStrategy(cfg.BatchSizeThreshold, cfg.BatchMinItems), nil + case "time": + return NewTimeBasedStrategy(cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + case "adaptive": + return NewAdaptiveStrategy(cfg.BatchSizeThreshold, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + default: + return nil, fmt.Errorf("unknown batching strategy: %s", cfg.BatchingStrategy) + } +} + +// estimateBatchSize estimates the total size of pending items +// This is a helper function that can be used by the submitter +func estimateBatchSize(marshaled [][]byte) int { + totalSize := 0 + for _, data := range marshaled { + totalSize += len(data) + } + return totalSize +} + +// optimizeBatchSize returns the optimal number of items to include in a batch +// to maximize blob utilization while staying under the size limit +func optimizeBatchSize(marshaled [][]byte, maxBlobSize int, targetUtilization float64) int { + if targetUtilization <= 0 || targetUtilization > 1.0 { + targetUtilization = 0.9 // default to 90% utilization + } + + targetSize := int(float64(maxBlobSize) * targetUtilization) + totalSize := 0 + count := 0 + + for i, data := range marshaled { + itemSize := len(data) + + // If adding this item would exceed max blob size, stop + if totalSize+itemSize > maxBlobSize { + break + } + + totalSize += itemSize + count = i + 1 + + // If we've reached our target utilization, we can stop + // This helps create more predictably-sized batches + if totalSize >= targetSize { + break + } + } + + return count +} + +// BatchMetrics 
provides information about batch efficiency +type BatchMetrics struct { + ItemCount int + TotalBytes int + MaxBlobBytes int + Utilization float64 // percentage of max blob size used + EstimatedCost float64 // estimated cost relative to single full blob +} + +// calculateBatchMetrics computes metrics for a batch +func calculateBatchMetrics(itemCount int, totalBytes int, maxBlobBytes int) BatchMetrics { + utilization := 0.0 + if maxBlobBytes > 0 { + utilization = float64(totalBytes) / float64(maxBlobBytes) + } + + // Rough cost estimate: each blob submission has a fixed cost + // Higher utilization = better cost efficiency + estimatedCost := 1.0 + if utilization > 0 { + // If we're only using 50% of the blob, we're paying 2x per byte effectively + estimatedCost = 1.0 / utilization + } + + return BatchMetrics{ + ItemCount: itemCount, + TotalBytes: totalBytes, + MaxBlobBytes: maxBlobBytes, + Utilization: utilization, + EstimatedCost: estimatedCost, + } +} + +// ShouldWaitForMoreItems determines if we should wait for more items +// to improve batch efficiency +func ShouldWaitForMoreItems( + currentCount uint64, + currentSize int, + maxBlobSize int, + minUtilization float64, + hasMoreExpected bool, +) bool { + // Don't wait if we're already at or near capacity + if currentSize >= int(float64(maxBlobSize)*0.95) { + return false + } + + // Don't wait if we don't expect more items soon + if !hasMoreExpected { + return false + } + + // Wait if current utilization is below minimum threshold + // Use epsilon for floating point comparison + const epsilon = 0.001 + currentUtilization := float64(currentSize) / float64(maxBlobSize) + if currentUtilization < minUtilization-epsilon { + return true + } + + return false +} + +// BatchingConfig holds configuration for batch optimization +type BatchingConfig struct { + MaxBlobSize int + Strategy BatchingStrategy + TargetUtilization float64 +} + +// NewBatchingConfig creates a new batching configuration +func NewBatchingConfig(cfg config.DAConfig) (*BatchingConfig, error) { + strategy, err := BatchingStrategyFactory(cfg) + if err != nil { + return nil, err + } + + return &BatchingConfig{ + MaxBlobSize: common.DefaultMaxBlobSize, + Strategy: strategy, + TargetUtilization: cfg.BatchSizeThreshold, + }, nil +} diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go new file mode 100644 index 0000000000..71fcce60b1 --- /dev/null +++ b/block/internal/submitting/batching_strategy_test.go @@ -0,0 +1,593 @@ +package submitting + +import ( + "testing" + "time" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestImmediateStrategy(t *testing.T) { + strategy := &ImmediateStrategy{} + + tests := []struct { + name string + pendingCount uint64 + totalSize int + expected bool + }{ + { + name: "no pending items", + pendingCount: 0, + totalSize: 0, + expected: false, + }, + { + name: "one pending item", + pendingCount: 1, + totalSize: 1000, + expected: true, + }, + { + name: "multiple pending items", + pendingCount: 10, + totalSize: 10000, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, common.DefaultMaxBlobSize, 0) + assert.Equal(t, tt.expected, result) + }) + } + + assert.Equal(t, "immediate", strategy.Name()) +} + +func TestSizeBasedStrategy(t *testing.T) { + 
maxBlobSize := 8 * 1024 * 1024 // 8MB + + tests := []struct { + name string + sizeThreshold float64 + minItems uint64 + pendingCount uint64 + totalSize int + expectedSubmit bool + }{ + { + name: "below threshold and min items", + sizeThreshold: 0.8, + minItems: 2, + pendingCount: 1, + totalSize: 1 * 1024 * 1024, // 1MB + expectedSubmit: false, + }, + { + name: "below threshold but has min items", + sizeThreshold: 0.8, + minItems: 1, + pendingCount: 5, + totalSize: 4 * 1024 * 1024, // 4MB (50% of 8MB) + expectedSubmit: false, + }, + { + name: "at threshold with min items", + sizeThreshold: 0.8, + minItems: 1, + pendingCount: 10, + totalSize: int(float64(maxBlobSize) * 0.8), // 80% of max + expectedSubmit: true, + }, + { + name: "above threshold", + sizeThreshold: 0.8, + minItems: 1, + pendingCount: 20, + totalSize: 7 * 1024 * 1024, // 7MB (87.5% of 8MB) + expectedSubmit: true, + }, + { + name: "full blob", + sizeThreshold: 0.8, + minItems: 1, + pendingCount: 100, + totalSize: maxBlobSize, + expectedSubmit: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + strategy := NewSizeBasedStrategy(tt.sizeThreshold, tt.minItems) + result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, 0) + assert.Equal(t, tt.expectedSubmit, result) + }) + } + + // Test invalid threshold defaults to 0.8 + strategy := NewSizeBasedStrategy(1.5, 1) + assert.Equal(t, 0.8, strategy.sizeThreshold) + + strategy = NewSizeBasedStrategy(0, 1) + assert.Equal(t, 0.8, strategy.sizeThreshold) + + assert.Equal(t, "size", strategy.Name()) +} + +func TestTimeBasedStrategy(t *testing.T) { + maxDelay := 6 * time.Second + maxBlobSize := 8 * 1024 * 1024 + + tests := []struct { + name string + minItems uint64 + pendingCount uint64 + totalSize int + timeSinceLastSubmit time.Duration + expectedSubmit bool + }{ + { + name: "below min items", + minItems: 2, + pendingCount: 1, + totalSize: 1 * 1024 * 1024, + timeSinceLastSubmit: 10 * time.Second, + expectedSubmit: false, + }, + { + name: "before max delay", + minItems: 1, + pendingCount: 5, + totalSize: 4 * 1024 * 1024, + timeSinceLastSubmit: 3 * time.Second, + expectedSubmit: false, + }, + { + name: "at max delay", + minItems: 1, + pendingCount: 3, + totalSize: 2 * 1024 * 1024, + timeSinceLastSubmit: 6 * time.Second, + expectedSubmit: true, + }, + { + name: "after max delay", + minItems: 1, + pendingCount: 2, + totalSize: 1 * 1024 * 1024, + timeSinceLastSubmit: 10 * time.Second, + expectedSubmit: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + strategy := NewTimeBasedStrategy(maxDelay, tt.minItems) + result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, tt.timeSinceLastSubmit) + assert.Equal(t, tt.expectedSubmit, result) + }) + } + + assert.Equal(t, "time", NewTimeBasedStrategy(maxDelay, 1).Name()) +} + +func TestAdaptiveStrategy(t *testing.T) { + maxBlobSize := 8 * 1024 * 1024 // 8MB + sizeThreshold := 0.8 + maxDelay := 6 * time.Second + + tests := []struct { + name string + minItems uint64 + pendingCount uint64 + totalSize int + timeSinceLastSubmit time.Duration + expectedSubmit bool + reason string + }{ + { + name: "below min items", + minItems: 3, + pendingCount: 2, + totalSize: 7 * 1024 * 1024, + timeSinceLastSubmit: 10 * time.Second, + expectedSubmit: false, + reason: "not enough items", + }, + { + name: "size threshold reached", + minItems: 1, + pendingCount: 10, + totalSize: int(float64(maxBlobSize) * 0.85), // 85% + timeSinceLastSubmit: 1 * time.Second, + expectedSubmit: 
true, + reason: "size threshold met", + }, + { + name: "time threshold reached", + minItems: 1, + pendingCount: 2, + totalSize: 1 * 1024 * 1024, // Only 12.5% + timeSinceLastSubmit: 7 * time.Second, + expectedSubmit: true, + reason: "time threshold met", + }, + { + name: "neither threshold reached", + minItems: 1, + pendingCount: 5, + totalSize: 4 * 1024 * 1024, // 50% + timeSinceLastSubmit: 3 * time.Second, + expectedSubmit: false, + reason: "waiting for threshold", + }, + { + name: "both thresholds reached", + minItems: 1, + pendingCount: 20, + totalSize: 7 * 1024 * 1024, // 87.5% + timeSinceLastSubmit: 10 * time.Second, + expectedSubmit: true, + reason: "both thresholds met", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + strategy := NewAdaptiveStrategy(sizeThreshold, maxDelay, tt.minItems) + result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, tt.timeSinceLastSubmit) + assert.Equal(t, tt.expectedSubmit, result, "reason: %s", tt.reason) + }) + } + + // Test defaults + strategy := NewAdaptiveStrategy(0, 0, 0) + assert.Equal(t, 0.8, strategy.sizeThreshold) + assert.Equal(t, 6*time.Second, strategy.maxDelay) + assert.Equal(t, uint64(1), strategy.minItems) + + assert.Equal(t, "adaptive", strategy.Name()) +} + +func TestBatchingStrategyFactory(t *testing.T) { + tests := []struct { + name string + strategyName string + expectedType string + expectError bool + }{ + { + name: "immediate strategy", + strategyName: "immediate", + expectedType: "immediate", + expectError: false, + }, + { + name: "size strategy", + strategyName: "size", + expectedType: "size", + expectError: false, + }, + { + name: "time strategy", + strategyName: "time", + expectedType: "time", + expectError: false, + }, + { + name: "adaptive strategy", + strategyName: "adaptive", + expectedType: "adaptive", + expectError: false, + }, + { + name: "unknown strategy", + strategyName: "unknown", + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := config.DAConfig{ + BatchingStrategy: tt.strategyName, + BatchSizeThreshold: 0.8, + BatchMaxDelay: config.DurationWrapper{Duration: 6 * time.Second}, + BatchMinItems: 1, + } + + strategy, err := BatchingStrategyFactory(cfg) + + if tt.expectError { + assert.Error(t, err) + assert.Nil(t, strategy) + } else { + require.NoError(t, err) + require.NotNil(t, strategy) + assert.Equal(t, tt.expectedType, strategy.Name()) + } + }) + } +} + +func TestEstimateBatchSize(t *testing.T) { + tests := []struct { + name string + marshaled [][]byte + expectedSize int + }{ + { + name: "empty batch", + marshaled: [][]byte{}, + expectedSize: 0, + }, + { + name: "single item", + marshaled: [][]byte{ + make([]byte, 1024), + }, + expectedSize: 1024, + }, + { + name: "multiple items", + marshaled: [][]byte{ + make([]byte, 1024), + make([]byte, 2048), + make([]byte, 512), + }, + expectedSize: 3584, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + size := estimateBatchSize(tt.marshaled) + assert.Equal(t, tt.expectedSize, size) + }) + } +} + +func TestOptimizeBatchSize(t *testing.T) { + maxBlobSize := 8 * 1024 * 1024 // 8MB + + tests := []struct { + name string + itemSizes []int + targetUtilization float64 + expectedCount int + expectedTotalSize int + }{ + { + name: "empty batch", + itemSizes: []int{}, + targetUtilization: 0.9, + expectedCount: 0, + expectedTotalSize: 0, + }, + { + name: "single small item", + itemSizes: []int{1024}, + targetUtilization: 0.9, + expectedCount: 1, + 
expectedTotalSize: 1024, + }, + { + name: "reach target utilization", + itemSizes: []int{1024 * 1024, 2 * 1024 * 1024, 3 * 1024 * 1024, 1024 * 1024}, + targetUtilization: 0.8, + expectedCount: 4, // 1+2+3+1 = 7MB (87.5% of 8MB, exceeds 80% target so stops) + expectedTotalSize: 7 * 1024 * 1024, + }, + { + name: "stop at max blob size", + itemSizes: []int{7 * 1024 * 1024, 2 * 1024 * 1024}, + targetUtilization: 0.9, + expectedCount: 1, // Second item would exceed max + expectedTotalSize: 7 * 1024 * 1024, + }, + { + name: "all items fit below target", + itemSizes: []int{1024 * 1024, 1024 * 1024, 1024 * 1024}, + targetUtilization: 0.9, + expectedCount: 3, + expectedTotalSize: 3 * 1024 * 1024, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create marshaled data + marshaled := make([][]byte, len(tt.itemSizes)) + for i, size := range tt.itemSizes { + marshaled[i] = make([]byte, size) + } + + count := optimizeBatchSize(marshaled, maxBlobSize, tt.targetUtilization) + assert.Equal(t, tt.expectedCount, count) + + if count > 0 { + totalSize := estimateBatchSize(marshaled[:count]) + assert.Equal(t, tt.expectedTotalSize, totalSize) + } + }) + } +} + +func TestCalculateBatchMetrics(t *testing.T) { + maxBlobSize := 8 * 1024 * 1024 + + tests := []struct { + name string + itemCount int + totalBytes int + expectedUtil float64 + expectedCostRange [2]float64 // min, max + }{ + { + name: "empty batch", + itemCount: 0, + totalBytes: 0, + expectedUtil: 0.0, + expectedCostRange: [2]float64{0, 999999}, // cost is undefined for empty + }, + { + name: "half full", + itemCount: 10, + totalBytes: 4 * 1024 * 1024, + expectedUtil: 0.5, + expectedCostRange: [2]float64{2.0, 2.0}, // 1/0.5 = 2.0x cost + }, + { + name: "80% full", + itemCount: 20, + totalBytes: int(float64(maxBlobSize) * 0.8), + expectedUtil: 0.8, + expectedCostRange: [2]float64{1.25, 1.25}, // 1/0.8 = 1.25x cost + }, + { + name: "nearly full", + itemCount: 50, + totalBytes: int(float64(maxBlobSize) * 0.95), + expectedUtil: 0.95, + expectedCostRange: [2]float64{1.05, 1.06}, // ~1.05x cost + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := calculateBatchMetrics(tt.itemCount, tt.totalBytes, maxBlobSize) + + assert.Equal(t, tt.itemCount, metrics.ItemCount) + assert.Equal(t, tt.totalBytes, metrics.TotalBytes) + assert.Equal(t, maxBlobSize, metrics.MaxBlobBytes) + assert.InDelta(t, tt.expectedUtil, metrics.Utilization, 0.01) + + if tt.totalBytes > 0 { + assert.InEpsilon(t, (tt.expectedCostRange[0]+tt.expectedCostRange[1])/2, + metrics.EstimatedCost, 0.01, "cost should be within range") + } + }) + } +} + +func TestShouldWaitForMoreItems(t *testing.T) { + maxBlobSize := 8 * 1024 * 1024 + + tests := []struct { + name string + currentCount uint64 + currentSize int + minUtilization float64 + hasMoreExpected bool + expectedWait bool + }{ + { + name: "near capacity", + currentCount: 50, + currentSize: int(float64(maxBlobSize) * 0.96), + minUtilization: 0.8, + hasMoreExpected: true, + expectedWait: false, + }, + { + name: "below threshold but no more expected", + currentCount: 10, + currentSize: 4 * 1024 * 1024, + minUtilization: 0.8, + hasMoreExpected: false, + expectedWait: false, + }, + { + name: "below threshold with more expected", + currentCount: 10, + currentSize: 4 * 1024 * 1024, // 50% + minUtilization: 0.8, + hasMoreExpected: true, + expectedWait: true, + }, + { + name: "at threshold", + currentCount: 20, + currentSize: int(float64(maxBlobSize) * 0.8), + minUtilization: 0.8, + 
hasMoreExpected: true, + expectedWait: false, // At threshold, no need to wait + }, + { + name: "above threshold", + currentCount: 30, + currentSize: int(float64(maxBlobSize) * 0.85), + minUtilization: 0.8, + hasMoreExpected: true, + expectedWait: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ShouldWaitForMoreItems( + tt.currentCount, + tt.currentSize, + maxBlobSize, + tt.minUtilization, + tt.hasMoreExpected, + ) + assert.Equal(t, tt.expectedWait, result) + }) + } +} + +func TestNewBatchingConfig(t *testing.T) { + cfg := config.DAConfig{ + BatchingStrategy: "adaptive", + BatchSizeThreshold: 0.85, + BatchMaxDelay: config.DurationWrapper{Duration: 12 * time.Second}, + BatchMinItems: 5, + } + + batchConfig, err := NewBatchingConfig(cfg) + require.NoError(t, err) + require.NotNil(t, batchConfig) + + assert.Equal(t, common.DefaultMaxBlobSize, batchConfig.MaxBlobSize) + assert.Equal(t, "adaptive", batchConfig.Strategy.Name()) + assert.Equal(t, 0.85, batchConfig.TargetUtilization) +} + +func TestBatchingStrategiesComparison(t *testing.T) { + // This test demonstrates how different strategies behave with the same input + maxBlobSize := 8 * 1024 * 1024 + pendingCount := uint64(10) + totalSize := 4 * 1024 * 1024 // 50% full + timeSinceLastSubmit := 3 * time.Second + + immediate := &ImmediateStrategy{} + size := NewSizeBasedStrategy(0.8, 1) + timeBased := NewTimeBasedStrategy(6*time.Second, 1) + adaptive := NewAdaptiveStrategy(0.8, 6*time.Second, 1) + + // Immediate should always submit if there are items + assert.True(t, immediate.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + + // Size-based should not submit at 50% when threshold is 80% + assert.False(t, size.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + + // Time-based should not submit at 3s when max delay is 6s + assert.False(t, timeBased.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + + // Adaptive should not submit (neither threshold met) + assert.False(t, adaptive.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + + // Now test with time threshold exceeded + timeSinceLastSubmit = 7 * time.Second + assert.True(t, immediate.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + assert.False(t, size.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + assert.True(t, timeBased.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) + assert.True(t, adaptive.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) +} diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 6aeb7a2d5b..83c65aef41 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -55,6 +55,11 @@ type Submitter struct { headerSubmissionMtx sync.Mutex dataSubmissionMtx sync.Mutex + // Batching strategy state + lastHeaderSubmit time.Time + lastDataSubmit time.Time + batchingStrategy BatchingStrategy + // Channels for coordination errorCh chan<- error // Channel to report critical execution client failures @@ -81,6 +86,19 @@ func NewSubmitter( logger zerolog.Logger, errorCh chan<- error, ) *Submitter { + submitterLogger := logger.With().Str("component", "submitter").Logger() + + // Initialize batching strategy + strategy, err := BatchingStrategyFactory(config.DA) + if err != nil { + submitterLogger.Warn().Err(err).Msg("failed to create batching strategy, using 
time-based default") + strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 1) + } + + submitterLogger.Info(). + Str("batching_strategy", strategy.Name()). + Msg("initialized DA submission batching strategy") + return &Submitter{ store: store, exec: exec, @@ -92,8 +110,11 @@ func NewSubmitter( sequencer: sequencer, signer: signer, daIncludedHeight: &atomic.Uint64{}, + lastHeaderSubmit: time.Now(), + lastDataSubmit: time.Now(), + batchingStrategy: strategy, errorCh: errorCh, - logger: logger.With().Str("component", "submitter").Logger(), + logger: submitterLogger, } } @@ -140,7 +161,12 @@ func (s *Submitter) daSubmissionLoop() { s.logger.Info().Msg("starting DA submission loop") defer s.logger.Info().Msg("DA submission loop stopped") - ticker := time.NewTicker(s.config.DA.BlockTime.Duration) + // Use a shorter ticker interval to check batching strategy more frequently + checkInterval := s.config.DA.BlockTime.Duration / 4 + if checkInterval < 100*time.Millisecond { + checkInterval = 100 * time.Millisecond + } + ticker := time.NewTicker(checkInterval) defer ticker.Stop() for { @@ -148,49 +174,119 @@ func (s *Submitter) daSubmissionLoop() { case <-s.ctx.Done(): return case <-ticker.C: - // Submit headers - if headersNb := s.cache.NumPendingHeaders(); headersNb != 0 { - s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Submitting headers") + // Check if we should submit headers based on batching strategy + headersNb := s.cache.NumPendingHeaders() + if headersNb > 0 { + timeSinceLastSubmit := time.Since(s.lastHeaderSubmit) + + // For strategy decision, we need to estimate the size + // We'll fetch headers to check, but only submit if strategy approves if s.headerSubmissionMtx.TryLock() { - s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress") go func() { - defer func() { - s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed") - s.headerSubmissionMtx.Unlock() - }() - if err := s.daSubmitter.SubmitHeaders(s.ctx, s.cache, s.signer); err != nil { - // Check for unrecoverable errors that indicate a critical issue - if errors.Is(err, common.ErrOversizedItem) { - s.logger.Error().Err(err). - Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock") - s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) - return + defer s.headerSubmissionMtx.Unlock() + + // Get pending headers to estimate size + headers, err := s.cache.GetPendingHeaders(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get pending headers for batching decision") + return + } + + // Estimate total size + totalSize := 0 + for _, h := range headers { + data, err := h.MarshalBinary() + if err == nil { + totalSize += len(data) + } + } + + shouldSubmit := s.batchingStrategy.ShouldSubmit( + uint64(len(headers)), + totalSize, + common.DefaultMaxBlobSize, + timeSinceLastSubmit, + ) + + if shouldSubmit { + s.logger.Debug(). + Time("t", time.Now()). + Uint64("headers", headersNb). + Int("total_size_kb", totalSize/1024). + Dur("time_since_last", timeSinceLastSubmit). + Str("strategy", s.batchingStrategy.Name()). + Msg("batching strategy triggered header submission") + + if err := s.daSubmitter.SubmitHeaders(s.ctx, s.cache, s.signer); err != nil { + // Check for unrecoverable errors that indicate a critical issue + if errors.Is(err, common.ErrOversizedItem) { + s.logger.Error().Err(err). 
+ Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock") + s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) + return + } + s.logger.Error().Err(err).Msg("failed to submit headers") + } else { + s.lastHeaderSubmit = time.Now() } - s.logger.Error().Err(err).Msg("failed to submit headers") } }() } } - // Submit data - if dataNb := s.cache.NumPendingData(); dataNb != 0 { - s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Submitting data") + // Check if we should submit data based on batching strategy + dataNb := s.cache.NumPendingData() + if dataNb > 0 { + timeSinceLastSubmit := time.Since(s.lastDataSubmit) + if s.dataSubmissionMtx.TryLock() { - s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress") go func() { - defer func() { - s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed") - s.dataSubmissionMtx.Unlock() - }() - if err := s.daSubmitter.SubmitData(s.ctx, s.cache, s.signer, s.genesis); err != nil { - // Check for unrecoverable errors that indicate a critical issue - if errors.Is(err, common.ErrOversizedItem) { - s.logger.Error().Err(err). - Msg("CRITICAL: Data exceeds DA blob size limit - halting to prevent live lock") - s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) - return + defer s.dataSubmissionMtx.Unlock() + + // Get pending data to estimate size + dataList, err := s.cache.GetPendingData(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get pending data for batching decision") + return + } + + // Estimate total size + totalSize := 0 + for _, d := range dataList { + data, err := d.MarshalBinary() + if err == nil { + totalSize += len(data) + } + } + + shouldSubmit := s.batchingStrategy.ShouldSubmit( + uint64(len(dataList)), + totalSize, + common.DefaultMaxBlobSize, + timeSinceLastSubmit, + ) + + if shouldSubmit { + s.logger.Debug(). + Time("t", time.Now()). + Uint64("data", dataNb). + Int("total_size_kb", totalSize/1024). + Dur("time_since_last", timeSinceLastSubmit). + Str("strategy", s.batchingStrategy.Name()). + Msg("batching strategy triggered data submission") + + if err := s.daSubmitter.SubmitData(s.ctx, s.cache, s.signer, s.genesis); err != nil { + // Check for unrecoverable errors that indicate a critical issue + if errors.Is(err, common.ErrOversizedItem) { + s.logger.Error().Err(err). 
+ Msg("CRITICAL: Data exceeds DA blob size limit - halting to prevent live lock") + s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) + return + } + s.logger.Error().Err(err).Msg("failed to submit data") + } else { + s.lastDataSubmit = time.Now() } - s.logger.Error().Err(err).Msg("failed to submit data") } }() } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index f07cc64126..9a9e08a7f4 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -338,6 +338,8 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { // Set a small block time so the ticker fires quickly cfg := config.DefaultConfig() cfg.DA.BlockTime.Duration = 5 * time.Millisecond + // Use immediate batching strategy so submissions happen right away + cfg.DA.BatchingStrategy = "immediate" metrics := common.NopMetrics() // Prepare fake DA submitter capturing calls @@ -350,6 +352,10 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { exec := testmocks.NewMockExecutor(t) // Provide a minimal signer implementation + // Initialize batching strategy (immediate for this test) + batchingStrategy, err := BatchingStrategyFactory(cfg.DA) + require.NoError(t, err) + s := &Submitter{ store: st, exec: exec, @@ -360,12 +366,21 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { daSubmitter: fakeDA, signer: &fakeSigner{}, daIncludedHeight: &atomic.Uint64{}, + lastHeaderSubmit: time.Now().Add(-time.Hour), // Set far in past so strategy allows submission + lastDataSubmit: time.Now().Add(-time.Hour), + batchingStrategy: batchingStrategy, logger: zerolog.Nop(), } // Make there be pending headers and data by setting store height > last submitted + h1, d1 := newHeaderAndData("test-chain", 1, true) + h2, d2 := newHeaderAndData("test-chain", 2, true) + + // Store the blocks batch, err := st.NewBatch(ctx) require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) require.NoError(t, batch.SetHeight(2)) require.NoError(t, batch.Commit()) diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 61c556e837..3180413a09 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -37,8 +37,8 @@ func TestCalculateBlockFullness_HalfFull(t *testing.T) { } fullness := s.calculateBlockFullness(data) - // Size fullness: 500000/2097152 ≈ 0.238 - require.InDelta(t, 0.238, fullness, 0.05) + // Size fullness: 500000/8388608 ≈ 0.0596 + require.InDelta(t, 0.0596, fullness, 0.05) } func TestCalculateBlockFullness_Full(t *testing.T) { @@ -55,8 +55,8 @@ func TestCalculateBlockFullness_Full(t *testing.T) { } fullness := s.calculateBlockFullness(data) - // Both metrics at or near 1.0 - require.Greater(t, fullness, 0.95) + // Size fullness: 2100000/8388608 ≈ 0.25 + require.InDelta(t, 0.25, fullness, 0.05) } func TestCalculateBlockFullness_VerySmall(t *testing.T) { diff --git a/pkg/config/config.go b/pkg/config/config.go index 0710997e59..6f8193f1eb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -78,6 +78,14 @@ const ( FlagDAMaxSubmitAttempts = FlagPrefixEvnode + "da.max_submit_attempts" // FlagDARequestTimeout controls the per-request timeout when talking to the DA layer FlagDARequestTimeout = FlagPrefixEvnode + "da.request_timeout" + // FlagDABatchingStrategy is a flag for 
specifying the batching strategy + FlagDABatchingStrategy = FlagPrefixEvnode + "da.batching_strategy" + // FlagDABatchSizeThreshold is a flag for specifying the batch size threshold + FlagDABatchSizeThreshold = FlagPrefixEvnode + "da.batch_size_threshold" + // FlagDABatchMaxDelay is a flag for specifying the maximum batch delay + FlagDABatchMaxDelay = FlagPrefixEvnode + "da.batch_max_delay" + // FlagDABatchMinItems is a flag for specifying the minimum batch items + FlagDABatchMinItems = FlagPrefixEvnode + "da.batch_min_items" // P2P configuration flags @@ -182,7 +190,13 @@ type DAConfig struct { BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."` MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."` MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."` - RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Per-request timeout applied to DA interactions. Larger values tolerate slower DA nodes at the cost of waiting longer before failing."` + RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Timeout for requests to DA layer"` + + // Batching strategy configuration + BatchingStrategy string `mapstructure:"batching_strategy" yaml:"batching_strategy" comment:"Batching strategy for DA submissions. Options: 'immediate' (submit as soon as items are available), 'size' (wait until batch reaches size threshold), 'time' (wait for time interval), 'adaptive' (balance between size and time). Default: 'time'."` + BatchSizeThreshold float64 `mapstructure:"batch_size_threshold" yaml:"batch_size_threshold" comment:"Minimum blob size threshold (as fraction of max blob size, 0.0-1.0) before submitting. Only applies to 'size' and 'adaptive' strategies. Example: 0.8 means wait until batch is 80% full. Default: 0.8."` + BatchMaxDelay DurationWrapper `mapstructure:"batch_max_delay" yaml:"batch_max_delay" comment:"Maximum time to wait before submitting a batch regardless of size. Applies to 'time' and 'adaptive' strategies. Lower values reduce latency but may increase costs. Examples: \"6s\", \"12s\", \"30s\". Default: DA BlockTime."` + BatchMinItems uint64 `mapstructure:"batch_min_items" yaml:"batch_min_items" comment:"Minimum number of items (headers or data) to accumulate before considering submission. Helps avoid submitting single items when more are expected soon. Default: 1."` } // GetNamespace returns the namespace for header submissions. 
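For reference, the four new fields compose like this. A minimal in-package sketch (illustrative only: `BatchingStrategyFactory` is added by this patch inside the internal `submitting` package, and the helper name and concrete values here are made up):

package submitting

import (
	"time"

	"github.com/evstack/ev-node/pkg/config"
)

// exampleStrategyFromConfig shows how the new DAConfig fields select and
// tune a batching strategy. The values are arbitrary examples.
func exampleStrategyFromConfig() (BatchingStrategy, error) {
	cfg := config.DAConfig{
		BatchingStrategy:   "adaptive", // one of: immediate, size, time, adaptive
		BatchSizeThreshold: 0.85,       // submit once the batch reaches 85% of the max blob size
		BatchMaxDelay:      config.DurationWrapper{Duration: 12 * time.Second}, // latency bound
		BatchMinItems:      4, // wait for at least 4 pending items
	}
	return BatchingStrategyFactory(cfg)
}

The corresponding CLI flags are registered in the AddFlags hunk below.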
@@ -365,6 +379,10 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool") cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up") cmd.Flags().Duration(FlagDARequestTimeout, def.DA.RequestTimeout.Duration, "per-request timeout when interacting with the DA layer") + cmd.Flags().String(FlagDABatchingStrategy, def.DA.BatchingStrategy, "batching strategy for DA submissions (immediate, size, time, adaptive)") + cmd.Flags().Float64(FlagDABatchSizeThreshold, def.DA.BatchSizeThreshold, "batch size threshold as fraction of max blob size (0.0-1.0)") + cmd.Flags().Duration(FlagDABatchMaxDelay, def.DA.BatchMaxDelay.Duration, "maximum time to wait before submitting a batch") + cmd.Flags().Uint64(FlagDABatchMinItems, def.DA.BatchMinItems, "minimum number of items to accumulate before submission") // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 57506c0cf7..74cb6b2dce 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -112,7 +112,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) // Count the number of flags we're explicitly checking - expectedFlagCount := 49 // Update this number if you add more flag checks above + expectedFlagCount := 53 // Update this number if you add more flag checks above (added 4 batching flags) // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 062c9fe19c..c6691fb482 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -78,6 +78,10 @@ func DefaultConfig() Config { Namespace: randString(10), DataNamespace: "", ForcedInclusionNamespace: "", + BatchingStrategy: "time", + BatchSizeThreshold: 0.8, + BatchMaxDelay: DurationWrapper{6 * time.Second}, + BatchMinItems: 1, }, Instrumentation: DefaultInstrumentationConfig(), Log: LogConfig{ diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 633bf1cf90..49469a350d 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -11,8 +11,8 @@ import ( ) const ( - // DefaultMaxBlobSize is the default maximum blob size (2MB). - DefaultMaxBlobSize = 2 * 1024 * 1024 + // DefaultMaxBlobSize is the default maximum blob size (8MB). + DefaultMaxBlobSize = 8 * 1024 * 1024 ) // Header contains DA layer header information for a given height. diff --git a/tools/local-da/local.go b/tools/local-da/local.go index 86907fc4dd..9f06471dac 100644 --- a/tools/local-da/local.go +++ b/tools/local-da/local.go @@ -19,7 +19,7 @@ import ( ) // DefaultMaxBlobSize is the default max blob size -const DefaultMaxBlobSize uint64 = 2 * 1024 * 1024 // 2MB +const DefaultMaxBlobSize uint64 = 8 * 1024 * 1024 // 8MB // LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing! 
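Before the follow-up commits reshape this API, here is how the adaptive rule behaves at runtime. A hypothetical in-package sketch using the patch-01 constructor signature (a later commit in this series adds a DA block time parameter), with an 8MB blob limit:

package submitting

import (
	"fmt"
	"time"
)

// demoAdaptive walks AdaptiveStrategy through a filling batch: it fires when
// the batch crosses 80% of an 8MB blob or 6s after the last submission,
// whichever comes first.
func demoAdaptive() {
	const maxBlobSize = 8 * 1024 * 1024
	s := NewAdaptiveStrategy(0.8, 6*time.Second, 1)

	// 4MB after 3s: neither the 80% size threshold nor the 6s delay is met.
	fmt.Println(s.ShouldSubmit(10, 4*1024*1024, maxBlobSize, 3*time.Second)) // false

	// 7MB after 3s: the size rule fires early, no need to wait out the delay.
	fmt.Println(s.ShouldSubmit(20, 7*1024*1024, maxBlobSize, 3*time.Second)) // true

	// 1MB after 7s: a small batch, but the 6s latency bound forces it out.
	fmt.Println(s.ShouldSubmit(2, 1*1024*1024, maxBlobSize, 7*time.Second)) // true
}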
// From 1286154d446b3e74a655d9a34d9014595a053242 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 10:34:04 +0100 Subject: [PATCH 02/15] updates --- block/internal/common/consts.go | 2 +- .../internal/submitting/batching_strategy.go | 43 ++----------------- .../submitting/batching_strategy_test.go | 34 +-------------- block/internal/submitting/submitter.go | 14 ++---- block/internal/submitting/submitter_test.go | 4 +- pkg/config/config_test.go | 2 +- 6 files changed, 11 insertions(+), 88 deletions(-) diff --git a/block/internal/common/consts.go b/block/internal/common/consts.go index bb584e7aef..76aed4249e 100644 --- a/block/internal/common/consts.go +++ b/block/internal/common/consts.go @@ -1,3 +1,3 @@ package common -const DefaultMaxBlobSize = 8 * 1024 * 1024 // 8MB fallback blob size limit (Celestia's current limit) +const DefaultMaxBlobSize = 8 * 1024 * 1024 // 8MB fallback blob size limit diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go index c461cd6389..43106dd437 100644 --- a/block/internal/submitting/batching_strategy.go +++ b/block/internal/submitting/batching_strategy.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/config" ) @@ -13,9 +12,6 @@ type BatchingStrategy interface { // ShouldSubmit determines if a batch should be submitted based on the strategy // Returns true if submission should happen now ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool - - // Name returns the name of the strategy - Name() string } // ImmediateStrategy submits as soon as any items are available @@ -25,10 +21,6 @@ func (s *ImmediateStrategy) ShouldSubmit(pendingCount uint64, totalSize int, max return pendingCount > 0 } -func (s *ImmediateStrategy) Name() string { - return "immediate" -} - // SizeBasedStrategy waits until the batch reaches a certain size threshold type SizeBasedStrategy struct { sizeThreshold float64 // fraction of max blob size (0.0 to 1.0) @@ -57,10 +49,6 @@ func (s *SizeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, max return totalSize >= threshold } -func (s *SizeBasedStrategy) Name() string { - return "size" -} - // TimeBasedStrategy submits after a certain time interval type TimeBasedStrategy struct { maxDelay time.Duration @@ -88,10 +76,6 @@ func (s *TimeBasedStrategy) ShouldSubmit(pendingCount uint64, totalSize int, max return timeSinceLastSubmit >= s.maxDelay } -func (s *TimeBasedStrategy) Name() string { - return "time" -} - // AdaptiveStrategy balances between size and time constraints // It submits when either: // - The batch reaches the size threshold, OR @@ -138,12 +122,8 @@ func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxB return false } -func (s *AdaptiveStrategy) Name() string { - return "adaptive" -} - -// BatchingStrategyFactory creates a batching strategy based on configuration -func BatchingStrategyFactory(cfg config.DAConfig) (BatchingStrategy, error) { +// NewBatchingStrategy creates a batching strategy based on configuration +func NewBatchingStrategy(cfg config.DAConfig) (BatchingStrategy, error) { switch cfg.BatchingStrategy { case "immediate": return &ImmediateStrategy{}, nil @@ -256,11 +236,8 @@ func ShouldWaitForMoreItems( // Use epsilon for floating point comparison const epsilon = 0.001 currentUtilization := float64(currentSize) / float64(maxBlobSize) - if currentUtilization < minUtilization-epsilon 
{ - return true - } - return false + return currentUtilization < minUtilization-epsilon } // BatchingConfig holds configuration for batch optimization @@ -269,17 +246,3 @@ type BatchingConfig struct { Strategy BatchingStrategy TargetUtilization float64 } - -// NewBatchingConfig creates a new batching configuration -func NewBatchingConfig(cfg config.DAConfig) (*BatchingConfig, error) { - strategy, err := BatchingStrategyFactory(cfg) - if err != nil { - return nil, err - } - - return &BatchingConfig{ - MaxBlobSize: common.DefaultMaxBlobSize, - Strategy: strategy, - TargetUtilization: cfg.BatchSizeThreshold, - }, nil -} diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go index 71fcce60b1..8dd7abd291 100644 --- a/block/internal/submitting/batching_strategy_test.go +++ b/block/internal/submitting/batching_strategy_test.go @@ -45,8 +45,6 @@ func TestImmediateStrategy(t *testing.T) { assert.Equal(t, tt.expected, result) }) } - - assert.Equal(t, "immediate", strategy.Name()) } func TestSizeBasedStrategy(t *testing.T) { @@ -116,8 +114,6 @@ func TestSizeBasedStrategy(t *testing.T) { strategy = NewSizeBasedStrategy(0, 1) assert.Equal(t, 0.8, strategy.sizeThreshold) - - assert.Equal(t, "size", strategy.Name()) } func TestTimeBasedStrategy(t *testing.T) { @@ -173,8 +169,6 @@ func TestTimeBasedStrategy(t *testing.T) { assert.Equal(t, tt.expectedSubmit, result) }) } - - assert.Equal(t, "time", NewTimeBasedStrategy(maxDelay, 1).Name()) } func TestAdaptiveStrategy(t *testing.T) { @@ -251,11 +245,9 @@ func TestAdaptiveStrategy(t *testing.T) { assert.Equal(t, 0.8, strategy.sizeThreshold) assert.Equal(t, 6*time.Second, strategy.maxDelay) assert.Equal(t, uint64(1), strategy.minItems) - - assert.Equal(t, "adaptive", strategy.Name()) } -func TestBatchingStrategyFactory(t *testing.T) { +func TestNewBatchingStrategy(t *testing.T) { tests := []struct { name string strategyName string @@ -265,25 +257,21 @@ func TestBatchingStrategyFactory(t *testing.T) { { name: "immediate strategy", strategyName: "immediate", - expectedType: "immediate", expectError: false, }, { name: "size strategy", strategyName: "size", - expectedType: "size", expectError: false, }, { name: "time strategy", strategyName: "time", - expectedType: "time", expectError: false, }, { name: "adaptive strategy", strategyName: "adaptive", - expectedType: "adaptive", expectError: false, }, { @@ -302,7 +290,7 @@ func TestBatchingStrategyFactory(t *testing.T) { BatchMinItems: 1, } - strategy, err := BatchingStrategyFactory(cfg) + strategy, err := NewBatchingStrategy(cfg) if tt.expectError { assert.Error(t, err) @@ -310,7 +298,6 @@ func TestBatchingStrategyFactory(t *testing.T) { } else { require.NoError(t, err) require.NotNil(t, strategy) - assert.Equal(t, tt.expectedType, strategy.Name()) } }) } @@ -543,23 +530,6 @@ func TestShouldWaitForMoreItems(t *testing.T) { } } -func TestNewBatchingConfig(t *testing.T) { - cfg := config.DAConfig{ - BatchingStrategy: "adaptive", - BatchSizeThreshold: 0.85, - BatchMaxDelay: config.DurationWrapper{Duration: 12 * time.Second}, - BatchMinItems: 5, - } - - batchConfig, err := NewBatchingConfig(cfg) - require.NoError(t, err) - require.NotNil(t, batchConfig) - - assert.Equal(t, common.DefaultMaxBlobSize, batchConfig.MaxBlobSize) - assert.Equal(t, "adaptive", batchConfig.Strategy.Name()) - assert.Equal(t, 0.85, batchConfig.TargetUtilization) -} - func TestBatchingStrategiesComparison(t *testing.T) { // This test demonstrates how different strategies behave with 
the same input
 	maxBlobSize := 8 * 1024 * 1024

diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 83c65aef41..04c5c33989 100644
--- a/block/internal/submitting/submitter.go
+++ b/block/internal/submitting/submitter.go
@@ -89,16 +89,12 @@ func NewSubmitter(
 	submitterLogger := logger.With().Str("component", "submitter").Logger()
 
 	// Initialize batching strategy
-	strategy, err := BatchingStrategyFactory(config.DA)
+	strategy, err := NewBatchingStrategy(config.DA)
 	if err != nil {
 		submitterLogger.Warn().Err(err).Msg("failed to create batching strategy, using time-based default")
 		strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 1)
 	}
 
-	submitterLogger.Info().
-		Str("batching_strategy", strategy.Name()).
-		Msg("initialized DA submission batching strategy")
-
 	return &Submitter{
 		store: store,
 		exec:  exec,
@@ -162,10 +158,8 @@ func (s *Submitter) daSubmissionLoop() {
 	defer s.logger.Info().Msg("DA submission loop stopped")
 
 	// Use a shorter ticker interval to check batching strategy more frequently
-	checkInterval := s.config.DA.BlockTime.Duration / 4
-	if checkInterval < 100*time.Millisecond {
-		checkInterval = 100 * time.Millisecond
-	}
+	checkInterval := max(s.config.DA.BlockTime.Duration/4, 100*time.Millisecond)
+
 	ticker := time.NewTicker(checkInterval)
 	defer ticker.Stop()
 
@@ -214,7 +208,6 @@ s.logger.Debug().
 					Uint64("headers", headersNb).
 					Int("total_size_kb", totalSize/1024).
 					Dur("time_since_last", timeSinceLastSubmit).
-					Str("strategy", s.batchingStrategy.Name()).
 					Msg("batching strategy triggered header submission")
 
 				if err := s.daSubmitter.SubmitHeaders(s.ctx, s.cache, s.signer); err != nil {
@@ -272,7 +265,6 @@
 					Uint64("data", dataNb).
 					Int("total_size_kb", totalSize/1024).
 					Dur("time_since_last", timeSinceLastSubmit).
-					Str("strategy", s.batchingStrategy.Name()).
Msg("batching strategy triggered data submission") if err := s.daSubmitter.SubmitData(s.ctx, s.cache, s.signer, s.genesis); err != nil { diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 9a9e08a7f4..63debf89db 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -351,9 +351,7 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { // Provide a non-nil executor; it won't be used because DA inclusion won't advance exec := testmocks.NewMockExecutor(t) - // Provide a minimal signer implementation - // Initialize batching strategy (immediate for this test) - batchingStrategy, err := BatchingStrategyFactory(cfg.DA) + batchingStrategy, err := NewBatchingStrategy(cfg.DA) require.NoError(t, err) s := &Submitter{ diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 74cb6b2dce..1e8a4b4223 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -112,7 +112,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) // Count the number of flags we're explicitly checking - expectedFlagCount := 53 // Update this number if you add more flag checks above (added 4 batching flags) + expectedFlagCount := 53 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 From b28c122d3b48c31de70f929d5bb8f82d344edf81 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 10:39:36 +0100 Subject: [PATCH 03/15] don't use hardcoded da time --- block/internal/submitting/batching_strategy.go | 12 ++++++------ block/internal/submitting/batching_strategy_test.go | 10 +++++----- block/internal/submitting/submitter.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go index 43106dd437..23181ffdc2 100644 --- a/block/internal/submitting/batching_strategy.go +++ b/block/internal/submitting/batching_strategy.go @@ -55,9 +55,9 @@ type TimeBasedStrategy struct { minItems uint64 } -func NewTimeBasedStrategy(maxDelay time.Duration, minItems uint64) *TimeBasedStrategy { +func NewTimeBasedStrategy(daBlockTime time.Duration, maxDelay time.Duration, minItems uint64) *TimeBasedStrategy { if maxDelay == 0 { - maxDelay = 6 * time.Second // default to DA block time + maxDelay = daBlockTime } if minItems == 0 { minItems = 1 @@ -86,12 +86,12 @@ type AdaptiveStrategy struct { minItems uint64 } -func NewAdaptiveStrategy(sizeThreshold float64, maxDelay time.Duration, minItems uint64) *AdaptiveStrategy { +func NewAdaptiveStrategy(daBlockTime time.Duration, sizeThreshold float64, maxDelay time.Duration, minItems uint64) *AdaptiveStrategy { if sizeThreshold <= 0 || sizeThreshold > 1.0 { sizeThreshold = 0.8 // default to 80% } if maxDelay == 0 { - maxDelay = 6 * time.Second // default to DA block time + maxDelay = daBlockTime } if minItems == 0 { minItems = 1 @@ -130,9 +130,9 @@ func NewBatchingStrategy(cfg config.DAConfig) (BatchingStrategy, error) { case "size": return NewSizeBasedStrategy(cfg.BatchSizeThreshold, cfg.BatchMinItems), nil case "time": - return NewTimeBasedStrategy(cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + return NewTimeBasedStrategy(cfg.BlockTime.Duration, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil case "adaptive": - return NewAdaptiveStrategy(cfg.BatchSizeThreshold, 
cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + return NewAdaptiveStrategy(cfg.BlockTime.Duration, cfg.BatchSizeThreshold, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil default: return nil, fmt.Errorf("unknown batching strategy: %s", cfg.BatchingStrategy) } diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go index 8dd7abd291..51d9aebd35 100644 --- a/block/internal/submitting/batching_strategy_test.go +++ b/block/internal/submitting/batching_strategy_test.go @@ -164,7 +164,7 @@ func TestTimeBasedStrategy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - strategy := NewTimeBasedStrategy(maxDelay, tt.minItems) + strategy := NewTimeBasedStrategy(6*time.Second, maxDelay, tt.minItems) result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, tt.timeSinceLastSubmit) assert.Equal(t, tt.expectedSubmit, result) }) @@ -234,14 +234,14 @@ func TestAdaptiveStrategy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - strategy := NewAdaptiveStrategy(sizeThreshold, maxDelay, tt.minItems) + strategy := NewAdaptiveStrategy(6*time.Second, sizeThreshold, maxDelay, tt.minItems) result := strategy.ShouldSubmit(tt.pendingCount, tt.totalSize, maxBlobSize, tt.timeSinceLastSubmit) assert.Equal(t, tt.expectedSubmit, result, "reason: %s", tt.reason) }) } // Test defaults - strategy := NewAdaptiveStrategy(0, 0, 0) + strategy := NewAdaptiveStrategy(6*time.Second, 0, 0, 0) assert.Equal(t, 0.8, strategy.sizeThreshold) assert.Equal(t, 6*time.Second, strategy.maxDelay) assert.Equal(t, uint64(1), strategy.minItems) @@ -539,8 +539,8 @@ func TestBatchingStrategiesComparison(t *testing.T) { immediate := &ImmediateStrategy{} size := NewSizeBasedStrategy(0.8, 1) - timeBased := NewTimeBasedStrategy(6*time.Second, 1) - adaptive := NewAdaptiveStrategy(0.8, 6*time.Second, 1) + timeBased := NewTimeBasedStrategy(6*time.Second, 6*time.Second, 1) + adaptive := NewAdaptiveStrategy(6*time.Second, 0.8, 6*time.Second, 1) // Immediate should always submit if there are items assert.True(t, immediate.ShouldSubmit(pendingCount, totalSize, maxBlobSize, timeSinceLastSubmit)) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 04c5c33989..f13668712e 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -92,7 +92,7 @@ func NewSubmitter( strategy, err := NewBatchingStrategy(config.DA) if err != nil { submitterLogger.Warn().Err(err).Msg("failed to create batching strategy, using time-based default") - strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 1) + strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1) } return &Submitter{ From a9de50d7e245e81fc9e91f882a3ec8aae565c09a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 10:49:52 +0100 Subject: [PATCH 04/15] simplify --- .../internal/submitting/batching_strategy.go | 72 +++++-------------- .../submitting/batching_strategy_test.go | 57 --------------- block/internal/submitting/submitter.go | 1 - 3 files changed, 16 insertions(+), 114 deletions(-) diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go index 23181ffdc2..076a5485ba 100644 --- a/block/internal/submitting/batching_strategy.go +++ b/block/internal/submitting/batching_strategy.go @@ -14,6 +14,22 @@ type BatchingStrategy interface { ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, 
timeSinceLastSubmit time.Duration) bool } +// NewBatchingStrategy creates a batching strategy based on configuration +func NewBatchingStrategy(cfg config.DAConfig) (BatchingStrategy, error) { + switch cfg.BatchingStrategy { + case "immediate": + return &ImmediateStrategy{}, nil + case "size": + return NewSizeBasedStrategy(cfg.BatchSizeThreshold, cfg.BatchMinItems), nil + case "time": + return NewTimeBasedStrategy(cfg.BlockTime.Duration, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + case "adaptive": + return NewAdaptiveStrategy(cfg.BlockTime.Duration, cfg.BatchSizeThreshold, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil + default: + return nil, fmt.Errorf("unknown batching strategy: %s", cfg.BatchingStrategy) + } +} + // ImmediateStrategy submits as soon as any items are available type ImmediateStrategy struct{} @@ -122,22 +138,6 @@ func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxB return false } -// NewBatchingStrategy creates a batching strategy based on configuration -func NewBatchingStrategy(cfg config.DAConfig) (BatchingStrategy, error) { - switch cfg.BatchingStrategy { - case "immediate": - return &ImmediateStrategy{}, nil - case "size": - return NewSizeBasedStrategy(cfg.BatchSizeThreshold, cfg.BatchMinItems), nil - case "time": - return NewTimeBasedStrategy(cfg.BlockTime.Duration, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil - case "adaptive": - return NewAdaptiveStrategy(cfg.BlockTime.Duration, cfg.BatchSizeThreshold, cfg.BatchMaxDelay.Duration, cfg.BatchMinItems), nil - default: - return nil, fmt.Errorf("unknown batching strategy: %s", cfg.BatchingStrategy) - } -} - // estimateBatchSize estimates the total size of pending items // This is a helper function that can be used by the submitter func estimateBatchSize(marshaled [][]byte) int { @@ -180,39 +180,6 @@ func optimizeBatchSize(marshaled [][]byte, maxBlobSize int, targetUtilization fl return count } -// BatchMetrics provides information about batch efficiency -type BatchMetrics struct { - ItemCount int - TotalBytes int - MaxBlobBytes int - Utilization float64 // percentage of max blob size used - EstimatedCost float64 // estimated cost relative to single full blob -} - -// calculateBatchMetrics computes metrics for a batch -func calculateBatchMetrics(itemCount int, totalBytes int, maxBlobBytes int) BatchMetrics { - utilization := 0.0 - if maxBlobBytes > 0 { - utilization = float64(totalBytes) / float64(maxBlobBytes) - } - - // Rough cost estimate: each blob submission has a fixed cost - // Higher utilization = better cost efficiency - estimatedCost := 1.0 - if utilization > 0 { - // If we're only using 50% of the blob, we're paying 2x per byte effectively - estimatedCost = 1.0 / utilization - } - - return BatchMetrics{ - ItemCount: itemCount, - TotalBytes: totalBytes, - MaxBlobBytes: maxBlobBytes, - Utilization: utilization, - EstimatedCost: estimatedCost, - } -} - // ShouldWaitForMoreItems determines if we should wait for more items // to improve batch efficiency func ShouldWaitForMoreItems( @@ -239,10 +206,3 @@ func ShouldWaitForMoreItems( return currentUtilization < minUtilization-epsilon } - -// BatchingConfig holds configuration for batch optimization -type BatchingConfig struct { - MaxBlobSize int - Strategy BatchingStrategy - TargetUtilization float64 -} diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go index 51d9aebd35..cf2c699da5 100644 --- a/block/internal/submitting/batching_strategy_test.go +++ 
b/block/internal/submitting/batching_strategy_test.go @@ -406,63 +406,6 @@ func TestOptimizeBatchSize(t *testing.T) { } } -func TestCalculateBatchMetrics(t *testing.T) { - maxBlobSize := 8 * 1024 * 1024 - - tests := []struct { - name string - itemCount int - totalBytes int - expectedUtil float64 - expectedCostRange [2]float64 // min, max - }{ - { - name: "empty batch", - itemCount: 0, - totalBytes: 0, - expectedUtil: 0.0, - expectedCostRange: [2]float64{0, 999999}, // cost is undefined for empty - }, - { - name: "half full", - itemCount: 10, - totalBytes: 4 * 1024 * 1024, - expectedUtil: 0.5, - expectedCostRange: [2]float64{2.0, 2.0}, // 1/0.5 = 2.0x cost - }, - { - name: "80% full", - itemCount: 20, - totalBytes: int(float64(maxBlobSize) * 0.8), - expectedUtil: 0.8, - expectedCostRange: [2]float64{1.25, 1.25}, // 1/0.8 = 1.25x cost - }, - { - name: "nearly full", - itemCount: 50, - totalBytes: int(float64(maxBlobSize) * 0.95), - expectedUtil: 0.95, - expectedCostRange: [2]float64{1.05, 1.06}, // ~1.05x cost - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - metrics := calculateBatchMetrics(tt.itemCount, tt.totalBytes, maxBlobSize) - - assert.Equal(t, tt.itemCount, metrics.ItemCount) - assert.Equal(t, tt.totalBytes, metrics.TotalBytes) - assert.Equal(t, maxBlobSize, metrics.MaxBlobBytes) - assert.InDelta(t, tt.expectedUtil, metrics.Utilization, 0.01) - - if tt.totalBytes > 0 { - assert.InEpsilon(t, (tt.expectedCostRange[0]+tt.expectedCostRange[1])/2, - metrics.EstimatedCost, 0.01, "cost should be within range") - } - }) - } -} - func TestShouldWaitForMoreItems(t *testing.T) { maxBlobSize := 8 * 1024 * 1024 diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index f13668712e..7db8343c47 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -88,7 +88,6 @@ func NewSubmitter( ) *Submitter { submitterLogger := logger.With().Str("component", "submitter").Logger() - // Initialize batching strategy strategy, err := NewBatchingStrategy(config.DA) if err != nil { submitterLogger.Warn().Err(err).Msg("failed to create batching strategy, using time-based default") From 3f553fa846276a826f5162b7a12868eac1d04b2f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 11:00:54 +0100 Subject: [PATCH 05/15] cleanups --- .../internal/submitting/batching_strategy.go | 69 ------- .../submitting/batching_strategy_test.go | 170 ------------------ 2 files changed, 239 deletions(-) diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go index 076a5485ba..58436e0597 100644 --- a/block/internal/submitting/batching_strategy.go +++ b/block/internal/submitting/batching_strategy.go @@ -137,72 +137,3 @@ func (s *AdaptiveStrategy) ShouldSubmit(pendingCount uint64, totalSize int, maxB return false } - -// estimateBatchSize estimates the total size of pending items -// This is a helper function that can be used by the submitter -func estimateBatchSize(marshaled [][]byte) int { - totalSize := 0 - for _, data := range marshaled { - totalSize += len(data) - } - return totalSize -} - -// optimizeBatchSize returns the optimal number of items to include in a batch -// to maximize blob utilization while staying under the size limit -func optimizeBatchSize(marshaled [][]byte, maxBlobSize int, targetUtilization float64) int { - if targetUtilization <= 0 || targetUtilization > 1.0 { - targetUtilization = 0.9 // default to 90% utilization - } - - targetSize 
:= int(float64(maxBlobSize) * targetUtilization) - totalSize := 0 - count := 0 - - for i, data := range marshaled { - itemSize := len(data) - - // If adding this item would exceed max blob size, stop - if totalSize+itemSize > maxBlobSize { - break - } - - totalSize += itemSize - count = i + 1 - - // If we've reached our target utilization, we can stop - // This helps create more predictably-sized batches - if totalSize >= targetSize { - break - } - } - - return count -} - -// ShouldWaitForMoreItems determines if we should wait for more items -// to improve batch efficiency -func ShouldWaitForMoreItems( - currentCount uint64, - currentSize int, - maxBlobSize int, - minUtilization float64, - hasMoreExpected bool, -) bool { - // Don't wait if we're already at or near capacity - if currentSize >= int(float64(maxBlobSize)*0.95) { - return false - } - - // Don't wait if we don't expect more items soon - if !hasMoreExpected { - return false - } - - // Wait if current utilization is below minimum threshold - // Use epsilon for floating point comparison - const epsilon = 0.001 - currentUtilization := float64(currentSize) / float64(maxBlobSize) - - return currentUtilization < minUtilization-epsilon -} diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go index cf2c699da5..6eea3971bb 100644 --- a/block/internal/submitting/batching_strategy_test.go +++ b/block/internal/submitting/batching_strategy_test.go @@ -303,176 +303,6 @@ func TestNewBatchingStrategy(t *testing.T) { } } -func TestEstimateBatchSize(t *testing.T) { - tests := []struct { - name string - marshaled [][]byte - expectedSize int - }{ - { - name: "empty batch", - marshaled: [][]byte{}, - expectedSize: 0, - }, - { - name: "single item", - marshaled: [][]byte{ - make([]byte, 1024), - }, - expectedSize: 1024, - }, - { - name: "multiple items", - marshaled: [][]byte{ - make([]byte, 1024), - make([]byte, 2048), - make([]byte, 512), - }, - expectedSize: 3584, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - size := estimateBatchSize(tt.marshaled) - assert.Equal(t, tt.expectedSize, size) - }) - } -} - -func TestOptimizeBatchSize(t *testing.T) { - maxBlobSize := 8 * 1024 * 1024 // 8MB - - tests := []struct { - name string - itemSizes []int - targetUtilization float64 - expectedCount int - expectedTotalSize int - }{ - { - name: "empty batch", - itemSizes: []int{}, - targetUtilization: 0.9, - expectedCount: 0, - expectedTotalSize: 0, - }, - { - name: "single small item", - itemSizes: []int{1024}, - targetUtilization: 0.9, - expectedCount: 1, - expectedTotalSize: 1024, - }, - { - name: "reach target utilization", - itemSizes: []int{1024 * 1024, 2 * 1024 * 1024, 3 * 1024 * 1024, 1024 * 1024}, - targetUtilization: 0.8, - expectedCount: 4, // 1+2+3+1 = 7MB (87.5% of 8MB, exceeds 80% target so stops) - expectedTotalSize: 7 * 1024 * 1024, - }, - { - name: "stop at max blob size", - itemSizes: []int{7 * 1024 * 1024, 2 * 1024 * 1024}, - targetUtilization: 0.9, - expectedCount: 1, // Second item would exceed max - expectedTotalSize: 7 * 1024 * 1024, - }, - { - name: "all items fit below target", - itemSizes: []int{1024 * 1024, 1024 * 1024, 1024 * 1024}, - targetUtilization: 0.9, - expectedCount: 3, - expectedTotalSize: 3 * 1024 * 1024, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create marshaled data - marshaled := make([][]byte, len(tt.itemSizes)) - for i, size := range tt.itemSizes { - marshaled[i] = make([]byte, size) - } 
- - count := optimizeBatchSize(marshaled, maxBlobSize, tt.targetUtilization) - assert.Equal(t, tt.expectedCount, count) - - if count > 0 { - totalSize := estimateBatchSize(marshaled[:count]) - assert.Equal(t, tt.expectedTotalSize, totalSize) - } - }) - } -} - -func TestShouldWaitForMoreItems(t *testing.T) { - maxBlobSize := 8 * 1024 * 1024 - - tests := []struct { - name string - currentCount uint64 - currentSize int - minUtilization float64 - hasMoreExpected bool - expectedWait bool - }{ - { - name: "near capacity", - currentCount: 50, - currentSize: int(float64(maxBlobSize) * 0.96), - minUtilization: 0.8, - hasMoreExpected: true, - expectedWait: false, - }, - { - name: "below threshold but no more expected", - currentCount: 10, - currentSize: 4 * 1024 * 1024, - minUtilization: 0.8, - hasMoreExpected: false, - expectedWait: false, - }, - { - name: "below threshold with more expected", - currentCount: 10, - currentSize: 4 * 1024 * 1024, // 50% - minUtilization: 0.8, - hasMoreExpected: true, - expectedWait: true, - }, - { - name: "at threshold", - currentCount: 20, - currentSize: int(float64(maxBlobSize) * 0.8), - minUtilization: 0.8, - hasMoreExpected: true, - expectedWait: false, // At threshold, no need to wait - }, - { - name: "above threshold", - currentCount: 30, - currentSize: int(float64(maxBlobSize) * 0.85), - minUtilization: 0.8, - hasMoreExpected: true, - expectedWait: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := ShouldWaitForMoreItems( - tt.currentCount, - tt.currentSize, - maxBlobSize, - tt.minUtilization, - tt.hasMoreExpected, - ) - assert.Equal(t, tt.expectedWait, result) - }) - } -} - func TestBatchingStrategiesComparison(t *testing.T) { // This test demonstrates how different strategies behave with the same input maxBlobSize := 8 * 1024 * 1024 From 2dad343f377b062cb8c4dd1edeb0340f5021a92a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 11:13:18 +0100 Subject: [PATCH 06/15] updates --- block/internal/submitting/da_submitter.go | 38 ++++++---------- .../da_submitter_integration_test.go | 9 +++- .../internal/submitting/da_submitter_test.go | 44 ++++++++++++------- block/internal/submitting/submitter.go | 20 ++++----- block/internal/submitting/submitter_test.go | 4 +- 5 files changed, 61 insertions(+), 54 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index c479f2a2ea..b396f3e8b5 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -160,12 +160,7 @@ func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { } // SubmitHeaders submits pending headers to DA layer -func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager, signer signer.Signer) error { - headers, err := cache.GetPendingHeaders(ctx) - if err != nil { - return fmt.Errorf("failed to get pending headers: %w", err) - } - +func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, cache cache.Manager, signer signer.Signer) error { if len(headers) == 0 { return nil } @@ -211,20 +206,15 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager, si } // SubmitData submits pending data to DA layer -func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { - dataList, err := cache.GetPendingData(ctx) - if err != nil { - return fmt.Errorf("failed to get pending data: %w", err) - } - 
- if len(dataList) == 0 { +func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + if len(unsignedDataList) == 0 { return nil } - // Sign the data - signedDataList, err := s.createSignedData(dataList, signer, genesis) + // Sign the data (cache returns unsigned SignedData structs) + signedDataList, err := s.signData(unsignedDataList, signer, genesis) if err != nil { - return fmt.Errorf("failed to create signed data: %w", err) + return fmt.Errorf("failed to sign data: %w", err) } if len(signedDataList) == 0 { @@ -253,8 +243,8 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe ) } -// createSignedData creates signed data from raw data -func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer signer.Signer, genesis genesis.Genesis) ([]*types.SignedData, error) { +// signData signs unsigned SignedData structs returned from cache +func (s *DASubmitter) signData(unsignedDataList []*types.SignedData, signer signer.Signer, genesis genesis.Genesis) ([]*types.SignedData, error) { if signer == nil { return nil, fmt.Errorf("signer is nil") } @@ -278,16 +268,16 @@ func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer sign Address: addr, } - signedDataList := make([]*types.SignedData, 0, len(dataList)) + signedDataList := make([]*types.SignedData, 0, len(unsignedDataList)) - for _, data := range dataList { + for _, unsignedData := range unsignedDataList { // Skip empty data - if len(data.Txs) == 0 { + if len(unsignedData.Data.Txs) == 0 { continue } // Sign the data - dataBytes, err := data.Data.MarshalBinary() + dataBytes, err := unsignedData.Data.MarshalBinary() if err != nil { return nil, fmt.Errorf("failed to marshal data: %w", err) } @@ -298,9 +288,9 @@ func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer sign } signedData := &types.SignedData{ - Data: data.Data, - Signature: signature, + Data: unsignedData.Data, Signer: signerInfo, + Signature: signature, } signedDataList = append(signedDataList, signedData) diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index ddab9a7be6..c90ab114fd 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -99,8 +99,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( daSubmitter := NewDASubmitter(client, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) // Submit headers and data - require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm, n)) - require.NoError(t, daSubmitter.SubmitData(context.Background(), cm, n, gen)) + headers, err := cm.GetPendingHeaders(context.Background()) + require.NoError(t, err) + require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, cm, n)) + + dataList, err := cm.GetPendingData(context.Background()) + require.NoError(t, err) + require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, cm, n, gen)) // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncluded(hdr1.Hash().String()) diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index 7b0d182aa1..f19c825855 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ 
-212,8 +212,10 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { require.NoError(t, batch2.SetHeight(2)) require.NoError(t, batch2.Commit()) - // Submit headers - err = submitter.SubmitHeaders(ctx, cm, signer) + // Get headers from cache and submit + headers, err := cm.GetPendingHeaders(ctx) + require.NoError(t, err) + err = submitter.SubmitHeaders(ctx, headers, cm, signer) require.NoError(t, err) // Verify headers are marked as DA included @@ -232,8 +234,10 @@ func TestDASubmitter_SubmitHeaders_NoPendingHeaders(t *testing.T) { // Create test signer _, _, signer := createTestSigner(t) - // Submit headers when none are pending - err := submitter.SubmitHeaders(ctx, cm, signer) + // Get headers from cache (should be empty) and submit + headers, err := cm.GetPendingHeaders(ctx) + require.NoError(t, err) + err = submitter.SubmitHeaders(ctx, headers, cm, signer) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } @@ -325,8 +329,10 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { require.NoError(t, batch2.SetHeight(2)) require.NoError(t, batch2.Commit()) - // Submit data - err = submitter.SubmitData(ctx, cm, signer, gen) + // Get data from cache and submit + dataList, err := cm.GetPendingData(ctx) + require.NoError(t, err) + err = submitter.SubmitData(ctx, dataList, cm, signer, gen) require.NoError(t, err) // Verify data is marked as DA included @@ -376,8 +382,10 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { require.NoError(t, batch.SetHeight(1)) require.NoError(t, batch.Commit()) - // Submit data - should succeed but skip empty data - err = submitter.SubmitData(ctx, cm, signer, gen) + // Get data from cache and submit - should succeed but skip empty data + dataList, err := cm.GetPendingData(ctx) + require.NoError(t, err) + err = submitter.SubmitData(ctx, dataList, cm, signer, gen) require.NoError(t, err) mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -393,8 +401,10 @@ func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) { // Create test signer _, _, signer := createTestSigner(t) - // Submit data when none are pending - err := submitter.SubmitData(ctx, cm, signer, gen) + // Get data from cache (should be empty) and submit + dataList, err := cm.GetPendingData(ctx) + require.NoError(t, err) + err = submitter.SubmitData(ctx, dataList, cm, signer, gen) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } @@ -432,14 +442,16 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) { require.NoError(t, batch.SetHeight(1)) require.NoError(t, batch.Commit()) - // Submit data with nil signer - should fail - err = submitter.SubmitData(ctx, cm, nil, gen) + // Get data from cache and submit with nil signer - should fail + dataList, err := cm.GetPendingData(ctx) + require.NoError(t, err) + err = submitter.SubmitData(ctx, dataList, cm, nil, gen) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } -func TestDASubmitter_CreateSignedData(t *testing.T) { +func TestDASubmitter_SignData(t *testing.T) { submitter, _, _, _, gen := setupDASubmitterTest(t) // Create test signer @@ -483,7 +495,7 @@ func TestDASubmitter_CreateSignedData(t 
*testing.T) { dataList := []*types.SignedData{signedData1, signedData2, signedData3} // Create signed data - result, err := submitter.createSignedData(dataList, signer, gen) + result, err := submitter.signData(dataList, signer, gen) require.NoError(t, err) // Should have 2 items (empty data skipped) @@ -497,7 +509,7 @@ func TestDASubmitter_CreateSignedData(t *testing.T) { } } -func TestDASubmitter_CreateSignedData_NilSigner(t *testing.T) { +func TestDASubmitter_SignData_NilSigner(t *testing.T) { submitter, _, _, _, gen := setupDASubmitterTest(t) // Create test data @@ -514,7 +526,7 @@ func TestDASubmitter_CreateSignedData_NilSigner(t *testing.T) { dataList := []*types.SignedData{signedData} // Create signed data with nil signer - should fail - _, err := submitter.createSignedData(dataList, nil, gen) + _, err := submitter.signData(dataList, nil, gen) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 7db8343c47..260185b4a7 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -25,8 +25,8 @@ import ( // daSubmitterAPI defines minimal methods needed by Submitter for DA submissions. type daSubmitterAPI interface { - SubmitHeaders(ctx context.Context, cache cache.Manager, signer signer.Signer) error - SubmitData(ctx context.Context, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error + SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, cache cache.Manager, signer signer.Signer) error + SubmitData(ctx context.Context, signedDataList []*types.SignedData, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error } // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes @@ -185,7 +185,7 @@ func (s *Submitter) daSubmissionLoop() { return } - // Estimate total size + // Marshal headers once and estimate total size totalSize := 0 for _, h := range headers { data, err := h.MarshalBinary() @@ -209,7 +209,7 @@ func (s *Submitter) daSubmissionLoop() { Dur("time_since_last", timeSinceLastSubmit). Msg("batching strategy triggered header submission") - if err := s.daSubmitter.SubmitHeaders(s.ctx, s.cache, s.signer); err != nil { + if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, s.cache, s.signer); err != nil { // Check for unrecoverable errors that indicate a critical issue if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). @@ -236,23 +236,23 @@ func (s *Submitter) daSubmissionLoop() { defer s.dataSubmissionMtx.Unlock() // Get pending data to estimate size - dataList, err := s.cache.GetPendingData(s.ctx) + signedDataList, err := s.cache.GetPendingData(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get pending data for batching decision") return } - // Estimate total size + // Marshal data once and estimate total size totalSize := 0 - for _, d := range dataList { - data, err := d.MarshalBinary() + for _, sd := range signedDataList { + data, err := sd.MarshalBinary() if err == nil { totalSize += len(data) } } shouldSubmit := s.batchingStrategy.ShouldSubmit( - uint64(len(dataList)), + uint64(len(signedDataList)), totalSize, common.DefaultMaxBlobSize, timeSinceLastSubmit, @@ -266,7 +266,7 @@ func (s *Submitter) daSubmissionLoop() { Dur("time_since_last", timeSinceLastSubmit). 
Msg("batching strategy triggered data submission") - if err := s.daSubmitter.SubmitData(s.ctx, s.cache, s.signer, s.genesis); err != nil { + if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, s.cache, s.signer, s.genesis); err != nil { // Check for unrecoverable errors that indicate a critical issue if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 63debf89db..ddce134a44 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -412,7 +412,7 @@ type fakeDASubmitter struct { chData chan struct{} } -func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ cache.Manager, _ signer.Signer) error { +func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHeader, _ cache.Manager, _ signer.Signer) error { select { case f.chHdr <- struct{}{}: default: @@ -420,7 +420,7 @@ func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ cache.Manager, _ return nil } -func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error { +func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error { select { case f.chData <- struct{}{}: default: From 19815da5f8737a0e41c5f42e763b4b7425366963 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 13:15:32 +0100 Subject: [PATCH 07/15] lint --- block/internal/submitting/da_submitter.go | 2 +- block/internal/submitting/submitter.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index b396f3e8b5..3b9681e487 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -272,7 +272,7 @@ func (s *DASubmitter) signData(unsignedDataList []*types.SignedData, signer sign for _, unsignedData := range unsignedDataList { // Skip empty data - if len(unsignedData.Data.Txs) == 0 { + if len(unsignedData.Txs) == 0 { continue } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 260185b4a7..6f5a4c2e19 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -178,7 +178,6 @@ func (s *Submitter) daSubmissionLoop() { go func() { defer s.headerSubmissionMtx.Unlock() - // Get pending headers to estimate size headers, err := s.cache.GetPendingHeaders(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get pending headers for batching decision") @@ -235,7 +234,6 @@ func (s *Submitter) daSubmissionLoop() { go func() { defer s.dataSubmissionMtx.Unlock() - // Get pending data to estimate size signedDataList, err := s.cache.GetPendingData(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get pending data for batching decision") From 0bff41a6ee6ea6622a2c31f35f243947a960aeee Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 13:20:28 +0100 Subject: [PATCH 08/15] fix race --- block/internal/submitting/submitter.go | 24 +++++++++++++-------- block/internal/submitting/submitter_test.go | 7 ++++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 6f5a4c2e19..8b79a9f9e7 100644 --- a/block/internal/submitting/submitter.go +++ 
b/block/internal/submitting/submitter.go @@ -56,8 +56,8 @@ type Submitter struct { dataSubmissionMtx sync.Mutex // Batching strategy state - lastHeaderSubmit time.Time - lastDataSubmit time.Time + lastHeaderSubmit atomic.Int64 // stores Unix nanoseconds + lastDataSubmit atomic.Int64 // stores Unix nanoseconds batchingStrategy BatchingStrategy // Channels for coordination @@ -94,7 +94,7 @@ func NewSubmitter( strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1) } - return &Submitter{ + submitter := &Submitter{ store: store, exec: exec, cache: cache, @@ -105,12 +105,16 @@ func NewSubmitter( sequencer: sequencer, signer: signer, daIncludedHeight: &atomic.Uint64{}, - lastHeaderSubmit: time.Now(), - lastDataSubmit: time.Now(), batchingStrategy: strategy, errorCh: errorCh, logger: submitterLogger, } + + now := time.Now().UnixNano() + submitter.lastHeaderSubmit.Store(now) + submitter.lastDataSubmit.Store(now) + + return submitter } // Start begins the submitting component @@ -170,7 +174,8 @@ func (s *Submitter) daSubmissionLoop() { // Check if we should submit headers based on batching strategy headersNb := s.cache.NumPendingHeaders() if headersNb > 0 { - timeSinceLastSubmit := time.Since(s.lastHeaderSubmit) + lastSubmitNanos := s.lastHeaderSubmit.Load() + timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos)) // For strategy decision, we need to estimate the size // We'll fetch headers to check, but only submit if strategy approves @@ -218,7 +223,7 @@ func (s *Submitter) daSubmissionLoop() { } s.logger.Error().Err(err).Msg("failed to submit headers") } else { - s.lastHeaderSubmit = time.Now() + s.lastHeaderSubmit.Store(time.Now().UnixNano()) } } }() @@ -228,7 +233,8 @@ func (s *Submitter) daSubmissionLoop() { // Check if we should submit data based on batching strategy dataNb := s.cache.NumPendingData() if dataNb > 0 { - timeSinceLastSubmit := time.Since(s.lastDataSubmit) + lastSubmitNanos := s.lastDataSubmit.Load() + timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos)) if s.dataSubmissionMtx.TryLock() { go func() { @@ -274,7 +280,7 @@ func (s *Submitter) daSubmissionLoop() { } s.logger.Error().Err(err).Msg("failed to submit data") } else { - s.lastDataSubmit = time.Now() + s.lastDataSubmit.Store(time.Now().UnixNano()) } } }() diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index ddce134a44..20d58911d1 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -364,12 +364,15 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { daSubmitter: fakeDA, signer: &fakeSigner{}, daIncludedHeight: &atomic.Uint64{}, - lastHeaderSubmit: time.Now().Add(-time.Hour), // Set far in past so strategy allows submission - lastDataSubmit: time.Now().Add(-time.Hour), batchingStrategy: batchingStrategy, logger: zerolog.Nop(), } + // Set last submit times far in past so strategy allows submission + pastTime := time.Now().Add(-time.Hour).UnixNano() + s.lastHeaderSubmit.Store(pastTime) + s.lastDataSubmit.Store(pastTime) + // Make there be pending headers and data by setting store height > last submitted h1, d1 := newHeaderAndData("test-chain", 1, true) h2, d2 := newHeaderAndData("test-chain", 2, true) From ee4847dc486a4dc0a5b8419fdcb0954f61ba3f47 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 13:34:58 +0100 Subject: [PATCH 09/15] remove double marshalling --- block/internal/submitting/da_submitter.go | 115 ++++++------------ 
.../da_submitter_integration_test.go | 20 ++- .../submitting/da_submitter_mocks_test.go | 38 ++++-- .../internal/submitting/da_submitter_test.go | 49 ++++++-- block/internal/submitting/submitter.go | 32 +++-- block/internal/submitting/submitter_test.go | 4 +- 6 files changed, 150 insertions(+), 108 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 3b9681e487..5e09513122 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -160,7 +160,7 @@ func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { } // SubmitHeaders submits pending headers to DA layer -func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, cache cache.Manager, signer signer.Signer) error { +func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { if len(headers) == 0 { return nil } @@ -169,26 +169,30 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed return fmt.Errorf("signer is nil") } + if len(marshalledHeaders) != len(headers) { + return fmt.Errorf("marshalledHeaders length (%d) does not match headers length (%d)", len(marshalledHeaders), len(headers)) + } + s.logger.Info().Int("count", len(headers)).Msg("submitting headers to DA") - return submitToDA(s, ctx, headers, - func(header *types.SignedHeader) ([]byte, error) { - // A. Marshal the inner SignedHeader content to bytes (canonical representation for signing) - // This effectively signs "Fields 1-3" of the intended DAHeaderEnvelope. - contentBytes, err := header.MarshalBinary() - if err != nil { - return nil, fmt.Errorf("failed to marshal signed header for envelope signing: %w", err) - } + // Create DA envelopes from pre-marshalled headers + envelopes := make([][]byte, len(headers)) + for i, header := range headers { + // Sign the pre-marshalled header content + envelopeSignature, err := signer.Sign(marshalledHeaders[i]) + if err != nil { + return fmt.Errorf("failed to sign envelope for header %d: %w", i, err) + } - // B. Sign the contentBytes with the envelope signer (aggregator) - envelopeSignature, err := signer.Sign(contentBytes) - if err != nil { - return nil, fmt.Errorf("failed to sign envelope: %w", err) - } + // Create the envelope and marshal it + envelope, err := header.MarshalDAEnvelope(envelopeSignature) + if err != nil { + return fmt.Errorf("failed to marshal DA envelope for header %d: %w", i, err) + } + envelopes[i] = envelope + } - // C. 
Create the envelope and marshal it - return header.MarshalDAEnvelope(envelopeSignature) - }, + return submitToDA(s, ctx, headers, envelopes, func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) { for _, header := range submitted { cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) @@ -206,11 +210,15 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed } // SubmitData submits pending data to DA layer -func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { +func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { if len(unsignedDataList) == 0 { return nil } + if len(marshalledData) != len(unsignedDataList) { + return fmt.Errorf("marshalledData length (%d) does not match unsignedDataList length (%d)", len(marshalledData), len(unsignedDataList)) + } + // Sign the data (cache returns unsigned SignedData structs) signedDataList, err := s.signData(unsignedDataList, signer, genesis) if err != nil { @@ -223,10 +231,17 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types. s.logger.Info().Int("count", len(signedDataList)).Msg("submitting data to DA") - return submitToDA(s, ctx, signedDataList, - func(signedData *types.SignedData) ([]byte, error) { - return signedData.MarshalBinary() - }, + // Filter marshalledData to match signedDataList (removes empty data) + filteredMarshalledData := make([][]byte, 0, len(signedDataList)) + signedIdx := 0 + for i, unsigned := range unsignedDataList { + if signedIdx < len(signedDataList) && unsigned.Height() == signedDataList[signedIdx].Height() { + filteredMarshalledData = append(filteredMarshalledData, marshalledData[i]) + signedIdx++ + } + } + + return submitToDA(s, ctx, signedDataList, filteredMarshalledData, func(submitted []*types.SignedData, res *datypes.ResultSubmit) { for _, sd := range submitted { cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height()) @@ -342,16 +357,15 @@ func submitToDA[T any]( s *DASubmitter, ctx context.Context, items []T, - marshalFn func(T) ([]byte, error), + marshaled [][]byte, postSubmit func([]T, *datypes.ResultSubmit), itemType string, namespace []byte, options []byte, getTotalPendingFn func() uint64, ) error { - marshaled, err := marshalItems(ctx, items, marshalFn, itemType) - if err != nil { - return err + if len(items) != len(marshaled) { + return fmt.Errorf("items length (%d) does not match marshaled length (%d)", len(items), len(marshaled)) } pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) @@ -513,55 +527,6 @@ func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes int) ([]T, return items[:count], marshaled[:count], nil } -func marshalItems[T any]( - parentCtx context.Context, - items []T, - marshalFn func(T) ([]byte, error), - itemType string, -) ([][]byte, error) { - if len(items) == 0 { - return nil, nil - } - marshaled := make([][]byte, len(items)) - ctx, cancel := context.WithCancel(parentCtx) - defer cancel() - - // Semaphore to limit concurrency to 32 workers - sem := make(chan struct{}, 32) - - // Use a channel to collect results from goroutines - resultCh := make(chan error, len(items)) - - // Marshal items concurrently - for i, item := range items { - go func(idx int, itm T) { - sem <- 
struct{}{} - defer func() { <-sem }() - - select { - case <-ctx.Done(): - resultCh <- ctx.Err() - default: - bz, err := marshalFn(itm) - if err != nil { - resultCh <- fmt.Errorf("failed to marshal %s item at index %d: %w", itemType, idx, err) - return - } - marshaled[idx] = bz - resultCh <- nil - } - }(i, item) - } - - // Wait for all goroutines to complete and check for errors - for i := 0; i < len(items); i++ { - if err := <-resultCh; err != nil { - return nil, err - } - } - return marshaled, nil -} - func waitForBackoffOrContext(ctx context.Context, backoff time.Duration) error { if backoff <= 0 { select { diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index c90ab114fd..ea45fb8d90 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -101,11 +101,27 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( // Submit headers and data headers, err := cm.GetPendingHeaders(context.Background()) require.NoError(t, err) - require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, cm, n)) + + // Marshal headers + marshalledHeaders := make([][]byte, len(headers)) + for i, h := range headers { + data, err := h.MarshalBinary() + require.NoError(t, err) + marshalledHeaders[i] = data + } + require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n)) dataList, err := cm.GetPendingData(context.Background()) require.NoError(t, err) - require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, cm, n, gen)) + + // Marshal data + marshalledData := make([][]byte, len(dataList)) + for i, d := range dataList { + data, err := d.MarshalBinary() + require.NoError(t, err) + marshalledData[i] = data + } + require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncluded(hdr1.Hash().String()) diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 19aea18b0d..bffa8c80bf 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -38,9 +38,6 @@ func newTestSubmitter(t *testing.T, mockClient *mocks.MockClient, override func( return NewDASubmitter(mockClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop()) } -// marshal helper for simple items -func marshalString(s string) ([]byte, error) { return []byte(s), nil } - func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { t.Parallel() @@ -68,12 +65,17 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { s := newTestSubmitter(t, client, nil) items := []string{"a", "b", "c"} + marshalledItems := make([][]byte, 0, len(items)) + for idx, item := range items { + marshalledItems[idx] = []byte(item) + } + ctx := context.Background() err := submitToDA[string]( s, ctx, items, - marshalString, + marshalledItems, func(_ []string, _ *datypes.ResultSubmit) {}, "item", nsBz, @@ -112,12 +114,17 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { s := newTestSubmitter(t, client, nil) items := []string{"x"} + marshalledItems := make([][]byte, 0, len(items)) + for idx, item := range items { + marshalledItems[idx] = []byte(item) + } + ctx := 
context.Background() err := submitToDA[string]( s, ctx, items, - marshalString, + marshalledItems, func(_ []string, _ *datypes.ResultSubmit) {}, "item", nsBz, @@ -158,12 +165,17 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { s := newTestSubmitter(t, client, nil) items := []string{"a", "b", "c", "d"} + marshalledItems := make([][]byte, 0, len(items)) + for idx, item := range items { + marshalledItems[idx] = []byte(item) + } + ctx := context.Background() err := submitToDA[string]( s, ctx, items, - marshalString, + marshalledItems, func(_ []string, _ *datypes.ResultSubmit) {}, "item", nsBz, @@ -198,12 +210,17 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { s := newTestSubmitter(t, client, nil) items := []string{"only"} + marshalledItems := make([][]byte, 0, len(items)) + for idx, item := range items { + marshalledItems[idx] = []byte(item) + } + ctx := context.Background() err := submitToDA[string]( s, ctx, items, - marshalString, + marshalledItems, func(_ []string, _ *datypes.ResultSubmit) {}, "item", nsBz, @@ -237,12 +254,17 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { s := newTestSubmitter(t, client, nil) items := []string{"a", "b", "c"} + marshalledItems := make([][]byte, 0, len(items)) + for idx, item := range items { + marshalledItems[idx] = []byte(item) + } + ctx := context.Background() err := submitToDA[string]( s, ctx, items, - marshalString, + marshalledItems, func(submitted []string, _ *datypes.ResultSubmit) { totalSubmitted += len(submitted) }, "item", nsBz, diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index f19c825855..21a7235d43 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -215,7 +215,14 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { // Get headers from cache and submit headers, err := cm.GetPendingHeaders(ctx) require.NoError(t, err) - err = submitter.SubmitHeaders(ctx, headers, cm, signer) + // Marshal headers + marshalledHeaders := make([][]byte, len(headers)) + for i, h := range headers { + data, err := h.MarshalBinary() + require.NoError(t, err) + marshalledHeaders[i] = data + } + err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) // Verify headers are marked as DA included @@ -237,7 +244,8 @@ func TestDASubmitter_SubmitHeaders_NoPendingHeaders(t *testing.T) { // Get headers from cache (should be empty) and submit headers, err := cm.GetPendingHeaders(ctx) require.NoError(t, err) - err = submitter.SubmitHeaders(ctx, headers, cm, signer) + var marshalledHeaders [][]byte + err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } @@ -330,9 +338,16 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { require.NoError(t, batch2.Commit()) // Get data from cache and submit - dataList, err := cm.GetPendingData(ctx) + signedDataList, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, dataList, cm, signer, gen) + // Marshal data + marshalledData := make([][]byte, len(signedDataList)) + for i, d := range signedDataList { + data, err := d.MarshalBinary() + require.NoError(t, err) + marshalledData[i] = data + } + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) 
require.NoError(t, err) // Verify data is marked as DA included @@ -383,9 +398,17 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { require.NoError(t, batch.Commit()) // Get data from cache and submit - should succeed but skip empty data - dataList, err := cm.GetPendingData(ctx) + // Get data from cache and submit + signedDataList, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, dataList, cm, signer, gen) + // Marshal data + marshalledData := make([][]byte, len(signedDataList)) + for i, d := range signedDataList { + data, err := d.MarshalBinary() + require.NoError(t, err) + marshalledData[i] = data + } + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) require.NoError(t, err) mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -404,7 +427,8 @@ func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) { // Get data from cache (should be empty) and submit dataList, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, dataList, cm, signer, gen) + var marshalledData [][]byte + err = submitter.SubmitData(ctx, dataList, marshalledData, cm, signer, gen) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } @@ -443,9 +467,16 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) { require.NoError(t, batch.Commit()) // Get data from cache and submit with nil signer - should fail - dataList, err := cm.GetPendingData(ctx) + signedDataList, err := cm.GetPendingData(ctx) require.NoError(t, err) - err = submitter.SubmitData(ctx, dataList, cm, nil, gen) + // Marshal data + marshalledData := make([][]byte, len(signedDataList)) + for i, d := range signedDataList { + data, err := d.MarshalBinary() + require.NoError(t, err) + marshalledData[i] = data + } + err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil, gen) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 8b79a9f9e7..01ff770427 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -25,8 +25,8 @@ import ( // daSubmitterAPI defines minimal methods needed by Submitter for DA submissions. 
type daSubmitterAPI interface { - SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, cache cache.Manager, signer signer.Signer) error - SubmitData(ctx context.Context, signedDataList []*types.SignedData, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error + SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error + SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error } // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes @@ -189,13 +189,17 @@ func (s *Submitter) daSubmissionLoop() { return } - // Marshal headers once and estimate total size + // Marshal headers once for both size estimation and submission + marshalledHeaders := make([][]byte, len(headers)) totalSize := 0 - for _, h := range headers { + for i, h := range headers { data, err := h.MarshalBinary() - if err == nil { - totalSize += len(data) + if err != nil { + s.logger.Error().Err(err).Int("index", i).Msg("failed to marshal header") + return } + marshalledHeaders[i] = data + totalSize += len(data) } shouldSubmit := s.batchingStrategy.ShouldSubmit( @@ -213,7 +217,7 @@ func (s *Submitter) daSubmissionLoop() { Dur("time_since_last", timeSinceLastSubmit). Msg("batching strategy triggered header submission") - if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, s.cache, s.signer); err != nil { + if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, marshalledHeaders, s.cache, s.signer); err != nil { // Check for unrecoverable errors that indicate a critical issue if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). @@ -246,13 +250,17 @@ func (s *Submitter) daSubmissionLoop() { return } - // Marshal data once and estimate total size + // Marshal data once for both size estimation and submission + marshalledData := make([][]byte, len(signedDataList)) totalSize := 0 - for _, sd := range signedDataList { + for i, sd := range signedDataList { data, err := sd.MarshalBinary() - if err == nil { - totalSize += len(data) + if err != nil { + s.logger.Error().Err(err).Int("index", i).Msg("failed to marshal data") + return } + marshalledData[i] = data + totalSize += len(data) } shouldSubmit := s.batchingStrategy.ShouldSubmit( @@ -270,7 +278,7 @@ func (s *Submitter) daSubmissionLoop() { Dur("time_since_last", timeSinceLastSubmit). Msg("batching strategy triggered data submission") - if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, s.cache, s.signer, s.genesis); err != nil { + if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer, s.genesis); err != nil { // Check for unrecoverable errors that indicate a critical issue if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). 
diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go
index 20d58911d1..07703be94a 100644
--- a/block/internal/submitting/submitter_test.go
+++ b/block/internal/submitting/submitter_test.go
@@ -415,7 +415,7 @@ type fakeDASubmitter struct {
 	chData chan struct{}
 }
 
-func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHeader, _ cache.Manager, _ signer.Signer) error {
+func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHeader, _ [][]byte, _ cache.Manager, _ signer.Signer) error {
 	select {
 	case f.chHdr <- struct{}{}:
 	default:
@@ -423,7 +423,7 @@ func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHe
 	return nil
 }
 
-func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error {
+func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ [][]byte, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error {
 	select {
 	case f.chData <- struct{}{}:
 	default:

From 98e93177339131b0af459c49d0ab12d4248d6d58 Mon Sep 17 00:00:00 2001
From: Julien Robert
Date: Thu, 15 Jan 2026 13:40:03 +0100
Subject: [PATCH 10/15] bring back concurrent marshalling but in submitter

---
 .../submitting/da_submitter_mocks_test.go | 10 +-
 block/internal/submitting/submitter.go    | 95 ++++++++++++++-----
 2 files changed, 78 insertions(+), 27 deletions(-)

diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go
index bffa8c80bf..3b30a208cf 100644
--- a/block/internal/submitting/da_submitter_mocks_test.go
+++ b/block/internal/submitting/da_submitter_mocks_test.go
@@ -65,7 +65,7 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) {
 	s := newTestSubmitter(t, client, nil)
 
 	items := []string{"a", "b", "c"}
-	marshalledItems := make([][]byte, 0, len(items))
+	marshalledItems := make([][]byte, len(items))
 	for idx, item := range items {
 		marshalledItems[idx] = []byte(item)
 	}
@@ -114,7 +114,7 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) {
 	s := newTestSubmitter(t, client, nil)
 
 	items := []string{"x"}
-	marshalledItems := make([][]byte, 0, len(items))
+	marshalledItems := make([][]byte, len(items))
 	for idx, item := range items {
 		marshalledItems[idx] = []byte(item)
 	}
@@ -165,7 +165,7 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) {
 	s := newTestSubmitter(t, client, nil)
 
 	items := []string{"a", "b", "c", "d"}
-	marshalledItems := make([][]byte, 0, len(items))
+	marshalledItems := make([][]byte, len(items))
 	for idx, item := range items {
 		marshalledItems[idx] = []byte(item)
 	}
@@ -210,7 +210,7 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) {
 	s := newTestSubmitter(t, client, nil)
 
 	items := []string{"only"}
-	marshalledItems := make([][]byte, 0, len(items))
+	marshalledItems := make([][]byte, len(items))
 	for idx, item := range items {
 		marshalledItems[idx] = []byte(item)
 	}
@@ -254,7 +254,7 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) {
 	s := newTestSubmitter(t, client, nil)
 
 	items := []string{"a", "b", "c"}
-	marshalledItems := make([][]byte, 0, len(items))
+	marshalledItems := make([][]byte, len(items))
 	for idx, item := range items {
 		marshalledItems[idx] = []byte(item)
 	}
diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 01ff770427..0b57b410db 100644
--- a/block/internal/submitting/submitter.go
+++ 
b/block/internal/submitting/submitter.go @@ -189,17 +189,13 @@ func (s *Submitter) daSubmissionLoop() { return } - // Marshal headers once for both size estimation and submission - marshalledHeaders := make([][]byte, len(headers)) - totalSize := 0 - for i, h := range headers { - data, err := h.MarshalBinary() - if err != nil { - s.logger.Error().Err(err).Int("index", i).Msg("failed to marshal header") - return - } - marshalledHeaders[i] = data - totalSize += len(data) + // Marshal headers concurrently for both size estimation and submission + marshalledHeaders, totalSize, err := marshalItems(s.ctx, headers, func(h *types.SignedHeader) ([]byte, error) { + return h.MarshalBinary() + }) + if err != nil { + s.logger.Error().Err(err).Msg("failed to marshal headers") + return } shouldSubmit := s.batchingStrategy.ShouldSubmit( @@ -250,17 +246,13 @@ func (s *Submitter) daSubmissionLoop() { return } - // Marshal data once for both size estimation and submission - marshalledData := make([][]byte, len(signedDataList)) - totalSize := 0 - for i, sd := range signedDataList { - data, err := sd.MarshalBinary() - if err != nil { - s.logger.Error().Err(err).Int("index", i).Msg("failed to marshal data") - return - } - marshalledData[i] = data - totalSize += len(data) + // Marshal data concurrently for both size estimation and submission + marshalledData, totalSize, err := marshalItems(s.ctx, signedDataList, func(sd *types.SignedData) ([]byte, error) { + return sd.MarshalBinary() + }) + if err != nil { + s.logger.Error().Err(err).Msg("failed to marshal data") + return } shouldSubmit := s.batchingStrategy.ShouldSubmit( @@ -298,6 +290,65 @@ func (s *Submitter) daSubmissionLoop() { } } +// marshalItems marshals items concurrently with a worker pool +func marshalItems[T any]( + ctx context.Context, + items []T, + marshalFn func(T) ([]byte, error), +) ([][]byte, int, error) { + if len(items) == 0 { + return nil, 0, nil + } + + marshaled := make([][]byte, len(items)) + workerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Semaphore to limit concurrency to 32 workers + sem := make(chan struct{}, 32) + + // Use a channel to collect results from goroutines + type result struct { + idx int + err error + size int + } + resultCh := make(chan result, len(items)) + + // Marshal items concurrently + for i, item := range items { + go func(idx int, itm T) { + sem <- struct{}{} + defer func() { <-sem }() + + select { + case <-workerCtx.Done(): + resultCh <- result{idx: idx, err: workerCtx.Err()} + default: + bz, err := marshalFn(itm) + if err != nil { + resultCh <- result{idx: idx, err: fmt.Errorf("failed to marshal item at index %d: %w", idx, err)} + return + } + marshaled[idx] = bz + resultCh <- result{idx: idx, size: len(bz)} + } + }(i, item) + } + + // Wait for all goroutines to complete and accumulate total size + totalSize := 0 + for range items { + res := <-resultCh + if res.err != nil { + return nil, 0, res.err + } + totalSize += res.size + } + + return marshaled, totalSize, nil +} + // processDAInclusionLoop handles DA inclusion processing (both sync and aggregator nodes) func (s *Submitter) processDAInclusionLoop() { s.logger.Info().Msg("starting DA inclusion processing loop") From 4ff727d83898a7e39e034f885ca0b90d23e735cb Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 14:18:55 +0100 Subject: [PATCH 11/15] marshal in cache --- block/internal/cache/bench_test.go | 4 +- block/internal/cache/manager.go | 20 +++++---- block/internal/cache/manager_test.go | 8 ++-- 
block/internal/cache/pending_base.go | 42 +++++++++++++++++-- block/internal/cache/pending_base_test.go | 2 +- block/internal/cache/pending_data.go | 40 ++++++++++++++++-- block/internal/cache/pending_data_test.go | 6 +-- block/internal/cache/pending_headers.go | 40 ++++++++++++++++-- block/internal/cache/pending_headers_test.go | 6 +-- .../da_submitter_integration_test.go | 22 ++-------- .../internal/submitting/da_submitter_test.go | 42 +++---------------- block/internal/submitting/submitter.go | 28 ++++++------- 12 files changed, 157 insertions(+), 103 deletions(-) diff --git a/block/internal/cache/bench_test.go b/block/internal/cache/bench_test.go index 0cd51b7e21..aef81866f2 100644 --- a/block/internal/cache/bench_test.go +++ b/block/internal/cache/bench_test.go @@ -72,7 +72,7 @@ func BenchmarkManager_GetPendingHeaders(b *testing.B) { b.ReportAllocs() b.ResetTimer() for b.Loop() { - hs, err := m.GetPendingHeaders(ctx) + hs, _, err := m.GetPendingHeaders(ctx) if err != nil { b.Fatal(err) } @@ -93,7 +93,7 @@ func BenchmarkManager_GetPendingData(b *testing.B) { b.ReportAllocs() b.ResetTimer() for b.Loop() { - ds, err := m.GetPendingData(ctx) + ds, _, err := m.GetPendingData(ctx) if err != nil { b.Fatal(err) } diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index db022fc470..bb1fcb7369 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -78,8 +78,8 @@ type CacheManager interface { // PendingManager provides operations for managing pending headers and data type PendingManager interface { - GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) - GetPendingData(ctx context.Context) ([]*types.SignedData, error) + GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) + GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) SetLastSubmittedDataHeight(ctx context.Context, height uint64) NumPendingHeaders() uint64 @@ -318,20 +318,21 @@ func (m *implementation) DeleteHeight(blockHeight uint64) { } // Pending operations -func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) { +func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) { return m.pendingHeaders.GetPendingHeaders(ctx) } -func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, error) { - // Get pending raw data - dataList, err := m.pendingData.GetPendingData(ctx) +func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) { + // Get pending raw data with marshalled bytes + dataList, marshalledData, err := m.pendingData.GetPendingData(ctx) if err != nil { - return nil, err + return nil, nil, err } // Convert to SignedData (this logic was in manager.go) signedDataList := make([]*types.SignedData, 0, len(dataList)) - for _, data := range dataList { + marshalledSignedData := make([][]byte, 0, len(dataList)) + for i, data := range dataList { if len(data.Txs) == 0 { continue // Skip empty data } @@ -342,9 +343,10 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat Data: *data, // Signature and Signer will be set by executing component }) + marshalledSignedData = append(marshalledSignedData, marshalledData[i]) } - return signedDataList, nil + return signedDataList, marshalledSignedData, nil } func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, 
height uint64) { diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 7dc2051913..328a1de7db 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -183,14 +183,14 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { require.NoError(t, err) // headers: all 3 should be pending initially - headers, err := cm.GetPendingHeaders(ctx) + headers, _, err := cm.GetPendingHeaders(ctx) require.NoError(t, err) require.Len(t, headers, 3) assert.Equal(t, uint64(1), headers[0].Height()) assert.Equal(t, uint64(3), headers[2].Height()) // data: empty one filtered, so 2 and 3 only - signedData, err := cm.GetPendingData(ctx) + signedData, _, err := cm.GetPendingData(ctx) require.NoError(t, err) require.Len(t, signedData, 2) assert.Equal(t, uint64(2), signedData[0].Height()) @@ -200,12 +200,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { cm.SetLastSubmittedHeaderHeight(ctx, 1) cm.SetLastSubmittedDataHeight(ctx, 2) - headers, err = cm.GetPendingHeaders(ctx) + headers, _, err = cm.GetPendingHeaders(ctx) require.NoError(t, err) require.Len(t, headers, 2) assert.Equal(t, uint64(2), headers[0].Height()) - signedData, err = cm.GetPendingData(ctx) + signedData, _, err = cm.GetPendingData(ctx) require.NoError(t, err) require.Len(t, signedData, 1) assert.Equal(t, uint64(3), signedData[0].Height()) diff --git a/block/internal/cache/pending_base.go b/block/internal/cache/pending_base.go index ca36e999f4..6919c44ffa 100644 --- a/block/internal/cache/pending_base.go +++ b/block/internal/cache/pending_base.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "sync/atomic" ds "github.com/ipfs/go-datastore" @@ -22,15 +23,20 @@ type pendingBase[T any] struct { metaKey string fetch func(ctx context.Context, store store.Store, height uint64) (T, error) lastHeight atomic.Uint64 + + // Marshalling cache to avoid redundant marshalling + marshalledMtx sync.RWMutex + marshalledCache map[uint64][]byte // key: height } // newPendingBase constructs a new pendingBase for a given type. 
func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey string, fetch func(ctx context.Context, store store.Store, height uint64) (T, error)) (*pendingBase[T], error) { pb := &pendingBase[T]{ - store: store, - logger: logger, - metaKey: metaKey, - fetch: fetch, + store: store, + logger: logger, + metaKey: metaKey, + fetch: fetch, + marshalledCache: make(map[uint64][]byte), } if err := pb.init(); err != nil { return nil, err @@ -80,6 +86,9 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub if err != nil { pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA") } + + // Clear marshalled cache for submitted heights + pb.clearMarshalledCacheUpTo(newLastSubmittedHeight) } } @@ -101,3 +110,28 @@ func (pb *pendingBase[T]) init() error { pb.lastHeight.CompareAndSwap(0, lsh) return nil } + +// getMarshalledForHeight returns cached marshalled bytes for a height, or nil if not cached +func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte { + pb.marshalledMtx.RLock() + defer pb.marshalledMtx.RUnlock() + return pb.marshalledCache[height] +} + +// setMarshalledForHeight caches marshalled bytes for a height +func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) { + pb.marshalledMtx.Lock() + defer pb.marshalledMtx.Unlock() + pb.marshalledCache[height] = marshalled +} + +// clearMarshalledCacheUpTo removes cached marshalled bytes up to and including the given height +func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) { + pb.marshalledMtx.Lock() + defer pb.marshalledMtx.Unlock() + for h := range pb.marshalledCache { + if h <= height { + delete(pb.marshalledCache, h) + } + } +} diff --git a/block/internal/cache/pending_base_test.go b/block/internal/cache/pending_base_test.go index 21c9814d3c..eb9734a035 100644 --- a/block/internal/cache/pending_base_test.go +++ b/block/internal/cache/pending_base_test.go @@ -35,7 +35,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) { // ensure store height stays lower (0) ph, err := NewPendingHeaders(st, logger) require.NoError(t, err) - pending, err := ph.GetPendingHeaders(ctx) + pending, _, err := ph.GetPendingHeaders(ctx) assert.Error(t, err) assert.Len(t, pending, 0) diff --git a/block/internal/cache/pending_data.go b/block/internal/cache/pending_data.go index 6f75eb54aa..33cff58d65 100644 --- a/block/internal/cache/pending_data.go +++ b/block/internal/cache/pending_data.go @@ -2,6 +2,7 @@ package cache import ( "context" + "fmt" "github.com/rs/zerolog" @@ -46,9 +47,42 @@ func (pd *PendingData) init() error { return pd.base.init() } -// GetPendingData returns a sorted slice of pending Data. -func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) { - return pd.base.getPending(ctx) +// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes. +// It uses an internal cache to avoid re-marshalling data on subsequent calls. 
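+// Cache keys are block heights derived as lastSubmitted+i+1, which assumes getPending returns the contiguous, ascending range of heights after the last submitted one.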
+func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) { + dataList, err := pd.base.getPending(ctx) + if err != nil { + return nil, nil, err + } + + if len(dataList) == 0 { + return nil, nil, nil + } + + marshalled := make([][]byte, len(dataList)) + lastSubmitted := pd.base.lastHeight.Load() + + for i, data := range dataList { + height := lastSubmitted + uint64(i) + 1 + + // Try to get from cache first + if cached := pd.base.getMarshalledForHeight(height); cached != nil { + marshalled[i] = cached + continue + } + + // Marshal if not in cache + dataBytes, err := data.MarshalBinary() + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal data at height %d: %w", height, err) + } + marshalled[i] = dataBytes + + // Store in cache + pd.base.setMarshalledForHeight(height, dataBytes) + } + + return dataList, marshalled, nil } func (pd *PendingData) NumPendingData() uint64 { diff --git a/block/internal/cache/pending_data_test.go b/block/internal/cache/pending_data_test.go index a3c34fdaf9..75679a24f3 100644 --- a/block/internal/cache/pending_data_test.go +++ b/block/internal/cache/pending_data_test.go @@ -39,7 +39,7 @@ func TestPendingData_BasicFlow(t *testing.T) { // initially all 3 data items are pending, incl. empty require.Equal(t, uint64(3), pendingData.NumPendingData()) - pendingDataList, err := pendingData.GetPendingData(ctx) + pendingDataList, _, err := pendingData.GetPendingData(ctx) require.NoError(t, err) require.Len(t, pendingDataList, 3) require.Equal(t, uint64(1), pendingDataList[0].Height()) @@ -53,7 +53,7 @@ func TestPendingData_BasicFlow(t *testing.T) { require.Equal(t, uint64(1), binary.LittleEndian.Uint64(metadataRaw)) require.Equal(t, uint64(2), pendingData.NumPendingData()) - pendingDataList, err = pendingData.GetPendingData(ctx) + pendingDataList, _, err = pendingData.GetPendingData(ctx) require.NoError(t, err) require.Len(t, pendingDataList, 2) require.Equal(t, uint64(2), pendingDataList[0].Height()) @@ -97,7 +97,7 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) { require.NoError(t, err) // fetching pending should propagate the not-found error from store - pending, err := pendingData.GetPendingData(ctx) + pending, _, err := pendingData.GetPendingData(ctx) require.Error(t, err) require.Empty(t, pending) } diff --git a/block/internal/cache/pending_headers.go b/block/internal/cache/pending_headers.go index ab8453ec1b..712d18cd62 100644 --- a/block/internal/cache/pending_headers.go +++ b/block/internal/cache/pending_headers.go @@ -2,6 +2,7 @@ package cache import ( "context" + "fmt" "github.com/rs/zerolog" @@ -39,9 +40,42 @@ func NewPendingHeaders(store storepkg.Store, logger zerolog.Logger) (*PendingHea return &PendingHeaders{base: base}, nil } -// GetPendingHeaders returns a sorted slice of pending headers. -func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) { - return ph.base.getPending(ctx) +// GetPendingHeaders returns a sorted slice of pending headers along with their marshalled bytes. +// It uses an internal cache to avoid re-marshalling headers on subsequent calls. 
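+// Cached bytes persist across calls and are pruned once setLastSubmittedHeight marks heights as submitted to the DA layer.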
+func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) { + headers, err := ph.base.getPending(ctx) + if err != nil { + return nil, nil, err + } + + if len(headers) == 0 { + return nil, nil, nil + } + + marshalled := make([][]byte, len(headers)) + lastSubmitted := ph.base.lastHeight.Load() + + for i, header := range headers { + height := lastSubmitted + uint64(i) + 1 + + // Try to get from cache first + if cached := ph.base.getMarshalledForHeight(height); cached != nil { + marshalled[i] = cached + continue + } + + // Marshal if not in cache + data, err := header.MarshalBinary() + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal header at height %d: %w", height, err) + } + marshalled[i] = data + + // Store in cache + ph.base.setMarshalledForHeight(height, data) + } + + return headers, marshalled, nil } func (ph *PendingHeaders) NumPendingHeaders() uint64 { diff --git a/block/internal/cache/pending_headers_test.go b/block/internal/cache/pending_headers_test.go index 758ff685b9..25c029700c 100644 --- a/block/internal/cache/pending_headers_test.go +++ b/block/internal/cache/pending_headers_test.go @@ -39,7 +39,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) { // initially all three are pending require.Equal(t, uint64(3), pendingHeaders.NumPendingHeaders()) - headers, err := pendingHeaders.GetPendingHeaders(ctx) + headers, _, err := pendingHeaders.GetPendingHeaders(ctx) require.NoError(t, err) require.Len(t, headers, 3) require.Equal(t, uint64(1), headers[0].Height()) @@ -53,7 +53,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) { require.Equal(t, uint64(2), binary.LittleEndian.Uint64(metadataRaw)) require.Equal(t, uint64(1), pendingHeaders.NumPendingHeaders()) - headers, err = pendingHeaders.GetPendingHeaders(ctx) + headers, _, err = pendingHeaders.GetPendingHeaders(ctx) require.NoError(t, err) require.Len(t, headers, 1) require.Equal(t, uint64(3), headers[0].Height()) @@ -82,7 +82,7 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) { // set last submitted to the current height, so nothing pending pendingHeaders.SetLastSubmittedHeaderHeight(ctx, 1) require.Equal(t, uint64(0), pendingHeaders.NumPendingHeaders()) - headers, err := pendingHeaders.GetPendingHeaders(ctx) + headers, _, err := pendingHeaders.GetPendingHeaders(ctx) require.NoError(t, err) require.Empty(t, headers) } diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index ea45fb8d90..0c23f7d08a 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -98,29 +98,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( }).Twice() daSubmitter := NewDASubmitter(client, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) - // Submit headers and data - headers, err := cm.GetPendingHeaders(context.Background()) + // Submit headers and data - cache returns both items and marshalled bytes + headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background()) require.NoError(t, err) - - // Marshal headers - marshalledHeaders := make([][]byte, len(headers)) - for i, h := range headers { - data, err := h.MarshalBinary() - require.NoError(t, err) - marshalledHeaders[i] = data - } require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n)) - dataList, err := cm.GetPendingData(context.Background()) + 
dataList, marshalledData, err := cm.GetPendingData(context.Background()) require.NoError(t, err) - - // Marshal data - marshalledData := make([][]byte, len(dataList)) - for i, d := range dataList { - data, err := d.MarshalBinary() - require.NoError(t, err) - marshalledData[i] = data - } require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) // After submission, inclusion markers should be set diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index 21a7235d43..9df66d4f69 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -213,15 +213,8 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) { require.NoError(t, batch2.Commit()) // Get headers from cache and submit - headers, err := cm.GetPendingHeaders(ctx) + headers, marshalledHeaders, err := cm.GetPendingHeaders(ctx) require.NoError(t, err) - // Marshal headers - marshalledHeaders := make([][]byte, len(headers)) - for i, h := range headers { - data, err := h.MarshalBinary() - require.NoError(t, err) - marshalledHeaders[i] = data - } err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) @@ -242,9 +235,8 @@ func TestDASubmitter_SubmitHeaders_NoPendingHeaders(t *testing.T) { _, _, signer := createTestSigner(t) // Get headers from cache (should be empty) and submit - headers, err := cm.GetPendingHeaders(ctx) + headers, marshalledHeaders, err := cm.GetPendingHeaders(ctx) require.NoError(t, err) - var marshalledHeaders [][]byte err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -338,15 +330,8 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) { require.NoError(t, batch2.Commit()) // Get data from cache and submit - signedDataList, err := cm.GetPendingData(ctx) + signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - // Marshal data - marshalledData := make([][]byte, len(signedDataList)) - for i, d := range signedDataList { - data, err := d.MarshalBinary() - require.NoError(t, err) - marshalledData[i] = data - } err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) require.NoError(t, err) @@ -399,15 +384,8 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) { // Get data from cache and submit - should succeed but skip empty data // Get data from cache and submit - signedDataList, err := cm.GetPendingData(ctx) + signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - // Marshal data - marshalledData := make([][]byte, len(signedDataList)) - for i, d := range signedDataList { - data, err := d.MarshalBinary() - require.NoError(t, err) - marshalledData[i] = data - } err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen) require.NoError(t, err) mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -425,9 +403,8 @@ func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) { _, _, signer := createTestSigner(t) // Get data from cache (should be empty) and submit - dataList, err := cm.GetPendingData(ctx) + dataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - var marshalledData [][]byte err = submitter.SubmitData(ctx, 
dataList, marshalledData, cm, signer, gen) require.NoError(t, err) // Should succeed with no action mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) @@ -467,15 +444,8 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) { require.NoError(t, batch.Commit()) // Get data from cache and submit with nil signer - should fail - signedDataList, err := cm.GetPendingData(ctx) + signedDataList, marshalledData, err := cm.GetPendingData(ctx) require.NoError(t, err) - // Marshal data - marshalledData := make([][]byte, len(signedDataList)) - for i, d := range signedDataList { - data, err := d.MarshalBinary() - require.NoError(t, err) - marshalledData[i] = data - } err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil, gen) require.Error(t, err) assert.Contains(t, err.Error(), "signer is nil") diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 0b57b410db..8c4639bee9 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -183,19 +183,17 @@ func (s *Submitter) daSubmissionLoop() { go func() { defer s.headerSubmissionMtx.Unlock() - headers, err := s.cache.GetPendingHeaders(s.ctx) + // Get headers with marshalled bytes from cache + headers, marshalledHeaders, err := s.cache.GetPendingHeaders(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get pending headers for batching decision") return } - // Marshal headers concurrently for both size estimation and submission - marshalledHeaders, totalSize, err := marshalItems(s.ctx, headers, func(h *types.SignedHeader) ([]byte, error) { - return h.MarshalBinary() - }) - if err != nil { - s.logger.Error().Err(err).Msg("failed to marshal headers") - return + // Calculate total size + totalSize := 0 + for _, marshalled := range marshalledHeaders { + totalSize += len(marshalled) } shouldSubmit := s.batchingStrategy.ShouldSubmit( @@ -240,19 +238,17 @@ func (s *Submitter) daSubmissionLoop() { go func() { defer s.dataSubmissionMtx.Unlock() - signedDataList, err := s.cache.GetPendingData(s.ctx) + // Get data with marshalled bytes from cache + signedDataList, marshalledData, err := s.cache.GetPendingData(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get pending data for batching decision") return } - // Marshal data concurrently for both size estimation and submission - marshalledData, totalSize, err := marshalItems(s.ctx, signedDataList, func(sd *types.SignedData) ([]byte, error) { - return sd.MarshalBinary() - }) - if err != nil { - s.logger.Error().Err(err).Msg("failed to marshal data") - return + // Calculate total size + totalSize := 0 + for _, marshalled := range marshalledData { + totalSize += len(marshalled) } shouldSubmit := s.batchingStrategy.ShouldSubmit( From de67ecf29006dfd3c7bfe4e5eb0f35c0f2dee92d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 15 Jan 2026 14:31:54 +0100 Subject: [PATCH 12/15] lower max blob --- block/internal/common/consts.go | 2 +- test/testda/dummy.go | 4 ++-- tools/local-da/local.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/block/internal/common/consts.go b/block/internal/common/consts.go index 76aed4249e..c9dd8cdb72 100644 --- a/block/internal/common/consts.go +++ b/block/internal/common/consts.go @@ -1,3 +1,3 @@ package common -const DefaultMaxBlobSize = 8 * 1024 * 1024 // 8MB fallback blob size limit +const DefaultMaxBlobSize = 7 * 1024 * 1024 // 7MB fallback blob size limit diff --git 
a/test/testda/dummy.go b/test/testda/dummy.go
index 49469a350d..04a9700a38 100644
--- a/test/testda/dummy.go
+++ b/test/testda/dummy.go
@@ -11,8 +11,8 @@ import (
 )
 
 const (
-	// DefaultMaxBlobSize is the default maximum blob size (8MB).
-	DefaultMaxBlobSize = 8 * 1024 * 1024
+	// DefaultMaxBlobSize is the default maximum blob size (7MB).
+	DefaultMaxBlobSize = 7 * 1024 * 1024
 )
 
 // Header contains DA layer header information for a given height.
diff --git a/tools/local-da/local.go b/tools/local-da/local.go
index 9f06471dac..82ccfc0493 100644
--- a/tools/local-da/local.go
+++ b/tools/local-da/local.go
@@ -19,7 +19,7 @@ import (
 )
 
 // DefaultMaxBlobSize is the default max blob size
-const DefaultMaxBlobSize uint64 = 8 * 1024 * 1024 // 8MB
+const DefaultMaxBlobSize uint64 = 7 * 1024 * 1024 // 7MB
 
 // LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing!
 //

From fabc2de1dee5843aeb69eb9f53730c6926c72aae Mon Sep 17 00:00:00 2001
From: Julien Robert
Date: Thu, 15 Jan 2026 14:40:55 +0100
Subject: [PATCH 13/15] updates

---
 .../internal/submitting/batching_strategy.go |  3 +-
 block/internal/submitting/submitter.go       | 63 +------------------
 2 files changed, 4 insertions(+), 62 deletions(-)

diff --git a/block/internal/submitting/batching_strategy.go b/block/internal/submitting/batching_strategy.go
index 58436e0597..72eccdc4d9 100644
--- a/block/internal/submitting/batching_strategy.go
+++ b/block/internal/submitting/batching_strategy.go
@@ -8,10 +8,11 @@ import (
 )
 
 // BatchingStrategy defines the interface for different batching strategies
+// Batching strategies always go through the DA submitter, which performs extra size checks and may further split batches that exceed the DA layer blob size.
 type BatchingStrategy interface {
 	// ShouldSubmit determines if a batch should be submitted based on the strategy
 	// Returns true if submission should happen now
-	ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool
+	ShouldSubmit(pendingCount uint64, totalSizeBeforeSig int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool
 }
 
 // NewBatchingStrategy creates a batching strategy based on configuration
diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 8c4639bee9..28e0c6d685 100644
--- a/block/internal/submitting/submitter.go
+++ b/block/internal/submitting/submitter.go
@@ -190,7 +190,7 @@ func (s *Submitter) daSubmissionLoop() {
 				return
 			}
-			// Calculate total size
+			// Calculate total size (excluding signature)
 			totalSize := 0
 			for _, marshalled := range marshalledHeaders {
 				totalSize += len(marshalled)
 			}
@@ -245,7 +245,7 @@ func (s *Submitter) daSubmissionLoop() {
 				return
 			}
-			// Calculate total size
+			// Calculate total size (excluding signature)
 			totalSize := 0
 			for _, marshalled := range marshalledData {
 				totalSize += len(marshalled)
 			}
@@ -286,65 +286,6 @@ func (s *Submitter) daSubmissionLoop() {
 	}
 }
 
-// marshalItems marshals items concurrently with a worker pool
-func marshalItems[T any](
-	ctx context.Context,
-	items []T,
-	marshalFn func(T) ([]byte, error),
-) ([][]byte, int, error) {
-	if len(items) == 0 {
-		return nil, 0, nil
-	}
-
-	marshaled := make([][]byte, len(items))
-	workerCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	// Semaphore to limit concurrency to 32 workers
-	sem := make(chan struct{}, 32)
-
-	// Use a channel to collect results from goroutines
-	type result struct {
-		idx  int
-		err  error
-		size int
-	}
-	resultCh := make(chan result,
len(items))
-
-	// Marshal items concurrently
-	for i, item := range items {
-		go func(idx int, itm T) {
-			sem <- struct{}{}
-			defer func() { <-sem }()
-
-			select {
-			case <-workerCtx.Done():
-				resultCh <- result{idx: idx, err: workerCtx.Err()}
-			default:
-				bz, err := marshalFn(itm)
-				if err != nil {
-					resultCh <- result{idx: idx, err: fmt.Errorf("failed to marshal item at index %d: %w", idx, err)}
-					return
-				}
-				marshaled[idx] = bz
-				resultCh <- result{idx: idx, size: len(bz)}
-			}
-		}(i, item)
-	}
-
-	// Wait for all goroutines to complete and accumulate total size
-	totalSize := 0
-	for range items {
-		res := <-resultCh
-		if res.err != nil {
-			return nil, 0, res.err
-		}
-		totalSize += res.size
-	}
-
-	return marshaled, totalSize, nil
-}
-
 // processDAInclusionLoop handles DA inclusion processing (both sync and aggregator nodes)
 func (s *Submitter) processDAInclusionLoop() {
 	s.logger.Info().Msg("starting DA inclusion processing loop")

From c7d20d0aceffb9e11d3ca4d50ac01ea248785347 Mon Sep 17 00:00:00 2001
From: Julien Robert
Date: Thu, 15 Jan 2026 14:42:21 +0100
Subject: [PATCH 14/15] add cl

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 438bc7fec6..7b299beb19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `post-tx` command and force inclusion server to submit transaction directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888))
   Additionally, modified the core package to support marking transactions as forced included transactions. The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool.
+- Add batching strategies (the default remains time-based, unchanged from previous betas). Currently available strategies are `time`, `size`, `immediate` and `adaptive`.
 
 ### Changed

From 4a6661ef2f0597427e5d827c9e3ad7cd9db080f6 Mon Sep 17 00:00:00 2001
From: Julien Robert
Date: Thu, 15 Jan 2026 14:58:39 +0100
Subject: [PATCH 15/15] feedback

---
 block/internal/cache/pending_base.go   | 34 ++++++++++++--------------
 block/internal/submitting/submitter.go |  2 +-
 2 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/block/internal/cache/pending_base.go b/block/internal/cache/pending_base.go
index 6919c44ffa..5e22287869 100644
--- a/block/internal/cache/pending_base.go
+++ b/block/internal/cache/pending_base.go
@@ -25,18 +25,16 @@ type pendingBase[T any] struct {
 	lastHeight atomic.Uint64
 
 	// Marshalling cache to avoid redundant marshalling
-	marshalledMtx   sync.RWMutex
-	marshalledCache map[uint64][]byte // key: height
+	marshalledCache sync.Map // key: uint64 (height), value: []byte
 }
 
 // newPendingBase constructs a new pendingBase for a given type.
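+// The sync.Map zero value is ready to use, so the cache no longer needs explicit initialization here.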
func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey string, fetch func(ctx context.Context, store store.Store, height uint64) (T, error)) (*pendingBase[T], error) { pb := &pendingBase[T]{ - store: store, - logger: logger, - metaKey: metaKey, - fetch: fetch, - marshalledCache: make(map[uint64][]byte), + store: store, + logger: logger, + metaKey: metaKey, + fetch: fetch, } if err := pb.init(); err != nil { return nil, err @@ -113,25 +111,23 @@ func (pb *pendingBase[T]) init() error { // getMarshalledForHeight returns cached marshalled bytes for a height, or nil if not cached func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte { - pb.marshalledMtx.RLock() - defer pb.marshalledMtx.RUnlock() - return pb.marshalledCache[height] + if val, ok := pb.marshalledCache.Load(height); ok { + return val.([]byte) + } + return nil } // setMarshalledForHeight caches marshalled bytes for a height func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) { - pb.marshalledMtx.Lock() - defer pb.marshalledMtx.Unlock() - pb.marshalledCache[height] = marshalled + pb.marshalledCache.Store(height, marshalled) } // clearMarshalledCacheUpTo removes cached marshalled bytes up to and including the given height func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) { - pb.marshalledMtx.Lock() - defer pb.marshalledMtx.Unlock() - for h := range pb.marshalledCache { - if h <= height { - delete(pb.marshalledCache, h) + pb.marshalledCache.Range(func(key, _ any) bool { + if h := key.(uint64); h <= height { + pb.marshalledCache.Delete(h) } - } + return true + }) } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 28e0c6d685..0c70db9788 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -161,7 +161,7 @@ func (s *Submitter) daSubmissionLoop() { defer s.logger.Info().Msg("DA submission loop stopped") // Use a shorter ticker interval to check batching strategy more frequently - checkInterval := min(s.config.DA.BlockTime.Duration/4, 100*time.Millisecond) + checkInterval := max(s.config.DA.BlockTime.Duration/4, 100*time.Millisecond) ticker := time.NewTicker(checkInterval) defer ticker.Stop()
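After PATCH 13, the BatchingStrategy surface is a single ShouldSubmit method, so custom submission policies are straightforward to sketch. Below is a minimal, hypothetical example that combines the size and time triggers. The method signature matches batching_strategy.go, but the hybridStrategy name, its fields, and the suggested defaults are illustrative only and not part of these patches; the Name method is included only for compatibility with earlier revisions of the interface that exposed a strategy name.

package submitting

import "time"

// hybridStrategy is a hypothetical policy combining the size and time
// triggers: submit once the batch is nearly full, or once pending items
// have waited longer than maxDelay, whichever comes first.
type hybridStrategy struct {
	sizeThreshold float64       // fraction of maxBlobSize that triggers submission, e.g. 0.8
	maxDelay      time.Duration // upper bound on batching latency, e.g. 6 * time.Second
}

// ShouldSubmit implements BatchingStrategy. Sizes come from the cached
// marshalled bytes, i.e. before signatures are attached; the DA submitter
// still re-checks blob size and splits oversized batches before posting.
func (s *hybridStrategy) ShouldSubmit(pendingCount uint64, totalSizeBeforeSig int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool {
	if pendingCount == 0 {
		return false // nothing pending, nothing to submit
	}
	if float64(totalSizeBeforeSig) >= s.sizeThreshold*float64(maxBlobSize) {
		return true // batch is (nearly) full
	}
	return timeSinceLastSubmit >= s.maxDelay // latency bound reached
}

// Name identifies the strategy in logs (required by earlier interface revisions).
func (s *hybridStrategy) Name() string { return "hybrid" }

A sizeThreshold above 1.0 effectively disables the size trigger and degenerates to time-only behaviour, while a very large maxDelay leaves only the size trigger; those two extremes mirror the dedicated `time` and `size` strategies added earlier in this series.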