Skip to content

Commit 3413d52

Browse files
committed
feat(processor): add gap detection for block processing
Add ClickHouse gap detection to find missing blocks between the oldest processed block and the current block. This catches gaps that fall outside the maxPendingBlockRange window. Changes: - Add GetIncompleteBlocksInRange() to state manager for range queries - Add GapStateProvider interface extending StateProvider - Add GetGaps() method to limiter for full-range gap detection - Add GapDetectionConfig with configurable scan interval, batch size, and lookback range - Add periodic gap scanner goroutine to processor - Exclude maxPendingBlockRange window from gap scan to avoid double work The gap scanner runs periodically (default: 5m) and reprocesses any incomplete blocks found outside the limiter's normal detection window.
1 parent 623b257 commit 3413d52

5 files changed

Lines changed: 538 additions & 0 deletions

File tree

pkg/processor/tracker/limiter.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tracker
33
import (
44
"context"
55
"fmt"
6+
"math/big"
67

78
"github.com/ethpandaops/execution-processor/pkg/common"
89
"github.com/sirupsen/logrus"
@@ -15,6 +16,13 @@ type StateProvider interface {
1516
MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error
1617
}
1718

19+
// GapStateProvider extends StateProvider with gap detection capabilities.
20+
type GapStateProvider interface {
21+
StateProvider
22+
GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
23+
GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error)
24+
}
25+
1826
// LimiterConfig holds configuration for the Limiter.
1927
type LimiterConfig struct {
2028
MaxPendingBlockRange int
@@ -207,3 +215,76 @@ func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint6
207215

208216
return nil
209217
}
218+
219+
// GetGaps returns incomplete blocks outside the maxPendingBlockRange window.
220+
// If lookbackRange is 0, scans from the oldest stored block.
221+
// This performs a full-range scan for gap detection, excluding the recent window
222+
// that is already handled by IsBlockedByIncompleteBlocks.
223+
func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) ([]uint64, error) {
224+
gapProvider, ok := l.stateProvider.(GapStateProvider)
225+
if !ok {
226+
return nil, fmt.Errorf("state provider does not support gap detection")
227+
}
228+
229+
var minBlock uint64
230+
231+
if lookbackRange == 0 {
232+
// Unlimited: scan from oldest stored block
233+
minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor)
234+
if err != nil {
235+
return nil, fmt.Errorf("failed to get min stored block: %w", err)
236+
}
237+
238+
if minStored == nil {
239+
// No blocks stored yet
240+
return nil, nil
241+
}
242+
243+
minBlock = minStored.Uint64()
244+
} else {
245+
// Limited: scan from currentBlock - lookbackRange
246+
if currentBlock > lookbackRange {
247+
minBlock = currentBlock - lookbackRange
248+
}
249+
}
250+
251+
// Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks.
252+
// The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock],
253+
// so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work.
254+
maxBlock := currentBlock
255+
256+
if l.config.MaxPendingBlockRange > 0 {
257+
exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config
258+
259+
if currentBlock > exclusionWindow {
260+
maxBlock = currentBlock - exclusionWindow - 1
261+
} else {
262+
// Current block is within the exclusion window, nothing to scan
263+
return nil, nil
264+
}
265+
}
266+
267+
// Ensure minBlock doesn't exceed maxBlock
268+
if minBlock > maxBlock {
269+
return nil, nil
270+
}
271+
272+
gaps, err := gapProvider.GetIncompleteBlocksInRange(
273+
ctx, l.network, l.processor,
274+
minBlock, maxBlock, limit,
275+
)
276+
if err != nil {
277+
return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err)
278+
}
279+
280+
if len(gaps) > 0 {
281+
l.log.WithFields(logrus.Fields{
282+
"min_block": minBlock,
283+
"max_block": maxBlock,
284+
"gap_count": len(gaps),
285+
"first_gap": gaps[0],
286+
}).Debug("Found gaps in block range")
287+
}
288+
289+
return gaps, nil
290+
}
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package tracker
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"testing"
7+
8+
"github.com/sirupsen/logrus"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
// mockGapStateProvider implements GapStateProvider for testing.
14+
type mockGapStateProvider struct {
15+
mockStateProvider
16+
incompleteBlocksInRange []uint64
17+
minStoredBlock *big.Int
18+
maxStoredBlock *big.Int
19+
getIncompleteErr error
20+
getMinMaxErr error
21+
}
22+
23+
func (m *mockGapStateProvider) GetIncompleteBlocksInRange(
24+
_ context.Context, _, _ string, _, _ uint64, _ int,
25+
) ([]uint64, error) {
26+
if m.getIncompleteErr != nil {
27+
return nil, m.getIncompleteErr
28+
}
29+
30+
return m.incompleteBlocksInRange, nil
31+
}
32+
33+
func (m *mockGapStateProvider) GetMinMaxStoredBlocks(
34+
_ context.Context, _, _ string,
35+
) (*big.Int, *big.Int, error) {
36+
if m.getMinMaxErr != nil {
37+
return nil, nil, m.getMinMaxErr
38+
}
39+
40+
return m.minStoredBlock, m.maxStoredBlock, nil
41+
}
42+
43+
func TestGetGaps_FindsMissingBlocks(t *testing.T) {
44+
// Setup: blocks 5,6,7,9,10...100 are complete, block 8 is incomplete
45+
// With maxPendingBlockRange=2 and currentBlock=100, gap scanner looks at blocks up to 97
46+
// (excluding the window 98-100 that the limiter handles)
47+
mockProvider := &mockGapStateProvider{
48+
minStoredBlock: big.NewInt(5),
49+
maxStoredBlock: big.NewInt(100),
50+
incompleteBlocksInRange: []uint64{8},
51+
}
52+
53+
limiter := NewLimiter(&LimiterDeps{
54+
Log: logrus.NewEntry(logrus.New()),
55+
StateProvider: mockProvider,
56+
Network: "mainnet",
57+
Processor: "simple",
58+
}, LimiterConfig{MaxPendingBlockRange: 2})
59+
60+
ctx := context.Background()
61+
currentBlock := uint64(100)
62+
lookbackRange := uint64(0) // Unlimited
63+
64+
// Gap scanner searches [5, 97] (excludes 98-100 handled by limiter)
65+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
66+
67+
require.NoError(t, err)
68+
assert.Equal(t, []uint64{8}, gaps)
69+
}
70+
71+
func TestGetGaps_RespectsLookbackRange(t *testing.T) {
72+
mockProvider := &mockGapStateProvider{
73+
// GetMinMaxStoredBlocks should NOT be called when lookbackRange is set
74+
incompleteBlocksInRange: []uint64{75, 80},
75+
}
76+
77+
limiter := NewLimiter(&LimiterDeps{
78+
Log: logrus.NewEntry(logrus.New()),
79+
StateProvider: mockProvider,
80+
Network: "mainnet",
81+
Processor: "simple",
82+
}, LimiterConfig{MaxPendingBlockRange: 2})
83+
84+
ctx := context.Background()
85+
currentBlock := uint64(100)
86+
lookbackRange := uint64(50) // Only look back 50 blocks
87+
88+
// Should query from block 50 (100 - 50) to 100
89+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
90+
91+
require.NoError(t, err)
92+
assert.Equal(t, []uint64{75, 80}, gaps)
93+
}
94+
95+
func TestGetGaps_NoGaps(t *testing.T) {
96+
mockProvider := &mockGapStateProvider{
97+
minStoredBlock: big.NewInt(1),
98+
maxStoredBlock: big.NewInt(100),
99+
incompleteBlocksInRange: []uint64{},
100+
}
101+
102+
limiter := NewLimiter(&LimiterDeps{
103+
Log: logrus.NewEntry(logrus.New()),
104+
StateProvider: mockProvider,
105+
Network: "mainnet",
106+
Processor: "simple",
107+
}, LimiterConfig{MaxPendingBlockRange: 2})
108+
109+
ctx := context.Background()
110+
currentBlock := uint64(100)
111+
lookbackRange := uint64(0)
112+
113+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
114+
115+
require.NoError(t, err)
116+
assert.Empty(t, gaps)
117+
}
118+
119+
func TestGetGaps_DoesNotLookBeforeOldestStoredBlock(t *testing.T) {
120+
// Ensure we don't query for blocks before the oldest stored block
121+
mockProvider := &mockGapStateProvider{
122+
// Oldest stored is 50, so we should only query from 50 onwards
123+
minStoredBlock: big.NewInt(50),
124+
maxStoredBlock: big.NewInt(100),
125+
incompleteBlocksInRange: []uint64{},
126+
}
127+
128+
limiter := NewLimiter(&LimiterDeps{
129+
Log: logrus.NewEntry(logrus.New()),
130+
StateProvider: mockProvider,
131+
Network: "mainnet",
132+
Processor: "simple",
133+
}, LimiterConfig{MaxPendingBlockRange: 2})
134+
135+
ctx := context.Background()
136+
currentBlock := uint64(100)
137+
lookbackRange := uint64(0) // Unlimited
138+
139+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
140+
141+
require.NoError(t, err)
142+
assert.Empty(t, gaps)
143+
}
144+
145+
func TestGetGaps_NoBlocksStored(t *testing.T) {
146+
mockProvider := &mockGapStateProvider{
147+
minStoredBlock: nil, // No blocks stored
148+
maxStoredBlock: nil,
149+
}
150+
151+
limiter := NewLimiter(&LimiterDeps{
152+
Log: logrus.NewEntry(logrus.New()),
153+
StateProvider: mockProvider,
154+
Network: "mainnet",
155+
Processor: "simple",
156+
}, LimiterConfig{MaxPendingBlockRange: 2})
157+
158+
ctx := context.Background()
159+
currentBlock := uint64(100)
160+
lookbackRange := uint64(0)
161+
162+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
163+
164+
require.NoError(t, err)
165+
assert.Nil(t, gaps)
166+
}
167+
168+
func TestGetGaps_StateProviderDoesNotSupportGapDetection(t *testing.T) {
169+
// Use the basic mockStateProvider which doesn't implement GapStateProvider
170+
mockProvider := &mockStateProvider{}
171+
172+
limiter := NewLimiter(&LimiterDeps{
173+
Log: logrus.NewEntry(logrus.New()),
174+
StateProvider: mockProvider,
175+
Network: "mainnet",
176+
Processor: "simple",
177+
}, LimiterConfig{MaxPendingBlockRange: 2})
178+
179+
ctx := context.Background()
180+
currentBlock := uint64(100)
181+
lookbackRange := uint64(0)
182+
183+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
184+
185+
require.Error(t, err)
186+
assert.Contains(t, err.Error(), "state provider does not support gap detection")
187+
assert.Nil(t, gaps)
188+
}
189+
190+
func TestGetGaps_MultipleGaps(t *testing.T) {
191+
mockProvider := &mockGapStateProvider{
192+
minStoredBlock: big.NewInt(1),
193+
maxStoredBlock: big.NewInt(100),
194+
incompleteBlocksInRange: []uint64{5, 10, 15, 20, 25},
195+
}
196+
197+
limiter := NewLimiter(&LimiterDeps{
198+
Log: logrus.NewEntry(logrus.New()),
199+
StateProvider: mockProvider,
200+
Network: "mainnet",
201+
Processor: "simple",
202+
}, LimiterConfig{MaxPendingBlockRange: 2})
203+
204+
ctx := context.Background()
205+
currentBlock := uint64(100)
206+
lookbackRange := uint64(0)
207+
208+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
209+
210+
require.NoError(t, err)
211+
assert.Equal(t, []uint64{5, 10, 15, 20, 25}, gaps)
212+
}
213+
214+
func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) {
215+
// When lookbackRange is greater than currentBlock, minBlock should be 0
216+
// With maxPendingBlockRange=2 and currentBlock=10, gap scanner looks at blocks up to 7
217+
mockProvider := &mockGapStateProvider{
218+
incompleteBlocksInRange: []uint64{3},
219+
}
220+
221+
limiter := NewLimiter(&LimiterDeps{
222+
Log: logrus.NewEntry(logrus.New()),
223+
StateProvider: mockProvider,
224+
Network: "mainnet",
225+
Processor: "simple",
226+
}, LimiterConfig{MaxPendingBlockRange: 2})
227+
228+
ctx := context.Background()
229+
currentBlock := uint64(10)
230+
lookbackRange := uint64(100) // Greater than currentBlock
231+
232+
// Gap scanner searches [0, 7] (excludes 8-10 handled by limiter)
233+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
234+
235+
require.NoError(t, err)
236+
assert.Equal(t, []uint64{3}, gaps)
237+
}
238+
239+
func TestGetGaps_ExcludesMaxPendingBlockRangeWindow(t *testing.T) {
240+
// Verify that gaps within maxPendingBlockRange are NOT returned
241+
// because they're already handled by IsBlockedByIncompleteBlocks
242+
mockProvider := &mockGapStateProvider{
243+
minStoredBlock: big.NewInt(1),
244+
maxStoredBlock: big.NewInt(100),
245+
incompleteBlocksInRange: []uint64{50}, // Gap at block 50, outside the exclusion window
246+
}
247+
248+
limiter := NewLimiter(&LimiterDeps{
249+
Log: logrus.NewEntry(logrus.New()),
250+
StateProvider: mockProvider,
251+
Network: "mainnet",
252+
Processor: "simple",
253+
}, LimiterConfig{MaxPendingBlockRange: 5})
254+
255+
ctx := context.Background()
256+
currentBlock := uint64(100)
257+
lookbackRange := uint64(0)
258+
259+
// With maxPendingBlockRange=5, gap scanner searches [1, 94] (excludes 95-100)
260+
// Block 50 should be found since it's outside the exclusion window
261+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
262+
263+
require.NoError(t, err)
264+
assert.Equal(t, []uint64{50}, gaps)
265+
}
266+
267+
func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) {
268+
// When currentBlock is smaller than or equal to maxPendingBlockRange,
269+
// there's nothing to scan outside the exclusion window
270+
mockProvider := &mockGapStateProvider{
271+
minStoredBlock: big.NewInt(1),
272+
maxStoredBlock: big.NewInt(5),
273+
incompleteBlocksInRange: []uint64{2},
274+
}
275+
276+
limiter := NewLimiter(&LimiterDeps{
277+
Log: logrus.NewEntry(logrus.New()),
278+
StateProvider: mockProvider,
279+
Network: "mainnet",
280+
Processor: "simple",
281+
}, LimiterConfig{MaxPendingBlockRange: 10}) // Larger than currentBlock
282+
283+
ctx := context.Background()
284+
currentBlock := uint64(5)
285+
lookbackRange := uint64(0)
286+
287+
// currentBlock (5) <= maxPendingBlockRange (10), so nothing to scan
288+
gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100)
289+
290+
require.NoError(t, err)
291+
assert.Nil(t, gaps)
292+
}

0 commit comments

Comments
 (0)