Skip to content

Commit 52825bf

Browse files
authored
chore: adding syncing tracing (#2981)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Follow up which adds tracing to the block syncer <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 01dcada commit 52825bf

File tree

7 files changed

+507
-32
lines changed

7 files changed

+507
-32
lines changed

block/components.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ func NewSyncComponents(
151151
errorCh,
152152
)
153153

154+
if config.Instrumentation.IsTracingEnabled() {
155+
syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
156+
}
157+
154158
// Create submitter for sync nodes (no signer, only DA inclusion processing)
155159
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
156160
submitter := submitting.NewSubmitter(
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package syncing
2+
3+
import (
4+
"context"
5+
6+
"github.com/evstack/ev-node/block/internal/common"
7+
"github.com/evstack/ev-node/types"
8+
)
9+
10+
// BlockSyncer defines operations for block synchronization that can be traced.
11+
// The Syncer implements this interface, and a tracing decorator can wrap it
12+
// to add OpenTelemetry spans to each operation.
13+
type BlockSyncer interface {
14+
// TrySyncNextBlock attempts to sync the next available block from DA or P2P.
15+
TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error
16+
17+
// ApplyBlock executes block transactions and returns the new state.
18+
ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
19+
20+
// ValidateBlock validates block structure and state transitions.
21+
ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
22+
23+
// VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled.
24+
VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error
25+
}

block/internal/syncing/syncer.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"github.com/evstack/ev-node/types"
2929
)
3030

31+
var _ BlockSyncer = (*Syncer)(nil)
32+
3133
// forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
3234
type forcedInclusionGracePeriodConfig struct {
3335
// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
@@ -118,6 +120,10 @@ type Syncer struct {
118120

119121
// P2P wait coordination
120122
p2pWaitState atomic.Value // stores p2pWaitState
123+
124+
// blockSyncer is the interface used for block sync operations.
125+
// defaults to self, but can be wrapped with tracing.
126+
blockSyncer BlockSyncer
121127
}
122128

123129
// pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet
@@ -155,7 +161,7 @@ func NewSyncer(
155161
blockFullnessEMA := &atomic.Pointer[float64]{}
156162
blockFullnessEMA.Store(&initialFullness)
157163

158-
return &Syncer{
164+
s := &Syncer{
159165
store: store,
160166
exec: exec,
161167
cache: cache,
@@ -175,6 +181,14 @@ func NewSyncer(
175181
blockFullnessEMA: blockFullnessEMA,
176182
gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
177183
}
184+
s.blockSyncer = s
185+
return s
186+
}
187+
188+
// SetBlockSyncer sets the block syncer interface, allowing injection of
189+
// a tracing wrapper or other decorator.
190+
func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
191+
s.blockSyncer = bs
178192
}
179193

180194
// Start begins the syncing component
@@ -515,7 +529,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
515529
s.cancelP2PWait(height)
516530

517531
// Try to sync the next block
518-
if err := s.trySyncNextBlock(event); err != nil {
532+
if err := s.blockSyncer.TrySyncNextBlock(s.ctx, event); err != nil {
519533
s.logger.Error().Err(err).Msg("failed to sync next block")
520534
// If the error is not due to an validation error, re-store the event as pending
521535
switch {
@@ -559,12 +573,12 @@ var (
559573
errInvalidState = errors.New("invalid state")
560574
)
561575

562-
// trySyncNextBlock attempts to sync the next available block
576+
// TrySyncNextBlock attempts to sync the next available block
563577
// the event is always the next block in sequence as processHeightEvent ensures it.
564-
func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
578+
func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
565579
select {
566-
case <-s.ctx.Done():
567-
return s.ctx.Err()
580+
case <-ctx.Done():
581+
return ctx.Err()
568582
default:
569583
}
570584

@@ -579,7 +593,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
579593
// Compared to the executor logic where the current block needs to be applied first,
580594
// here only the previous block needs to be applied to proceed to the verification.
581595
// The header validation must be done before applying the block to avoid executing gibberish
582-
if err := s.validateBlock(currentState, data, header); err != nil {
596+
if err := s.ValidateBlock(ctx, currentState, data, header); err != nil {
583597
// remove header as da included (not per se needed, but keep cache clean)
584598
s.cache.RemoveHeaderDAIncluded(headerHash)
585599
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
@@ -590,7 +604,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
590604

591605
// Verify forced inclusion transactions if configured
592606
if event.Source == common.SourceDA {
593-
if err := s.verifyForcedInclusionTxs(currentState, data); err != nil {
607+
if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil {
594608
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
595609
if errors.Is(err, errMaliciousProposer) {
596610
s.cache.RemoveHeaderDAIncluded(headerHash)
@@ -600,7 +614,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
600614
}
601615

602616
// Apply block
603-
newState, err := s.applyBlock(header.Header, data, currentState)
617+
newState, err := s.ApplyBlock(ctx, header.Header, data, currentState)
604618
if err != nil {
605619
return fmt.Errorf("failed to apply block: %w", err)
606620
}
@@ -650,16 +664,16 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
650664
return nil
651665
}
652666

653-
// applyBlock applies a block to get the new state
654-
func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState types.State) (types.State, error) {
667+
// ApplyBlock applies a block to get the new state
668+
func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
655669
// Prepare transactions
656670
rawTxs := make([][]byte, len(data.Txs))
657671
for i, tx := range data.Txs {
658672
rawTxs[i] = []byte(tx)
659673
}
660674

661675
// Execute transactions
662-
ctx := context.WithValue(s.ctx, types.HeaderContextKey, header)
676+
ctx = context.WithValue(ctx, types.HeaderContextKey, header)
663677
newAppHash, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState)
664678
if err != nil {
665679
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
@@ -705,11 +719,11 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
705719
return nil, nil
706720
}
707721

708-
// validateBlock validates a synced block
722+
// ValidateBlock validates a synced block
709723
// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
710724
// or if the data was gibberish and somehow passed all validation prior but the header was correct
711725
// we are still losing both in the pending event. This should never happen.
712-
func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
726+
func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
713727
// Set custom verifier for aggregator node signature
714728
header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)
715729

@@ -804,11 +818,11 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 {
804818
return uint64(max(effectivePeriod, minPeriod))
805819
}
806820

807-
// verifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
821+
// VerifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
808822
// Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
809823
// to future blocks (smoothing). This is legitimate behavior within an epoch.
810824
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
811-
func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
825+
func (s *Syncer) VerifyForcedInclusionTxs(_ context.Context, currentState types.State, data *types.Data) error {
812826
if s.fiRetriever == nil {
813827
return nil
814828
}

block/internal/syncing/syncer_forced_inclusion_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) {
410410
currentState.DAHeight = 0
411411

412412
// Verify - should pass since all forced txs are included
413-
err = s.verifyForcedInclusionTxs(currentState, data)
413+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
414414
require.NoError(t, err)
415415
}
416416

@@ -486,7 +486,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
486486
currentState.DAHeight = 0
487487

488488
// Verify - should pass since forced tx blob may be legitimately deferred within the epoch
489-
err = s.verifyForcedInclusionTxs(currentState, data)
489+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
490490
require.NoError(t, err)
491491

492492
// Mock DA for next epoch to return no forced inclusion transactions
@@ -499,7 +499,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
499499
data2 := makeData(gen.ChainID, 2, 1)
500500
data2.Txs[0] = []byte("regular_tx_3")
501501

502-
err = s.verifyForcedInclusionTxs(currentState, data2)
502+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
503503
require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it
504504

505505
// Mock DA for height 2 to return no forced inclusion transactions
@@ -512,7 +512,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
512512
data3 := makeData(gen.ChainID, 3, 1)
513513
data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
514514

515-
err = s.verifyForcedInclusionTxs(currentState, data3)
515+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
516516
require.Error(t, err)
517517
require.Contains(t, err.Error(), "sequencer is malicious")
518518
require.Contains(t, err.Error(), "past grace boundary")
@@ -591,7 +591,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
591591
currentState.DAHeight = 0
592592

593593
// Verify - should pass since dataBin2 may be legitimately deferred within the epoch
594-
err = s.verifyForcedInclusionTxs(currentState, data)
594+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
595595
require.NoError(t, err)
596596

597597
// Mock DA for next epoch to return no forced inclusion transactions
@@ -605,7 +605,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
605605
data2.Txs[0] = types.Tx([]byte("regular_tx_3"))
606606

607607
// Verify - should pass since we're at the grace boundary, not past it
608-
err = s.verifyForcedInclusionTxs(currentState, data2)
608+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
609609
require.NoError(t, err)
610610

611611
// Mock DA for height 2 (when we move to DAHeight 2)
@@ -620,7 +620,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
620620
data3 := makeData(gen.ChainID, 3, 1)
621621
data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
622622

623-
err = s.verifyForcedInclusionTxs(currentState, data3)
623+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
624624
require.Error(t, err)
625625
require.Contains(t, err.Error(), "sequencer is malicious")
626626
require.Contains(t, err.Error(), "past grace boundary")
@@ -691,7 +691,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) {
691691
currentState.DAHeight = 0
692692

693693
// Verify - should pass since no forced txs to verify
694-
err = s.verifyForcedInclusionTxs(currentState, data)
694+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
695695
require.NoError(t, err)
696696
}
697697

@@ -754,7 +754,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) {
754754
currentState.DAHeight = 0
755755

756756
// Verify - should pass since namespace not configured
757-
err = s.verifyForcedInclusionTxs(currentState, data)
757+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
758758
require.NoError(t, err)
759759
}
760760

@@ -841,7 +841,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
841841
currentState.DAHeight = 104
842842

843843
// Verify - should pass since dataBin2 can be deferred within epoch
844-
err = s.verifyForcedInclusionTxs(currentState, data1)
844+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
845845
require.NoError(t, err)
846846

847847
// Verify that dataBin2 is now tracked as pending
@@ -870,7 +870,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
870870
data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for
871871

872872
// Verify - should pass since dataBin2 is now included and clears pending
873-
err = s.verifyForcedInclusionTxs(currentState, data2)
873+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
874874
require.NoError(t, err)
875875

876876
// Verify that pending queue is now empty (dataBin2 was included)
@@ -965,7 +965,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) {
965965
currentState.DAHeight = 102
966966

967967
// Verify - should pass, tx can be deferred within epoch
968-
err = s.verifyForcedInclusionTxs(currentState, data1)
968+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
969969
require.NoError(t, err)
970970
}
971971

@@ -1059,6 +1059,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) {
10591059
currentState := s.getLastState()
10601060
currentState.DAHeight = 102 // At epoch end
10611061

1062-
err = s.verifyForcedInclusionTxs(currentState, data1)
1062+
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
10631063
require.NoError(t, err, "smoothing within epoch should be allowed")
10641064
}

block/internal/syncing/syncer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,19 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
134134
data := makeData(gen.ChainID, 1, 2) // non-empty
135135
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil)
136136

137-
err = s.validateBlock(s.getLastState(), data, header)
137+
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
138138
require.NoError(t, err)
139139

140140
// Create header and data with mismatched hash
141141
data = makeData(gen.ChainID, 1, 2) // non-empty
142142
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
143-
err = s.validateBlock(s.getLastState(), data, header)
143+
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
144144
require.Error(t, err)
145145

146146
// Create header and empty data
147147
data = makeData(gen.ChainID, 1, 0) // empty
148148
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil)
149-
err = s.validateBlock(s.getLastState(), data, header)
149+
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
150150
require.Error(t, err)
151151
}
152152

0 commit comments

Comments
 (0)