Skip to content

Commit fec76c6

Browse files
authored
feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71)
- Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout.
- Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10).
- Add metrics: inflight_flushes, circuit_open, circuit_rejections_total
- Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes.
- Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors.
- Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist.
- Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation.
1 parent 5dbbf42 commit fec76c6

File tree

18 files changed

+896
-78
lines changed

18 files changed

+896
-78
lines changed

example_config.yaml

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,32 +56,41 @@ processors:
5656
addr: "localhost:9000"
5757
database: "default"
5858
table: "canonical_execution_transaction_structlog"
59-
# debug: false # Enable debug logging for ClickHouse queries
60-
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
61-
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
62-
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
59+
# debug: false # Enable debug logging for ClickHouse queries
60+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
61+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
62+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
63+
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
64+
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
65+
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)
6366

6467
# Aggregated structlog processor (call frame level aggregation)
6568
transactionStructlogAgg:
6669
enabled: false
6770
addr: "localhost:9000"
6871
database: "default"
6972
table: "canonical_execution_transaction_structlog_agg"
70-
# debug: false # Enable debug logging for ClickHouse queries
71-
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
72-
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
73-
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
73+
# debug: false # Enable debug logging for ClickHouse queries
74+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
75+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
76+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
77+
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
78+
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
79+
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)
7480

7581
# Simple transaction processor (lightweight - no debug traces)
7682
transactionSimple:
7783
enabled: false
7884
addr: "localhost:9000"
7985
database: "default"
8086
table: "execution_transaction"
81-
# debug: false # Enable debug logging for ClickHouse queries
82-
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
83-
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
84-
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
87+
# debug: false # Enable debug logging for ClickHouse queries
88+
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
89+
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
90+
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
91+
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
92+
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
93+
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)
8594

8695
# Application settings
8796
shutdownTimeout: 6m

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/prometheus/client_golang v1.20.5
1616
github.com/redis/go-redis/v9 v9.17.2
1717
github.com/sirupsen/logrus v1.9.3
18+
github.com/sony/gobreaker/v2 v2.4.0
1819
github.com/spf13/cobra v1.10.1
1920
github.com/stretchr/testify v1.11.1
2021
github.com/testcontainers/testcontainers-go v0.40.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp
299299
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
300300
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
301301
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
302+
github.com/sony/gobreaker/v2 v2.4.0 h1:g2KJRW1Ubty3+ZOcSEUN7K+REQJdN6yo6XvaML+jptg=
303+
github.com/sony/gobreaker/v2 v2.4.0/go.mod h1:pTyFJgcZ3h2tdQVLZZruK2C0eoFL1fb/G83wK1ZQl+s=
302304
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
303305
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
304306
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=

pkg/common/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,20 @@ var (
233233
Name: "execution_processor_row_buffer_pending_tasks",
234234
Help: "Current number of tasks waiting for their rows to be flushed",
235235
}, []string{"network", "processor", "table"})
236+
237+
RowBufferInflightFlushes = promauto.NewGaugeVec(prometheus.GaugeOpts{
238+
Name: "execution_processor_row_buffer_inflight_flushes",
239+
Help: "Number of flush operations currently in progress",
240+
}, []string{"network", "processor", "table"})
241+
242+
// Circuit breaker metrics for row buffer.
243+
RowBufferCircuitOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
244+
Name: "execution_processor_row_buffer_circuit_open",
245+
Help: "Whether circuit breaker is open (1) or closed (0)",
246+
}, []string{"network", "processor", "table"})
247+
248+
RowBufferCircuitRejections = promauto.NewCounterVec(prometheus.CounterOpts{
249+
Name: "execution_processor_row_buffer_circuit_rejections_total",
250+
Help: "Total number of flushes rejected by circuit breaker",
251+
}, []string{"network", "processor", "table"})
236252
)

pkg/ethereum/execution/geth/rpc.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/ethereum/go-ethereum/common"
1414
"github.com/ethereum/go-ethereum/core/types"
1515
"github.com/ethereum/go-ethereum/rpc"
16+
"github.com/sirupsen/logrus"
1617

1718
pcommon "github.com/ethpandaops/execution-processor/pkg/common"
1819
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
@@ -73,13 +74,56 @@ func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (exec
7374
return NewBlockAdapter(block), nil
7475
}
7576

77+
// maxBatchSize is the maximum number of blocks to request in a single batch RPC call.
78+
// Most RPC nodes have a default limit of 100 (e.g., Erigon's --rpc.batch.limit).
79+
const maxBatchSize = 100
80+
7681
// blocksByNumbers fetches multiple blocks using batch RPC calls.
7782
// Returns blocks up to the first not-found (contiguous only).
83+
// Large requests are automatically chunked to respect RPC batch limits.
7884
func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
7985
if len(numbers) == 0 {
8086
return []execution.Block{}, nil
8187
}
8288

89+
// If request exceeds batch limit, chunk it
90+
if len(numbers) > maxBatchSize {
91+
return n.blocksByNumbersChunked(ctx, numbers)
92+
}
93+
94+
return n.blocksByNumbersBatch(ctx, numbers)
95+
}
96+
97+
// blocksByNumbersChunked fetches blocks in chunks to respect RPC batch limits.
98+
func (n *RPCNode) blocksByNumbersChunked(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
99+
allBlocks := make([]execution.Block, 0, len(numbers))
100+
101+
for i := 0; i < len(numbers); i += maxBatchSize {
102+
end := i + maxBatchSize
103+
if end > len(numbers) {
104+
end = len(numbers)
105+
}
106+
107+
chunk := numbers[i:end]
108+
109+
blocks, err := n.blocksByNumbersBatch(ctx, chunk)
110+
if err != nil {
111+
return allBlocks, err
112+
}
113+
114+
allBlocks = append(allBlocks, blocks...)
115+
116+
// If we got fewer blocks than requested, a block was not found - stop for contiguity
117+
if len(blocks) < len(chunk) {
118+
break
119+
}
120+
}
121+
122+
return allBlocks, nil
123+
}
124+
125+
// blocksByNumbersBatch fetches a single batch of blocks (must be <= maxBatchSize).
126+
func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
83127
start := time.Now()
84128
network := n.Metadata().ChainID()
85129

@@ -132,26 +176,38 @@ func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]ex
132176
// Check for individual call error - stop at first error for contiguity
133177
// We intentionally don't return this error as we want partial results
134178
if elem.Error != nil {
179+
n.log.WithError(elem.Error).WithField("block_index", i).Debug("Batch element error, stopping")
180+
135181
break
136182
}
137183

138184
// Check for nil/not-found block (null JSON response)
139185
if results[i] == nil || len(*results[i]) == 0 || string(*results[i]) == "null" {
140186
// Block not found - stop here for contiguity
187+
n.log.WithFields(logrus.Fields{
188+
"block_index": i,
189+
"block_number": numbers[i].String(),
190+
}).Debug("Block not found in batch, stopping")
191+
141192
break
142193
}
143194

144195
// Parse the block from JSON
145196
block, parseErr := parseBlockFromJSON(*results[i])
146197
if parseErr != nil {
147198
// Parse error - stop here for contiguity
199+
n.log.WithError(parseErr).WithFields(logrus.Fields{
200+
"block_index": i,
201+
"block_number": numbers[i].String(),
202+
}).Warn("Failed to parse block from JSON, stopping batch")
203+
148204
break
149205
}
150206

151207
blocks = append(blocks, NewBlockAdapter(block))
152208
}
153209

154-
return blocks, nil //nolint:nilerr // Intentionally returning partial results for contiguity
210+
return blocks, nil
155211
}
156212

157213
// toBlockNumArg converts a block number to the RPC argument format.

pkg/processor/manager.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,16 +564,15 @@ func (m *Manager) processBlocks(ctx context.Context) bool {
564564
} else {
565565
workDone = true
566566

567-
// Track processing duration
567+
// Track processing duration (block count is tracked by the processor itself)
568568
duration := time.Since(startTime)
569569

570570
common.BlockProcessingDuration.WithLabelValues(m.network.Name, name).Observe(duration.Seconds())
571-
common.BlocksProcessed.WithLabelValues(m.network.Name, name).Inc()
572571

573572
m.log.WithFields(logrus.Fields{
574573
"processor": name,
575574
"duration": duration,
576-
}).Debug("Successfully processed block")
575+
}).Debug("Successfully processed blocks")
577576
}
578577

579578
// Update head distance metric (regardless of success/failure to track current distance)

pkg/processor/transaction/simple/block_processing.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
137137
}).Debug("Fetched batch of blocks")
138138

139139
// Process each block, stopping on first error
140+
processedCount := 0
141+
140142
for _, block := range blocks {
141143
if processErr := p.processBlock(ctx, block); processErr != nil {
144+
// Record blocks that were successfully processed before the error
145+
if processedCount > 0 {
146+
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
147+
}
148+
142149
return processErr
143150
}
151+
152+
processedCount++
153+
}
154+
155+
// Record all successfully processed blocks
156+
if processedCount > 0 {
157+
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
144158
}
145159

146160
return nil

pkg/processor/transaction/simple/config.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ import (
99

1010
// Default buffer configuration values.
1111
const (
12-
DefaultBufferMaxRows = 100000
13-
DefaultBufferFlushInterval = time.Second
12+
DefaultBufferMaxRows = 100000
13+
DefaultBufferFlushInterval = time.Second
14+
DefaultBufferMaxConcurrentFlushes = 10
15+
DefaultBufferCircuitBreakerMaxFailures = 5
16+
DefaultBufferCircuitBreakerTimeout = 60 * time.Second
1417
)
1518

1619
// Config holds configuration for the simple transaction processor.
@@ -20,8 +23,11 @@ type Config struct {
2023
Table string `yaml:"table"`
2124

2225
// Row buffer settings for batched ClickHouse inserts
23-
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
24-
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
26+
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
27+
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
28+
BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10
29+
BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
30+
BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s
2531

2632
// Block completion tracking
2733
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2

pkg/processor/transaction/simple/processor.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,18 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
8585
config.BufferFlushInterval = DefaultBufferFlushInterval
8686
}
8787

88+
if config.BufferMaxConcurrentFlushes <= 0 {
89+
config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes
90+
}
91+
92+
if config.BufferCircuitBreakerMaxFailures <= 0 {
93+
config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures
94+
}
95+
96+
if config.BufferCircuitBreakerTimeout <= 0 {
97+
config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout
98+
}
99+
88100
log := deps.Log.WithField("processor", ProcessorName)
89101

90102
// Create the limiter for shared functionality
@@ -129,21 +141,25 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
129141
// Create the row buffer with the flush function
130142
processor.rowBuffer = rowbuffer.New(
131143
rowbuffer.Config{
132-
MaxRows: config.BufferMaxRows,
133-
FlushInterval: config.BufferFlushInterval,
134-
Network: deps.Network.Name,
135-
Processor: ProcessorName,
136-
Table: config.Table,
144+
MaxRows: config.BufferMaxRows,
145+
FlushInterval: config.BufferFlushInterval,
146+
MaxConcurrentFlushes: config.BufferMaxConcurrentFlushes,
147+
CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures,
148+
CircuitBreakerTimeout: config.BufferCircuitBreakerTimeout,
149+
Network: deps.Network.Name,
150+
Processor: ProcessorName,
151+
Table: config.Table,
137152
},
138153
processor.flushRows,
139154
log,
140155
)
141156

142157
processor.log.WithFields(logrus.Fields{
143-
"network": processor.network.Name,
144-
"max_pending_block_range": config.MaxPendingBlockRange,
145-
"buffer_max_rows": config.BufferMaxRows,
146-
"buffer_flush_interval": config.BufferFlushInterval,
158+
"network": processor.network.Name,
159+
"max_pending_block_range": config.MaxPendingBlockRange,
160+
"buffer_max_rows": config.BufferMaxRows,
161+
"buffer_flush_interval": config.BufferFlushInterval,
162+
"buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes,
147163
}).Info("Simple transaction processor initialized")
148164

149165
return processor, nil

pkg/processor/transaction/structlog/block_processing.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
140140
}).Debug("Fetched batch of blocks")
141141

142142
// Process each block, stopping on first error
143+
processedCount := 0
144+
143145
for _, block := range blocks {
144146
if processErr := p.processBlock(ctx, block); processErr != nil {
147+
// Record blocks that were successfully processed before the error
148+
if processedCount > 0 {
149+
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
150+
}
151+
145152
return processErr
146153
}
154+
155+
processedCount++
156+
}
157+
158+
// Record all successfully processed blocks
159+
if processedCount > 0 {
160+
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
147161
}
148162

149163
return nil
@@ -156,7 +170,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node
156170
chainTip := new(big.Int).SetUint64(*latestBlock)
157171
diff := new(big.Int).Sub(nextBlock, chainTip).Int64()
158172

159-
if diff <= 5 { // Within 5 blocks of chain tip
173+
if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon
160174
p.log.WithFields(logrus.Fields{
161175
"network": p.network.Name,
162176
"block_number": nextBlock,

0 commit comments

Comments (0)