Skip to content

Commit 897a04c

Browse files
authored
feat(processor): add batch block fetching for improved throughput (#69)
Implement batch block fetching to request multiple blocks at once using BatchCallContext RPC, with proper backpressure handling and contiguous-only batching. Changes: - Add BlocksByNumbers to Node/DataSource interfaces for batch RPC - Implement batch RPC using go-ethereum's BatchCallContext - Add GetAvailableCapacity and ValidateBatchWithinLeash to Limiter - Add NextBlocks method to State Manager for batch number generation - Add InitBlocks batch method to PendingTracker using Redis pipeline - Refactor ProcessNextBlock in all processors to support batch fetching - Add exponential backoff with jitter for backpressure handling The batch size is limited by min(maxPendingBlockRange, availableCapacity) and stops at the first missing/not-found block to maintain contiguity.
1 parent 3b9afba commit 897a04c

File tree

14 files changed

+739
-176
lines changed

14 files changed

+739
-176
lines changed

pkg/ethereum/execution/embedded_node.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ type DataSource interface {
3636
// BlockByNumber returns the block at the given number.
3737
BlockByNumber(ctx context.Context, number *big.Int) (Block, error)
3838

39+
// BlocksByNumbers returns blocks at the given numbers.
40+
// Returns blocks up to the first not-found (contiguous only).
41+
BlocksByNumbers(ctx context.Context, numbers []*big.Int) ([]Block, error)
42+
3943
// BlockReceipts returns all receipts for the block at the given number.
4044
BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error)
4145

@@ -161,6 +165,11 @@ func (n *EmbeddedNode) BlockByNumber(ctx context.Context, number *big.Int) (Bloc
161165
return n.source.BlockByNumber(ctx, number)
162166
}
163167

168+
// BlocksByNumbers delegates to the DataSource.
169+
func (n *EmbeddedNode) BlocksByNumbers(ctx context.Context, numbers []*big.Int) ([]Block, error) {
170+
return n.source.BlocksByNumbers(ctx, numbers)
171+
}
172+
164173
// BlockReceipts delegates to the DataSource.
165174
func (n *EmbeddedNode) BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error) {
166175
return n.source.BlockReceipts(ctx, number)

pkg/ethereum/execution/embedded_node_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,21 @@ func (m *MockDataSource) IsSynced() bool {
170170
return args.Bool(0)
171171
}
172172

173+
func (m *MockDataSource) BlocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
174+
args := m.Called(ctx, numbers)
175+
176+
if args.Get(0) == nil {
177+
return nil, args.Error(1)
178+
}
179+
180+
val, ok := args.Get(0).([]execution.Block)
181+
if !ok {
182+
return nil, args.Error(1)
183+
}
184+
185+
return val, args.Error(1)
186+
}
187+
173188
func TestEmbeddedNode_Creation(t *testing.T) {
174189
log := logrus.New()
175190
log.SetLevel(logrus.ErrorLevel)

pkg/ethereum/execution/geth/rpc.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ package geth
55
import (
66
"context"
77
"encoding/hex"
8+
"encoding/json"
89
"fmt"
910
"math/big"
1011
"time"
1112

1213
"github.com/ethereum/go-ethereum/common"
14+
"github.com/ethereum/go-ethereum/core/types"
1315
"github.com/ethereum/go-ethereum/rpc"
1416

1517
pcommon "github.com/ethpandaops/execution-processor/pkg/common"
@@ -71,6 +73,116 @@ func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (exec
7173
return NewBlockAdapter(block), nil
7274
}
7375

76+
// blocksByNumbers fetches multiple blocks using batch RPC calls.
77+
// Returns blocks up to the first not-found (contiguous only).
78+
func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
79+
if len(numbers) == 0 {
80+
return []execution.Block{}, nil
81+
}
82+
83+
start := time.Now()
84+
network := n.Metadata().ChainID()
85+
86+
// Prepare batch calls using json.RawMessage to handle null responses
87+
batch := make([]rpc.BatchElem, len(numbers))
88+
results := make([]*json.RawMessage, len(numbers))
89+
90+
for i, num := range numbers {
91+
results[i] = new(json.RawMessage)
92+
batch[i] = rpc.BatchElem{
93+
Method: "eth_getBlockByNumber",
94+
Args: []interface{}{toBlockNumArg(num), true}, // true = include transactions
95+
Result: results[i],
96+
}
97+
}
98+
99+
// Execute batch call
100+
err := n.rpcClient.BatchCallContext(ctx, batch)
101+
102+
duration := time.Since(start)
103+
104+
// Record batch RPC metrics
105+
status := statusSuccess
106+
if err != nil {
107+
status = statusError
108+
}
109+
110+
pcommon.RPCCallDuration.WithLabelValues(
111+
fmt.Sprintf("%d", network),
112+
n.config.Name,
113+
"eth_getBlockByNumber_batch",
114+
status,
115+
).Observe(duration.Seconds())
116+
117+
pcommon.RPCCallsTotal.WithLabelValues(
118+
fmt.Sprintf("%d", network),
119+
n.config.Name,
120+
"eth_getBlockByNumber_batch",
121+
status,
122+
).Inc()
123+
124+
if err != nil {
125+
return nil, fmt.Errorf("batch call failed: %w", err)
126+
}
127+
128+
// Process results, stopping at first not-found (contiguity requirement)
129+
blocks := make([]execution.Block, 0, len(numbers))
130+
131+
for i, elem := range batch {
132+
// Check for individual call error - stop at first error for contiguity
133+
// We intentionally don't return this error as we want partial results
134+
if elem.Error != nil {
135+
break
136+
}
137+
138+
// Check for nil/not-found block (null JSON response)
139+
if results[i] == nil || len(*results[i]) == 0 || string(*results[i]) == "null" {
140+
// Block not found - stop here for contiguity
141+
break
142+
}
143+
144+
// Parse the block from JSON
145+
block, parseErr := parseBlockFromJSON(*results[i])
146+
if parseErr != nil {
147+
// Parse error - stop here for contiguity
148+
break
149+
}
150+
151+
blocks = append(blocks, NewBlockAdapter(block))
152+
}
153+
154+
return blocks, nil //nolint:nilerr // Intentionally returning partial results for contiguity
155+
}
156+
157+
// toBlockNumArg converts a block number to the RPC argument format.
158+
func toBlockNumArg(number *big.Int) string {
159+
if number == nil {
160+
return "latest"
161+
}
162+
163+
return fmt.Sprintf("0x%x", number)
164+
}
165+
166+
// parseBlockFromJSON parses a types.Block from JSON-RPC response.
167+
func parseBlockFromJSON(raw json.RawMessage) (*types.Block, error) {
168+
// Use go-ethereum's internal header structure for unmarshaling
169+
var head *types.Header
170+
if err := json.Unmarshal(raw, &head); err != nil {
171+
return nil, fmt.Errorf("failed to unmarshal block header: %w", err)
172+
}
173+
174+
// Parse transactions separately
175+
var body struct {
176+
Transactions []*types.Transaction `json:"transactions"`
177+
}
178+
179+
if err := json.Unmarshal(raw, &body); err != nil {
180+
return nil, fmt.Errorf("failed to unmarshal block body: %w", err)
181+
}
182+
183+
return types.NewBlockWithHeader(head).WithBody(types.Body{Transactions: body.Transactions}), nil
184+
}
185+
74186
// getTraceParams returns VM trace parameters with configurable options.
75187
func getTraceParams(hash string, options execution.TraceOptions) []any {
76188
return []any{

pkg/ethereum/execution/geth/rpc_node.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,12 @@ func (n *RPCNode) BlockByNumber(ctx context.Context, number *big.Int) (execution
308308
return n.blockByNumber(ctx, number)
309309
}
310310

311+
// BlocksByNumbers returns blocks at the given numbers using batch RPC.
312+
// Returns blocks up to the first not-found (contiguous only).
313+
func (n *RPCNode) BlocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
314+
return n.blocksByNumbers(ctx, numbers)
315+
}
316+
311317
// BlockReceipts returns all receipts for the block at the given number.
312318
func (n *RPCNode) BlockReceipts(ctx context.Context, number *big.Int) ([]execution.Receipt, error) {
313319
return n.blockReceipts(ctx, number)

pkg/ethereum/execution/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ type Node interface {
4141
// BlockByNumber returns the block at the given number.
4242
BlockByNumber(ctx context.Context, number *big.Int) (Block, error)
4343

44+
// BlocksByNumbers returns blocks at the given numbers using batch RPC.
45+
// Returns blocks up to the first not-found (contiguous only).
46+
// If a block is not found, the returned slice contains all blocks before that point.
47+
BlocksByNumbers(ctx context.Context, numbers []*big.Int) ([]Block, error)
48+
4449
// BlockReceipts returns all receipts for the block at the given number.
4550
BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error)
4651

pkg/ethereum/pool_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ func (m *MockNode) BlockByNumber(_ context.Context, number *big.Int) (execution.
9898
return &MockBlock{number: number}, nil
9999
}
100100

101+
func (m *MockNode) BlocksByNumbers(_ context.Context, numbers []*big.Int) ([]execution.Block, error) {
102+
blocks := make([]execution.Block, len(numbers))
103+
for i, num := range numbers {
104+
blocks[i] = &MockBlock{number: num}
105+
}
106+
107+
return blocks, nil
108+
}
109+
101110
func (m *MockNode) BlockReceipts(_ context.Context, _ *big.Int) ([]execution.Receipt, error) {
102111
return []execution.Receipt{}, nil
103112
}
@@ -806,3 +815,12 @@ func (ds *testDataSource) ClientType() string {
806815
func (ds *testDataSource) IsSynced() bool {
807816
return true
808817
}
818+
819+
func (ds *testDataSource) BlocksByNumbers(_ context.Context, numbers []*big.Int) ([]execution.Block, error) {
820+
blocks := make([]execution.Block, len(numbers))
821+
for i, num := range numbers {
822+
blocks[i] = &MockBlock{number: num}
823+
}
824+
825+
return blocks, nil
826+
}

pkg/processor/defaults.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,13 @@ const (
2525

2626
// DefaultLeaderRenewalInterval is the default renewal interval for leader election.
2727
DefaultLeaderRenewalInterval = 3 * time.Second
28+
29+
// DefaultBackpressureBackoffMin is the minimum backoff duration when backpressure is detected.
30+
DefaultBackpressureBackoffMin = 10 * time.Millisecond
31+
32+
// DefaultBackpressureBackoffMax is the maximum backoff duration when backpressure persists.
33+
DefaultBackpressureBackoffMax = 1 * time.Second
34+
35+
// DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%).
36+
DefaultBackpressureJitterFraction = 0.25
2837
)

pkg/processor/manager.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"context"
2828
"fmt"
2929
"math/big"
30+
"math/rand/v2"
3031
"strings"
3132
"sync"
3233
"time"
@@ -101,6 +102,9 @@ type Manager struct {
101102
// Track queue high water marks
102103
queueHighWaterMarks map[string]int
103104
queueMetricsMutex sync.RWMutex
105+
106+
// Backpressure backoff tracking
107+
backpressureBackoff time.Duration
104108
}
105109

106110
func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, state *s.Manager, redis *r.Client, redisPrefix string) (*Manager, error) {
@@ -749,15 +753,50 @@ func (m *Manager) runBlockProcessing(ctx context.Context) {
749753
if m.config.Interval > 0 {
750754
// Fixed interval mode - always sleep the configured duration
751755
time.Sleep(m.config.Interval)
756+
m.backpressureBackoff = 0 // Reset backoff in fixed interval mode
752757
} else if !workDone {
753-
// Zero interval mode with no work - small backoff to prevent CPU spin
754-
time.Sleep(DefaultNoWorkBackoff)
758+
// Zero interval mode with no work - exponential backoff with jitter
759+
m.backpressureBackoff = m.calculateBackpressureBackoff(m.backpressureBackoff)
760+
time.Sleep(m.backpressureBackoff)
761+
} else {
762+
// Zero interval with work done - reset backoff and loop immediately
763+
m.backpressureBackoff = 0
755764
}
756-
// Zero interval with work done - loop immediately (no sleep)
757765
}
758766
}
759767
}
760768

769+
// calculateBackpressureBackoff calculates the next backoff duration using exponential backoff with jitter.
770+
func (m *Manager) calculateBackpressureBackoff(current time.Duration) time.Duration {
771+
var next time.Duration
772+
773+
if current == 0 {
774+
// Start with minimum backoff
775+
next = DefaultBackpressureBackoffMin
776+
} else {
777+
// Double the current backoff (exponential)
778+
next = current * 2
779+
}
780+
781+
// Cap at maximum backoff
782+
if next > DefaultBackpressureBackoffMax {
783+
next = DefaultBackpressureBackoffMax
784+
}
785+
786+
// Add jitter (up to 25% of the backoff duration)
787+
//nolint:gosec // G404: Weak RNG is fine for backoff jitter - no security requirement
788+
jitter := time.Duration(rand.Float64() * DefaultBackpressureJitterFraction * float64(next))
789+
next += jitter
790+
791+
m.log.WithFields(logrus.Fields{
792+
"previous_backoff": current,
793+
"next_backoff": next,
794+
"jitter": jitter,
795+
}).Debug("Calculated backpressure backoff")
796+
797+
return next
798+
}
799+
761800
// monitorQueues monitors queue health and archived items.
762801
func (m *Manager) monitorQueues(ctx context.Context) {
763802
// Get Redis options for Asynq Inspector

0 commit comments

Comments
 (0)