Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func NewSyncComponents(
errorCh,
)

if config.Instrumentation.IsTracingEnabled() {
syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
}

// Create submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
Expand Down
25 changes: 25 additions & 0 deletions block/internal/syncing/block_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package syncing

import (
"context"

"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/types"
)

// BlockSyncer defines operations for block synchronization that can be traced.
// The Syncer implements this interface, and a tracing decorator can wrap it
// to add OpenTelemetry spans to each operation.
type BlockSyncer interface {
// TrySyncNextBlock attempts to sync the next available block from DA or P2P.
TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error

// ApplyBlock executes block transactions and returns the new state.
ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)

// ValidateBlock validates block structure and state transitions.
ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error

// VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled.
VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error
}
46 changes: 30 additions & 16 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/evstack/ev-node/types"
)

var _ BlockSyncer = (*Syncer)(nil)

// forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
type forcedInclusionGracePeriodConfig struct {
// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
Expand Down Expand Up @@ -118,6 +120,10 @@ type Syncer struct {

// P2P wait coordination
p2pWaitState atomic.Value // stores p2pWaitState

// blockSyncer is the interface used for block sync operations.
// defaults to self, but can be wrapped with tracing.
blockSyncer BlockSyncer
}

// pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet
Expand Down Expand Up @@ -155,7 +161,7 @@ func NewSyncer(
blockFullnessEMA := &atomic.Pointer[float64]{}
blockFullnessEMA.Store(&initialFullness)

return &Syncer{
s := &Syncer{
store: store,
exec: exec,
cache: cache,
Expand All @@ -175,6 +181,14 @@ func NewSyncer(
blockFullnessEMA: blockFullnessEMA,
gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
}
s.blockSyncer = s
return s
}

// SetBlockSyncer sets the block syncer interface, allowing injection of
// a tracing wrapper or other decorator.
func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
s.blockSyncer = bs
}

// Start begins the syncing component
Expand Down Expand Up @@ -515,7 +529,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
s.cancelP2PWait(height)

// Try to sync the next block
if err := s.trySyncNextBlock(event); err != nil {
if err := s.blockSyncer.TrySyncNextBlock(s.ctx, event); err != nil {
s.logger.Error().Err(err).Msg("failed to sync next block")
// If the error is not due to an validation error, re-store the event as pending
switch {
Expand Down Expand Up @@ -559,12 +573,12 @@ var (
errInvalidState = errors.New("invalid state")
)

// trySyncNextBlock attempts to sync the next available block
// TrySyncNextBlock attempts to sync the next available block
// the event is always the next block in sequence as processHeightEvent ensures it.
func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case <-ctx.Done():
return ctx.Err()
default:
}

Expand All @@ -579,7 +593,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
// Compared to the executor logic where the current block needs to be applied first,
// here only the previous block needs to be applied to proceed to the verification.
// The header validation must be done before applying the block to avoid executing gibberish
if err := s.validateBlock(currentState, data, header); err != nil {
if err := s.ValidateBlock(ctx, currentState, data, header); err != nil {
// remove header as da included (not per se needed, but keep cache clean)
s.cache.RemoveHeaderDAIncluded(headerHash)
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
Expand All @@ -590,7 +604,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {

// Verify forced inclusion transactions if configured
if event.Source == common.SourceDA {
if err := s.verifyForcedInclusionTxs(currentState, data); err != nil {
if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil {
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
if errors.Is(err, errMaliciousProposer) {
s.cache.RemoveHeaderDAIncluded(headerHash)
Expand All @@ -600,7 +614,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
}

// Apply block
newState, err := s.applyBlock(header.Header, data, currentState)
newState, err := s.ApplyBlock(ctx, header.Header, data, currentState)
if err != nil {
return fmt.Errorf("failed to apply block: %w", err)
}
Expand Down Expand Up @@ -650,16 +664,16 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
return nil
}

// applyBlock applies a block to get the new state
func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState types.State) (types.State, error) {
// ApplyBlock applies a block to get the new state
func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
// Prepare transactions
rawTxs := make([][]byte, len(data.Txs))
for i, tx := range data.Txs {
rawTxs[i] = []byte(tx)
}

// Execute transactions
ctx := context.WithValue(s.ctx, types.HeaderContextKey, header)
ctx = context.WithValue(ctx, types.HeaderContextKey, header)
newAppHash, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState)
if err != nil {
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
Expand Down Expand Up @@ -705,11 +719,11 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
return nil, nil
}

// validateBlock validates a synced block
// ValidateBlock validates a synced block
// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
// or if the data was gibberish and somehow passed all validation prior but the header was correct
// we are still losing both in the pending event. This should never happen.
func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
// Set custom verifier for aggregator node signature
header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)

Expand Down Expand Up @@ -804,11 +818,11 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 {
return uint64(max(effectivePeriod, minPeriod))
}

// verifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
// VerifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
// Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
// to future blocks (smoothing). This is legitimate behavior within an epoch.
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
func (s *Syncer) VerifyForcedInclusionTxs(_ context.Context, currentState types.State, data *types.Data) error {
if s.fiRetriever == nil {
return nil
}
Expand Down
26 changes: 13 additions & 13 deletions block/internal/syncing/syncer_forced_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) {
currentState.DAHeight = 0

// Verify - should pass since all forced txs are included
err = s.verifyForcedInclusionTxs(currentState, data)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
require.NoError(t, err)
}

Expand Down Expand Up @@ -486,7 +486,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
currentState.DAHeight = 0

// Verify - should pass since forced tx blob may be legitimately deferred within the epoch
err = s.verifyForcedInclusionTxs(currentState, data)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
require.NoError(t, err)

// Mock DA for next epoch to return no forced inclusion transactions
Expand All @@ -499,7 +499,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
data2 := makeData(gen.ChainID, 2, 1)
data2.Txs[0] = []byte("regular_tx_3")

err = s.verifyForcedInclusionTxs(currentState, data2)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it

// Mock DA for height 2 to return no forced inclusion transactions
Expand All @@ -512,7 +512,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
data3 := makeData(gen.ChainID, 3, 1)
data3.Txs[0] = types.Tx([]byte("regular_tx_4"))

err = s.verifyForcedInclusionTxs(currentState, data3)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
require.Error(t, err)
require.Contains(t, err.Error(), "sequencer is malicious")
require.Contains(t, err.Error(), "past grace boundary")
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
currentState.DAHeight = 0

// Verify - should pass since dataBin2 may be legitimately deferred within the epoch
err = s.verifyForcedInclusionTxs(currentState, data)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
require.NoError(t, err)

// Mock DA for next epoch to return no forced inclusion transactions
Expand All @@ -605,7 +605,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
data2.Txs[0] = types.Tx([]byte("regular_tx_3"))

// Verify - should pass since we're at the grace boundary, not past it
err = s.verifyForcedInclusionTxs(currentState, data2)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
require.NoError(t, err)

// Mock DA for height 2 (when we move to DAHeight 2)
Expand All @@ -620,7 +620,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
data3 := makeData(gen.ChainID, 3, 1)
data3.Txs[0] = types.Tx([]byte("regular_tx_4"))

err = s.verifyForcedInclusionTxs(currentState, data3)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
require.Error(t, err)
require.Contains(t, err.Error(), "sequencer is malicious")
require.Contains(t, err.Error(), "past grace boundary")
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) {
currentState.DAHeight = 0

// Verify - should pass since no forced txs to verify
err = s.verifyForcedInclusionTxs(currentState, data)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
require.NoError(t, err)
}

Expand Down Expand Up @@ -754,7 +754,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) {
currentState.DAHeight = 0

// Verify - should pass since namespace not configured
err = s.verifyForcedInclusionTxs(currentState, data)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
require.NoError(t, err)
}

Expand Down Expand Up @@ -841,7 +841,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
currentState.DAHeight = 104

// Verify - should pass since dataBin2 can be deferred within epoch
err = s.verifyForcedInclusionTxs(currentState, data1)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
require.NoError(t, err)

// Verify that dataBin2 is now tracked as pending
Expand Down Expand Up @@ -870,7 +870,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for

// Verify - should pass since dataBin2 is now included and clears pending
err = s.verifyForcedInclusionTxs(currentState, data2)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
require.NoError(t, err)

// Verify that pending queue is now empty (dataBin2 was included)
Expand Down Expand Up @@ -965,7 +965,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) {
currentState.DAHeight = 102

// Verify - should pass, tx can be deferred within epoch
err = s.verifyForcedInclusionTxs(currentState, data1)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
require.NoError(t, err)
}

Expand Down Expand Up @@ -1059,6 +1059,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) {
currentState := s.getLastState()
currentState.DAHeight = 102 // At epoch end

err = s.verifyForcedInclusionTxs(currentState, data1)
err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
require.NoError(t, err, "smoothing within epoch should be allowed")
}
6 changes: 3 additions & 3 deletions block/internal/syncing/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,19 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
data := makeData(gen.ChainID, 1, 2) // non-empty
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil)

err = s.validateBlock(s.getLastState(), data, header)
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
require.NoError(t, err)

// Create header and data with mismatched hash
data = makeData(gen.ChainID, 1, 2) // non-empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.getLastState(), data, header)
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
require.Error(t, err)

// Create header and empty data
data = makeData(gen.ChainID, 1, 0) // empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.getLastState(), data, header)
err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
require.Error(t, err)
}

Expand Down
Loading
Loading