Skip to content

Commit e8fa8b5

Browse files
authored
reorg handles transaction/logs count (#310)
* reorg handles transaction/logs count
1 parent 19296e1 commit e8fa8b5

File tree

3 files changed

+244
-90
lines changed

3 files changed

+244
-90
lines changed

internal/committer/reorg.go

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,20 @@ func RunReorgValidator() {
3333
continue
3434
}
3535

36-
if endBlock == lastBlockCheck || endBlock-startBlock < 100 {
37-
log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
36+
if endBlock-startBlock < 100 {
37+
log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.")
3838
time.Sleep(1 * time.Minute)
3939
continue
4040
}
4141

4242
// Detect reorgs and handle them
43-
err = detectAndHandleReorgs(startBlock, endBlock)
43+
lastValidBlock, err := detectAndHandleReorgs(startBlock, endBlock)
4444
if err != nil {
4545
log.Error().Err(err).Msg("Failed to detect and handle reorgs")
4646
time.Sleep(2 * time.Second)
4747
continue
4848
}
49-
lastBlockCheck = endBlock
49+
lastBlockCheck = lastValidBlock
5050
}
5151
}
5252

@@ -91,21 +91,21 @@ func getLastValidBlock() (int64, error) {
9191
return lastValidBlock, nil
9292
}
9393

94-
func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
94+
func detectAndHandleReorgs(startBlock int64, endBlock int64) (int64, error) {
9595
log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock)
9696

9797
// Fetch block headers for the range
9898
blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
9999
if err != nil {
100-
return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
100+
return 0, fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
101101
}
102102

103103
if len(blockHeaders) == 0 {
104104
log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
105-
return nil
105+
return 0, nil
106106
}
107107

108-
// finding the reorg start and end block
108+
// 1) Block verification: find reorg range from header continuity (existing behavior)
109109
reorgStartBlock := int64(-1)
110110
reorgEndBlock := int64(-1)
111111
for i := 1; i < len(blockHeaders); i++ {
@@ -131,20 +131,71 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
131131
}
132132

133133
// set end to the last block if not set
134+
lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
134135
if reorgEndBlock == -1 {
135-
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
136+
// No header-based end detected; default to the last header for last-valid-block tracking.
137+
reorgEndBlock = lastHeaderBlock
138+
}
139+
140+
// 2) Transaction verification: check for mismatches between block.transaction_count
141+
// and the number of transactions stored per block in ClickHouse.
142+
txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
143+
if err != nil {
144+
return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
136145
}
137146

147+
// 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse.
148+
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
149+
if err != nil {
150+
return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
151+
}
152+
153+
// 4) Combine all ranges:
154+
// - If all three ranges (blocks, tx, logs) are empty, then there is no reorg.
155+
// - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range.
156+
finalStart := int64(-1)
157+
finalEnd := int64(-1)
158+
159+
// block headers range
138160
if reorgStartBlock > -1 {
139-
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
140-
return err
161+
finalStart = reorgStartBlock
162+
finalEnd = reorgEndBlock
163+
}
164+
165+
// transactions range
166+
if txStart > -1 {
167+
if finalStart == -1 || txStart < finalStart {
168+
finalStart = txStart
169+
}
170+
if finalEnd == -1 || txEnd > finalEnd {
171+
finalEnd = txEnd
141172
}
142173
}
143174

144-
// update last valid block. if there was no reorg, this will update to the last block
145-
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
175+
// logs range
176+
if logsStart > -1 {
177+
if finalStart == -1 || logsStart < finalStart {
178+
finalStart = logsStart
179+
}
180+
if finalEnd == -1 || logsEnd > finalEnd {
181+
finalEnd = logsEnd
182+
}
183+
}
146184

147-
return nil
185+
lastValidBlock := lastHeaderBlock
186+
if finalStart > -1 {
187+
// We found at least one inconsistent range; reorg from min(start) to max(end).
188+
if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil {
189+
return 0, err
190+
}
191+
lastValidBlock = finalEnd
192+
}
193+
err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock)
194+
if err != nil {
195+
return 0, fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err)
196+
}
197+
198+
return lastValidBlock, nil
148199
}
149200

150201
func handleReorgForRange(startBlock uint64, endBlock uint64) error {

internal/libs/clickhouse.go

Lines changed: 167 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ var defaultTraceFields = []string{
4444
"reward_type", "refund_address",
4545
}
4646

47+
type blockTxAggregate struct {
48+
BlockNumber *big.Int `ch:"block_number"`
49+
TxCount uint64 `ch:"tx_count"`
50+
}
51+
52+
type blockLogAggregate struct {
53+
BlockNumber *big.Int `ch:"block_number"`
54+
LogCount uint64 `ch:"log_count"`
55+
MaxLogIndex uint64 `ch:"max_log_index"`
56+
}
57+
4758
// only use this for backfill or getting old data.
4859
var ClickhouseConnV1 clickhouse.Conn
4960

@@ -229,20 +240,6 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
229240
Msg("skipping block because chainId is nil")
230241
continue
231242
}
232-
if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) {
233-
log.Info().
234-
Any("transactionCount", blocksRaw[i].TransactionCount).
235-
Any("transactionsRaw", transactionsRaw[i]).
236-
Msg("skipping block because transactionCount does not match")
237-
continue
238-
}
239-
if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 {
240-
log.Info().
241-
Any("logsBloom", blocksRaw[i].LogsBloom).
242-
Any("logsRaw", logsRaw[i]).
243-
Msg("skipping block because logsBloom is not empty and logsRaw is empty")
244-
continue
245-
}
246243
blockData[i] = &common.BlockData{
247244
Block: blocksRaw[i],
248245
Transactions: transactionsRaw[i],
@@ -253,6 +250,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
253250
return blockData, nil
254251
}
255252

253+
// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
254+
// where the stored transaction_count in the blocks table does not match the number
255+
// of transactions in the transactions table. It returns the minimum and maximum
256+
// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent.
257+
func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
258+
if endBlockNumber < startBlockNumber {
259+
return -1, -1, nil
260+
}
261+
262+
blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
263+
if err != nil {
264+
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
265+
}
266+
267+
// Aggregate transaction counts per block from the transactions table.
268+
query := fmt.Sprintf(
269+
"SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
270+
config.Cfg.CommitterClickhouseDatabase,
271+
chainId,
272+
startBlockNumber,
273+
endBlockNumber,
274+
)
275+
276+
txAggRows, err := execQueryV2[blockTxAggregate](query)
277+
if err != nil {
278+
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err)
279+
}
280+
281+
txCounts := make(map[uint64]uint64, len(txAggRows))
282+
for _, row := range txAggRows {
283+
if row.BlockNumber == nil {
284+
continue
285+
}
286+
txCounts[row.BlockNumber.Uint64()] = row.TxCount
287+
}
288+
289+
var mismatchStart int64 = -1
290+
var mismatchEnd int64 = -1
291+
292+
for _, block := range blocksRaw {
293+
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
294+
continue
295+
}
296+
297+
bn := block.Number.Uint64()
298+
expectedTxCount := block.TransactionCount
299+
actualTxCount, hasTx := txCounts[bn]
300+
301+
mismatch := false
302+
if expectedTxCount == 0 {
303+
// Header says no transactions; ensure there are none in the table.
304+
if hasTx && actualTxCount > 0 {
305+
mismatch = true
306+
}
307+
} else {
308+
// Header says there should be transactions.
309+
if !hasTx || actualTxCount != expectedTxCount {
310+
mismatch = true
311+
}
312+
}
313+
314+
if mismatch {
315+
if mismatchStart == -1 || int64(bn) < mismatchStart {
316+
mismatchStart = int64(bn)
317+
}
318+
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
319+
mismatchEnd = int64(bn)
320+
}
321+
}
322+
}
323+
324+
return mismatchStart, mismatchEnd, nil
325+
}
326+
327+
// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
328+
// where logs in the logs table are inconsistent with the block's logs_bloom:
329+
// - logsBloom is non-empty but there are no logs for that block
330+
// - logsBloom is empty/zero but logs exist
331+
// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist)
332+
// It returns the minimum and maximum block numbers that have a mismatch, or
333+
// (-1, -1) if all blocks are consistent.
334+
func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
335+
if endBlockNumber < startBlockNumber {
336+
return -1, -1, nil
337+
}
338+
339+
blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
340+
if err != nil {
341+
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
342+
}
343+
344+
// Aggregate log counts and max log_index per block from the logs table.
345+
query := fmt.Sprintf(
346+
"SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
347+
config.Cfg.CommitterClickhouseDatabase,
348+
chainId,
349+
startBlockNumber,
350+
endBlockNumber,
351+
)
352+
353+
logAggRows, err := execQueryV2[blockLogAggregate](query)
354+
if err != nil {
355+
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err)
356+
}
357+
358+
logAggs := make(map[uint64]blockLogAggregate, len(logAggRows))
359+
for _, row := range logAggRows {
360+
if row.BlockNumber == nil {
361+
continue
362+
}
363+
bn := row.BlockNumber.Uint64()
364+
logAggs[bn] = row
365+
}
366+
367+
var mismatchStart int64 = -1
368+
var mismatchEnd int64 = -1
369+
370+
for _, block := range blocksRaw {
371+
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
372+
continue
373+
}
374+
375+
bn := block.Number.Uint64()
376+
hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM
377+
logAgg, hasLogAgg := logAggs[bn]
378+
379+
mismatch := false
380+
381+
if hasLogsBloom {
382+
// logsBloom indicates logs should exist
383+
if !hasLogAgg || logAgg.LogCount == 0 {
384+
mismatch = true
385+
} else if logAgg.MaxLogIndex+1 != logAgg.LogCount {
386+
// log_index should be contiguous from 0..log_count-1
387+
mismatch = true
388+
}
389+
} else {
390+
// logsBloom is empty/zero; there should be no logs
391+
if hasLogAgg && logAgg.LogCount > 0 {
392+
mismatch = true
393+
}
394+
}
395+
396+
if mismatch {
397+
if mismatchStart == -1 || int64(bn) < mismatchStart {
398+
mismatchStart = int64(bn)
399+
}
400+
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
401+
mismatchEnd = int64(bn)
402+
}
403+
}
404+
}
405+
406+
return mismatchStart, mismatchEnd, nil
407+
}
408+
256409
func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
257410
sb := startBlockNumber
258411
length := endBlockNumber - startBlockNumber + 1

0 commit comments

Comments
 (0)