Merged
5 changes: 5 additions & 0 deletions block/internal/syncing/syncer.go
@@ -467,6 +467,11 @@ func (s *Syncer) fetchDAUntilCaughtUp(ctx context.Context) error {
}
}

+// PendingCount returns the number of unprocessed height events in the pipeline.
+func (s *Syncer) PendingCount() int {
+	return len(s.heightInCh)
+}
+
 func (s *Syncer) pendingWorkerLoop(ctx context.Context) {
 	defer s.wg.Done()

3 changes: 2 additions & 1 deletion node/failover.go
@@ -315,7 +315,8 @@ func (f *failoverState) waitForCatchup(ctx context.Context) (bool, error) {
p2pCaughtUp = true
}

-if daCaughtUp && p2pCaughtUp {
+pipelineDrained := f.bc.Syncer == nil || f.bc.Syncer.PendingCount() == 0
+if daCaughtUp && p2pCaughtUp && pipelineDrained {
Comment on lines +318 to +319

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) PendingCount implementation ==="
rg -n -A5 'func \(s \*Syncer\) PendingCount\(\) int' block/internal/syncing/syncer.go

echo
echo "=== 2) Syncer struct definition and state fields ==="
rg -n -B2 -A10 'type Syncer struct' block/internal/syncing/syncer.go

echo
echo "=== 3) heightInCh and pipeline queue operations ==="
rg -n -B2 -A2 'heightInCh' block/internal/syncing/syncer.go | head -80

echo
echo "=== 4) Failover.go catchup logic ==="
rg -n -B5 -A5 'pipelineDrained' node/failover.go

Repository: evstack/ev-node

Length of output: 3017


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) processHeightEvent implementation ==="
rg -n -A20 'func \(s \*Syncer\) processHeightEvent' block/internal/syncing/syncer.go

echo
echo "=== 2) Cache operations and pending events ==="
rg -n -B2 -A2 'cache\.GetNextPendingEvent\|SetPendingEvent\|pendingWorkerLoop' block/internal/syncing/syncer.go | head -100

echo
echo "=== 3) Check if Syncer has any Done/Idle/Quiescent signals ==="
rg -n 'sync\.WaitGroup\|chan.*done\|chan.*idle\|chan.*quiescent' block/internal/syncing/syncer.go

echo
echo "=== 4) Any existing waiters for pipeline quiescence ==="
rg -n 'WaitForIdle\|WaitForCompletion\|WaitForQuiescence' block/internal/syncing/syncer.go

Repository: evstack/ev-node

Length of output: 869


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) Full processHeightEvent implementation ==="
rg -n -A50 'func \(s \*Syncer\) processHeightEvent' block/internal/syncing/syncer.go | head -80

echo
echo "=== 2) pendingWorkerLoop - cache to channel pipeline ==="
rg -n -A30 'func \(s \*Syncer\) pendingWorkerLoop' block/internal/syncing/syncer.go

echo
echo "=== 3) Check Syncer fields for tracking work state ==="
sed -n '46,150p' block/internal/syncing/syncer.go

Repository: evstack/ev-node

Length of output: 6599


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) processPendingEvents to confirm cache polling ==="
rg -n -A15 'func \(s \*Syncer\) processPendingEvents' block/internal/syncing/syncer.go

echo
echo "=== 2) Check cache.GetNextPendingEvent calls ==="
rg -n 'GetNextPendingEvent' block/internal/syncing/syncer.go

echo
echo "=== 3) How failover uses/waits for syncer ==="
rg -n -B5 -A10 'bc\.Syncer.*PendingCount\|bc\.Stop\|Syncer.*idle\|Syncer.*done' node/failover.go | head -80

Repository: evstack/ev-node

Length of output: 874


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) Failover.bc.Stop and syncer lifecycle ==="
rg -n -B10 -A10 'bc\.Stop\(\)' node/failover.go

echo
echo "=== 2) Check if syncer has any Stop or WaitAll methods ==="
rg -n 'func \(s \*Syncer\) Stop\|func \(s \*Syncer\) Wait' block/internal/syncing/syncer.go

echo
echo "=== 3) Full catchup logic context in failover ==="
sed -n '300,330p' node/failover.go

echo
echo "=== 4) Verify cache.GetNextPendingEvent can be non-nil when heightInCh empty ==="
rg -n -B5 -A10 'cache.GetNextPendingEvent\|pendingEvents\|cache.*map' block/internal/syncing/syncer.go | head -60

Repository: evstack/ev-node

Length of output: 1013


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) Syncer Stop method ==="
rg -n 'func \(s \*Syncer\) Stop' block/internal/syncing/syncer.go

echo
echo "=== 2) All public Syncer methods ==="
rg -n '^func \(s \*Syncer\)' block/internal/syncing/syncer.go | head -30

echo
echo "=== 3) Look at sync.WaitGroup usage in syncer ==="
rg -n 'wg\.Add\|wg\.Wait\|wg\.Done' block/internal/syncing/syncer.go

echo
echo "=== 4) Full catchup decision logic in failover ==="
sed -n '300,330p' node/failover.go

Repository: evstack/ev-node

Length of output: 2363


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== 1) Syncer.Stop() method ==="
sed -n '209,250p' block/internal/syncing/syncer.go

echo
echo "=== 2) Syncer.Start() - understand wg usage ==="
sed -n '166,210p' block/internal/syncing/syncer.go

echo
echo "=== 3) Full failover catchup check context ==="
sed -n '295,330p' node/failover.go

echo
echo "=== 4) Understanding cache state - pending events structure ==="
rg -n 'pendingEventsMu\|pendingEvents\s*map' block/internal/syncing/ -A2

Repository: evstack/ev-node

Length of output: 3638


Catchup can be declared complete before pending cache events are processed.

PendingCount() only measures the channel buffer length (lines 471-472) and misses two pipeline states:

  1. Pending cache events: when processHeightEvent() encounters an out-of-sequence block, it stores the event in the cache via SetPendingEvent() (line 599). pendingWorkerLoop() polls this cache every 10ms (line 481) and re-queues events to heightInCh once the sequence is complete (lines 1109-1111). Events parked in the cache are not counted by PendingCount().

  2. In-flight processing: an event dequeued from heightInCh (line 354) stays in flight while processHeightEvent() runs, which performs I/O and state validation (lines 584-602). Channel depth does not track this work.
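
The in-flight case can be reproduced in isolation. The following is a minimal sketch, not ev-node code: `events` stands in for heightInCh, and `observeInFlight`, `started`, and `release` are hypothetical names introduced only for this illustration. It shows a len()-based check reporting zero while the single dequeued event is still being processed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// observeInFlight lets a worker dequeue one event and then reports what a
// len()-based check sees while that event is still being processed.
// All names here are illustrative and are not taken from ev-node.
func observeInFlight() (queueLen int, finished bool) {
	events := make(chan uint64, 4) // stand-in for heightInCh
	started := make(chan struct{})
	release := make(chan struct{})
	var done atomic.Bool
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-events       // dequeue: the buffer is empty from here on
		close(started) // tell the observer that processing has begun
		<-release      // stand-in for slow I/O and state validation
		done.Store(true)
	}()

	events <- 100
	<-started

	// Snapshot taken while the worker is still blocked on release.
	queueLen = len(events)
	finished = done.Load()

	close(release)
	wg.Wait()
	return queueLen, finished
}

func main() {
	queueLen, finished := observeInFlight()
	fmt.Printf("queue length: %d, event finished: %v\n", queueLen, finished)
}
```

The snapshot is taken after the worker has received the event and before it is released, so the channel length is zero even though the work is unfinished.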

Supporting evidence: Stop() calls wg.Wait() (line 216) to wait for all worker goroutines and only then drains heightInCh in a loop (lines 221-237). This pattern shows that channel depth and pipeline quiescence are separate conditions: the channel can be empty while the pipeline has not reached true quiescence.

Lines 318-319 in failover.go treat the pipeline as drained as soon as PendingCount() == 0. This creates a race: catchup can be declared complete while events are still parked in the pending cache or being processed.

Implement a dedicated idle signal in Syncer that waits for all three stages: empty channel, no in-flight processing, and empty pending cache.
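
One possible shape for such a signal is sketched below, assuming all three stages can be counted under a single mutex. `idleTracker` and its methods are hypothetical and do not exist in ev-node; the sketch also does not model the real cache API (SetPendingEvent, GetNextPendingEvent), only the bookkeeping around it.

```go
package main

import (
	"fmt"
	"sync"
)

// idleTracker is a hypothetical helper, not an ev-node type. It counts events
// in each of the three stages named in the review.
type idleTracker struct {
	mu       sync.Mutex
	queued   int // events sitting in the channel buffer
	inFlight int // events dequeued and currently being processed
	parked   int // out-of-sequence events held in the pending cache
}

// Enqueued records a new event entering the channel.
func (t *idleTracker) Enqueued() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.queued++
}

// Dequeued records a worker taking an event off the channel.
func (t *idleTracker) Dequeued() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.queued--
	t.inFlight++
}

// Parked records an in-flight event being stored in the pending cache.
func (t *idleTracker) Parked() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inFlight--
	t.parked++
}

// Requeued records a cached event being pushed back onto the channel.
func (t *idleTracker) Requeued() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.parked--
	t.queued++
}

// Finished records an in-flight event being fully processed.
func (t *idleTracker) Finished() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inFlight--
}

// IsIdle reports whether all three stages are empty in one consistent read.
func (t *idleTracker) IsIdle() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.queued == 0 && t.inFlight == 0 && t.parked == 0
}

func main() {
	var tr idleTracker
	tr.Enqueued()
	tr.Dequeued()
	tr.Parked() // out-of-sequence event: the channel is empty, but work remains
	fmt.Println("idle while parked:", tr.IsIdle())
	tr.Requeued()
	tr.Dequeued()
	tr.Finished()
	fmt.Println("idle after processing:", tr.IsIdle())
}
```

Because every transition moves an event between counters under the same lock, IsIdle never observes an event that is "between" stages. Whether a mutex, atomics, or a condition variable fits the real Syncer best depends on its hot path and is left open here.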

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@node/failover.go` around lines 318 - 319, The current catchup check uses
Syncer.PendingCount() which only looks at channel buffer and misses pending
cache entries and in-flight processing; add a dedicated idle mechanism on Syncer
(e.g., WaitIdle() or IdleChan/IsIdle()) that atomically ensures: heightInCh is
empty, an in-flight counter (incremented/decremented in processHeightEvent and
when dequeuing from heightInCh) is zero, and the pending cache (populated by
SetPendingEvent and polled in pendingWorkerLoop) is empty; update Syncer to
maintain the in-flight counter and a condition or channel that is signaled when
all three are zero, expose a blocking WaitIdle() or boolean IsIdle() used in
failover.go (replace pipelineDrained := f.bc.Syncer == nil ||
f.bc.Syncer.PendingCount() == 0 with a call to the new idle API) so catchup
waits for true quiescence.
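
If the blocking variant is preferred, a context-aware polling wait is one option. This is a sketch under assumptions: `waitIdle` and `demoWaitIdle` are hypothetical names, the `isIdle` predicate stands in for whatever idle check the Syncer ends up exposing, and the polling interval is arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitIdle polls isIdle until it returns true or ctx is done.
// It is a hypothetical helper, not an ev-node API.
func waitIdle(ctx context.Context, isIdle func() bool, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if isIdle() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoWaitIdle runs waitIdle against a predicate that turns true on its third
// call, and against one that never does and therefore hits the deadline.
func demoWaitIdle() (becameIdle bool, timedOut bool) {
	calls := 0
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	err := waitIdle(ctx, func() bool { calls++; return calls >= 3 }, time.Millisecond)
	becameIdle = err == nil

	shortCtx, cancelShort := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancelShort()
	err = waitIdle(shortCtx, func() bool { return false }, time.Millisecond)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return becameIdle, timedOut
}

func main() {
	becameIdle, timedOut := demoWaitIdle()
	fmt.Println("became idle:", becameIdle, "timed out:", timedOut)
}
```

An idle result is only a point-in-time observation: new events can arrive right after it returns, so the caller still has to combine it with the DA and P2P caught-up conditions in the same check.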

f.logger.Info().
Uint64("store_height", storeHeight).
Uint64("max_p2p_height", maxP2PHeight).