diff --git a/block/components.go b/block/components.go
index 9c1a9d17f..3d276206b 100644
--- a/block/components.go
+++ b/block/components.go
@@ -151,6 +151,10 @@ func NewSyncComponents(
 		errorCh,
 	)
 
+	if config.Instrumentation.IsTracingEnabled() {
+		syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
+	}
+
 	// Create submitter for sync nodes (no signer, only DA inclusion processing)
 	daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
 	submitter := submitting.NewSubmitter(
diff --git a/block/internal/syncing/block_syncer.go b/block/internal/syncing/block_syncer.go
new file mode 100644
index 000000000..e48dd4677
--- /dev/null
+++ b/block/internal/syncing/block_syncer.go
@@ -0,0 +1,25 @@
+package syncing
+
+import (
+	"context"
+
+	"github.com/evstack/ev-node/block/internal/common"
+	"github.com/evstack/ev-node/types"
+)
+
+// BlockSyncer defines operations for block synchronization that can be traced.
+// The Syncer implements this interface, and a tracing decorator can wrap it
+// to add OpenTelemetry spans to each operation.
+type BlockSyncer interface {
+	// TrySyncNextBlock attempts to sync the next available block from DA or P2P.
+	TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error
+
+	// ApplyBlock executes block transactions and returns the new state.
+	ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
+
+	// ValidateBlock validates block structure and state transitions.
+	ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
+
+	// VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled.
+	VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error
+}
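The interface above is the seam this change introduces: anything implementing these four methods can stand in for the Syncer on the sync path. As a purely hypothetical illustration (not part of this diff), the same seam admits decorators other than tracing. The sketch below wraps a BlockSyncer with latency logging; zerolog is assumed only because the surrounding Syncer code already logs through it, and loggingBlockSyncer is an invented name:

// Hypothetical sketch — a second decorator over the BlockSyncer seam.
package syncing

import (
	"context"
	"time"

	"github.com/rs/zerolog"

	"github.com/evstack/ev-node/block/internal/common"
	"github.com/evstack/ev-node/types"
)

var _ BlockSyncer = (*loggingBlockSyncer)(nil)

// loggingBlockSyncer records the latency and outcome of every operation.
type loggingBlockSyncer struct {
	inner  BlockSyncer
	logger zerolog.Logger
}

// timed emits one debug line per completed operation.
func (l *loggingBlockSyncer) timed(op string, start time.Time, err error) {
	l.logger.Debug().Str("op", op).Dur("took", time.Since(start)).Err(err).Msg("block sync op")
}

func (l *loggingBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
	start := time.Now()
	err := l.inner.TrySyncNextBlock(ctx, event)
	l.timed("TrySyncNextBlock", start, err)
	return err
}

func (l *loggingBlockSyncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
	start := time.Now()
	state, err := l.inner.ApplyBlock(ctx, header, data, currentState)
	l.timed("ApplyBlock", start, err)
	return state, err
}

func (l *loggingBlockSyncer) ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
	start := time.Now()
	err := l.inner.ValidateBlock(ctx, currState, data, header)
	l.timed("ValidateBlock", start, err)
	return err
}

func (l *loggingBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
	start := time.Now()
	err := l.inner.VerifyForcedInclusionTxs(ctx, currentState, data)
	l.timed("VerifyForcedInclusionTxs", start, err)
	return err
}

Because decorators share the interface, they compose: WithTracingBlockSyncer could wrap a loggingBlockSyncer (or vice versa) before being handed to SetBlockSyncer.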
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 0d6d56f0d..0ea9d03dd 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -28,6 +28,8 @@ import (
 	"github.com/evstack/ev-node/types"
 )
 
+var _ BlockSyncer = (*Syncer)(nil)
+
 // forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
 type forcedInclusionGracePeriodConfig struct {
 	// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
@@ -118,6 +120,10 @@ type Syncer struct {
 
 	// P2P wait coordination
 	p2pWaitState atomic.Value // stores p2pWaitState
+
+	// blockSyncer is the interface used for block sync operations.
+	// Defaults to the Syncer itself, but can be wrapped with tracing.
+	blockSyncer BlockSyncer
 }
 
 // pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet
@@ -155,7 +161,7 @@ func NewSyncer(
 	blockFullnessEMA := &atomic.Pointer[float64]{}
 	blockFullnessEMA.Store(&initialFullness)
 
-	return &Syncer{
+	s := &Syncer{
 		store: store,
 		exec:  exec,
 		cache: cache,
@@ -175,6 +181,14 @@ func NewSyncer(
 		blockFullnessEMA:  blockFullnessEMA,
 		gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
 	}
+	s.blockSyncer = s
+	return s
+}
+
+// SetBlockSyncer sets the block syncer interface, allowing injection of
+// a tracing wrapper or other decorator.
+func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
+	s.blockSyncer = bs
 }
 
 // Start begins the syncing component
@@ -515,7 +529,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
 	s.cancelP2PWait(height)
 
 	// Try to sync the next block
-	if err := s.trySyncNextBlock(event); err != nil {
+	if err := s.blockSyncer.TrySyncNextBlock(s.ctx, event); err != nil {
 		s.logger.Error().Err(err).Msg("failed to sync next block")
 		// If the error is not due to an validation error, re-store the event as pending
 		switch {
@@ -559,12 +573,12 @@ var (
 	errInvalidState = errors.New("invalid state")
 )
 
-// trySyncNextBlock attempts to sync the next available block
+// TrySyncNextBlock attempts to sync the next available block
 // the event is always the next block in sequence as processHeightEvent ensures it.
-func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
+func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
 	select {
-	case <-s.ctx.Done():
-		return s.ctx.Err()
+	case <-ctx.Done():
+		return ctx.Err()
 	default:
 	}
 
@@ -579,7 +593,7 @@
 	// Compared to the executor logic where the current block needs to be applied first,
 	// here only the previous block needs to be applied to proceed to the verification.
 	// The header validation must be done before applying the block to avoid executing gibberish
-	if err := s.validateBlock(currentState, data, header); err != nil {
+	if err := s.blockSyncer.ValidateBlock(ctx, currentState, data, header); err != nil {
 		// remove header as da included (not per se needed, but keep cache clean)
 		s.cache.RemoveHeaderDAIncluded(headerHash)
 		if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
@@ -590,7 +604,7 @@
 
 	// Verify forced inclusion transactions if configured
 	if event.Source == common.SourceDA {
-		if err := s.verifyForcedInclusionTxs(currentState, data); err != nil {
+		if err := s.blockSyncer.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil {
 			s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
 			if errors.Is(err, errMaliciousProposer) {
 				s.cache.RemoveHeaderDAIncluded(headerHash)
@@ -600,7 +614,7 @@
 	}
 
 	// Apply block
-	newState, err := s.applyBlock(header.Header, data, currentState)
+	newState, err := s.blockSyncer.ApplyBlock(ctx, header.Header, data, currentState)
 	if err != nil {
 		return fmt.Errorf("failed to apply block: %w", err)
 	}
@@ -650,8 +664,8 @@
 	return nil
 }
 
-// applyBlock applies a block to get the new state
-func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState types.State) (types.State, error) {
+// ApplyBlock applies a block to get the new state
+func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
 	// Prepare transactions
 	rawTxs := make([][]byte, len(data.Txs))
 	for i, tx := range data.Txs {
@@ -659,7 +673,7 @@
 	}
 
 	// Execute transactions
-	ctx := context.WithValue(s.ctx, types.HeaderContextKey, header)
+	ctx = context.WithValue(ctx, types.HeaderContextKey, header)
 	newAppHash, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState)
 	if err != nil {
 		s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
@@ -705,11 +719,11 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
 	return nil, nil
 }
 
-// validateBlock validates a synced block
+// ValidateBlock validates a synced block
 // NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
 // or if the data was gibberish and somehow passed all validation prior but the header was correct
 // we are still losing both in the pending event. This should never happen.
-func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
+func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
 	// Set custom verifier for aggregator node signature
 	header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)
 
@@ -804,11 +818,11 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 {
 	return uint64(max(effectivePeriod, minPeriod))
 }
 
-// verifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
+// VerifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
 // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
 // to future blocks (smoothing). This is legitimate behavior within an epoch.
 // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
-func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
+func (s *Syncer) VerifyForcedInclusionTxs(_ context.Context, currentState types.State, data *types.Data) error {
 	if s.fiRetriever == nil {
 		return nil
 	}
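Note the context change in ApplyBlock: it now layers types.HeaderContextKey onto the caller's ctx instead of s.ctx, so the header and any active span context travel together into the executor. As a hypothetical illustration (the function and package names below are invented; only types.HeaderContextKey comes from this diff), a consumer on the execution side could recover the header like this:

// Hypothetical sketch — reading back the header ApplyBlock attached to the context.
package executor

import (
	"context"
	"fmt"

	"github.com/evstack/ev-node/types"
)

// headerFromContext recovers the block header stored under types.HeaderContextKey.
func headerFromContext(ctx context.Context) (types.Header, error) {
	hdr, ok := ctx.Value(types.HeaderContextKey).(types.Header)
	if !ok {
		return types.Header{}, fmt.Errorf("no block header attached to context")
	}
	return hdr, nil
}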
diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go
index 61c556e83..6425b0410 100644
--- a/block/internal/syncing/syncer_forced_inclusion_test.go
+++ b/block/internal/syncing/syncer_forced_inclusion_test.go
@@ -410,7 +410,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since all forced txs are included
-	err = s.verifyForcedInclusionTxs(currentState, data)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
 	require.NoError(t, err)
 }
 
@@ -486,7 +486,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since forced tx blob may be legitimately deferred within the epoch
-	err = s.verifyForcedInclusionTxs(currentState, data)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
 	require.NoError(t, err)
 
 	// Mock DA for next epoch to return no forced inclusion transactions
@@ -499,7 +499,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	data2 := makeData(gen.ChainID, 2, 1)
 	data2.Txs[0] = []byte("regular_tx_3")
 
-	err = s.verifyForcedInclusionTxs(currentState, data2)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
 	require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it
 
 	// Mock DA for height 2 to return no forced inclusion transactions
@@ -512,7 +512,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	data3 := makeData(gen.ChainID, 3, 1)
 	data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
 
-	err = s.verifyForcedInclusionTxs(currentState, data3)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "sequencer is malicious")
 	require.Contains(t, err.Error(), "past grace boundary")
@@ -591,7 +591,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since dataBin2 may be legitimately deferred within the epoch
-	err = s.verifyForcedInclusionTxs(currentState, data)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
 	require.NoError(t, err)
 
 	// Mock DA for next epoch to return no forced inclusion transactions
@@ -605,7 +605,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	data2.Txs[0] = types.Tx([]byte("regular_tx_3"))
 
 	// Verify - should pass since we're at the grace boundary, not past it
-	err = s.verifyForcedInclusionTxs(currentState, data2)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
 	require.NoError(t, err)
 
 	// Mock DA for height 2 (when we move to DAHeight 2)
@@ -620,7 +620,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	data3 := makeData(gen.ChainID, 3, 1)
 	data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
 
-	err = s.verifyForcedInclusionTxs(currentState, data3)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data3)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "sequencer is malicious")
 	require.Contains(t, err.Error(), "past grace boundary")
@@ -691,7 +691,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since no forced txs to verify
-	err = s.verifyForcedInclusionTxs(currentState, data)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
 	require.NoError(t, err)
 }
 
@@ -754,7 +754,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since namespace not configured
-	err = s.verifyForcedInclusionTxs(currentState, data)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data)
 	require.NoError(t, err)
 }
 
@@ -841,7 +841,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
 	currentState.DAHeight = 104
 
 	// Verify - should pass since dataBin2 can be deferred within epoch
-	err = s.verifyForcedInclusionTxs(currentState, data1)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
 	require.NoError(t, err)
 
 	// Verify that dataBin2 is now tracked as pending
@@ -870,7 +870,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
 	data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for
 
 	// Verify - should pass since dataBin2 is now included and clears pending
-	err = s.verifyForcedInclusionTxs(currentState, data2)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data2)
 	require.NoError(t, err)
 
 	// Verify that pending queue is now empty (dataBin2 was included)
@@ -965,7 +965,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) {
 	currentState.DAHeight = 102
 
 	// Verify - should pass, tx can be deferred within epoch
-	err = s.verifyForcedInclusionTxs(currentState, data1)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
 	require.NoError(t, err)
 }
 
@@ -1059,6 +1059,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) {
 	currentState := s.getLastState()
 	currentState.DAHeight = 102 // At epoch end
 
-	err = s.verifyForcedInclusionTxs(currentState, data1)
+	err = s.VerifyForcedInclusionTxs(context.Background(), currentState, data1)
 	require.NoError(t, err, "smoothing within epoch should be allowed")
 }
diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go
index 21b012cf2..554e80f63 100644
--- a/block/internal/syncing/syncer_test.go
+++ b/block/internal/syncing/syncer_test.go
@@ -134,19 +134,19 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
 	data := makeData(gen.ChainID, 1, 2) // non-empty
 	_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil)
 
-	err = s.validateBlock(s.getLastState(), data, header)
+	err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
 	require.NoError(t, err)
 
 	// Create header and data with mismatched hash
 	data = makeData(gen.ChainID, 1, 2) // non-empty
 	_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
-	err = s.validateBlock(s.getLastState(), data, header)
+	err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
 	require.Error(t, err)
 
 	// Create header and empty data
 	data = makeData(gen.ChainID, 1, 0) // empty
 	_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil)
-	err = s.validateBlock(s.getLastState(), data, header)
+	err = s.ValidateBlock(context.Background(), s.getLastState(), data, header)
 	require.Error(t, err)
 }
 
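A side benefit of threading the context through the sync path is that code below the decorator can annotate whatever span is currently active without importing the decorator itself. As a hypothetical sketch (noteRetry is an invented helper, not in this diff), the retry loop in executeTxsWithRetry could record attempts as span events; with tracing disabled, trace.SpanFromContext returns a no-op span and the call is free:

// Hypothetical sketch — annotating the active span from inner sync code.
package syncing

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// noteRetry attaches a retry event to the span active in ctx, if any.
func noteRetry(ctx context.Context, attempt int) {
	trace.SpanFromContext(ctx).AddEvent("executeTxsWithRetry attempt",
		trace.WithAttributes(attribute.Int("attempt", attempt)))
}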
diff --git a/block/internal/syncing/tracing.go b/block/internal/syncing/tracing.go
new file mode 100644
index 000000000..bc4326366
--- /dev/null
+++ b/block/internal/syncing/tracing.go
@@ -0,0 +1,103 @@
+package syncing
+
+import (
+	"context"
+	"encoding/hex"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
+
+	"github.com/evstack/ev-node/block/internal/common"
+	"github.com/evstack/ev-node/types"
+)
+
+var _ BlockSyncer = (*tracedBlockSyncer)(nil)
+
+// tracedBlockSyncer decorates a BlockSyncer with OpenTelemetry spans.
+type tracedBlockSyncer struct {
+	inner  BlockSyncer
+	tracer trace.Tracer
+}
+
+// WithTracingBlockSyncer decorates the provided BlockSyncer with tracing spans.
+func WithTracingBlockSyncer(inner BlockSyncer) BlockSyncer {
+	return &tracedBlockSyncer{
+		inner:  inner,
+		tracer: otel.Tracer("ev-node/block-syncer"),
+	}
+}
+
+func (t *tracedBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
+	ctx, span := t.tracer.Start(ctx, "BlockSyncer.SyncBlock",
+		trace.WithAttributes(
+			attribute.Int64("block.height", int64(event.Header.Height())),
+			attribute.Int64("da.height", int64(event.DaHeight)),
+			attribute.String("source", string(event.Source)),
+		),
+	)
+	defer span.End()
+
+	err := t.inner.TrySyncNextBlock(ctx, event)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+	}
+	return err
+}
+
+func (t *tracedBlockSyncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
+	ctx, span := t.tracer.Start(ctx, "BlockSyncer.ApplyBlock",
+		trace.WithAttributes(
+			attribute.Int64("block.height", int64(header.Height())),
+			attribute.Int("tx.count", len(data.Txs)),
+		),
+	)
+	defer span.End()
+
+	state, err := t.inner.ApplyBlock(ctx, header, data, currentState)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return types.State{}, err
+	}
+
+	span.SetAttributes(
+		attribute.String("state_root", hex.EncodeToString(state.AppHash)),
+	)
+	return state, nil
+}
+
+func (t *tracedBlockSyncer) ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
+	ctx, span := t.tracer.Start(ctx, "BlockSyncer.ValidateBlock",
+		trace.WithAttributes(
+			attribute.Int64("block.height", int64(header.Height())),
+		),
+	)
+	defer span.End()
+
+	err := t.inner.ValidateBlock(ctx, currState, data, header)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+	}
+	return err
+}
+
+func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
+	ctx, span := t.tracer.Start(ctx, "BlockSyncer.VerifyForcedInclusion",
+		trace.WithAttributes(
+			attribute.Int64("block.height", int64(data.Height())),
+			attribute.Int64("da.height", int64(currentState.DAHeight)),
+		),
+	)
+	defer span.End()
+
+	err := t.inner.VerifyForcedInclusionTxs(ctx, currentState, data)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+	}
+	return err
+}
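The decorator only creates spans; they go wherever the process-global TracerProvider sends them, and otel.Tracer falls back to a no-op provider if none is installed. A minimal wiring sketch follows — the stdout exporter is purely illustrative, and in ev-node the real exporter is assumed to be chosen by the Instrumentation config that gates SetBlockSyncer in components.go:

// Hypothetical wiring sketch — installing a global TracerProvider so that
// otel.Tracer("ev-node/block-syncer") produces exported spans.
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initTracing returns a shutdown func that flushes any pending spans.
func initTracing() (func(context.Context) error, error) {
	exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		return nil, err
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	otel.SetTracerProvider(tp)
	return tp.Shutdown, nil
}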
diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go
new file mode 100644
index 000000000..d0d398301
--- /dev/null
+++ b/block/internal/syncing/tracing_test.go
@@ -0,0 +1,329 @@
+package syncing
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
+
+	"github.com/evstack/ev-node/block/internal/common"
+	"github.com/evstack/ev-node/types"
+)
+
+// mockBlockSyncer provides function hooks for testing the tracing decorator.
+type mockBlockSyncer struct {
+	trySyncNextBlockFn      func(ctx context.Context, event *common.DAHeightEvent) error
+	applyBlockFn            func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
+	validateBlockFn         func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
+	verifyForcedInclusionFn func(ctx context.Context, currentState types.State, data *types.Data) error
+}
+
+func (m *mockBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
+	if m.trySyncNextBlockFn != nil {
+		return m.trySyncNextBlockFn(ctx, event)
+	}
+	return nil
+}
+
+func (m *mockBlockSyncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
+	if m.applyBlockFn != nil {
+		return m.applyBlockFn(ctx, header, data, currentState)
+	}
+	return types.State{}, nil
+}
+
+func (m *mockBlockSyncer) ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
+	if m.validateBlockFn != nil {
+		return m.validateBlockFn(ctx, currState, data, header)
+	}
+	return nil
+}
+
+func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
+	if m.verifyForcedInclusionFn != nil {
+		return m.verifyForcedInclusionFn(ctx, currentState, data)
+	}
+	return nil
+}
+
+func setupBlockSyncerTrace(t *testing.T, inner BlockSyncer) (BlockSyncer, *tracetest.SpanRecorder) {
+	t.Helper()
+	sr := tracetest.NewSpanRecorder()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
+	t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })
+	otel.SetTracerProvider(tp)
+	return WithTracingBlockSyncer(inner), sr
+}
+
+func TestTracedBlockSyncer_TrySyncNextBlock_Success(t *testing.T) {
+	mock := &mockBlockSyncer{
+		trySyncNextBlockFn: func(ctx context.Context, event *common.DAHeightEvent) error {
+			return nil
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	event := &common.DAHeightEvent{
+		Header: &types.SignedHeader{
+			Header: types.Header{
+				BaseHeader: types.BaseHeader{
+					Height: 100,
+				},
+			},
+		},
+		DaHeight: 50,
+		Source:   common.SourceDA,
+	}
+
+	err := syncer.TrySyncNextBlock(ctx, event)
+	require.NoError(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, "BlockSyncer.SyncBlock", span.Name())
+	require.Equal(t, codes.Unset, span.Status().Code)
+
+	attrs := span.Attributes()
+	requireAttribute(t, attrs, "block.height", int64(100))
+	requireAttribute(t, attrs, "da.height", int64(50))
+	requireAttribute(t, attrs, "source", string(common.SourceDA))
+}
+
+func TestTracedBlockSyncer_TrySyncNextBlock_Error(t *testing.T) {
+	mock := &mockBlockSyncer{
+		trySyncNextBlockFn: func(ctx context.Context, event *common.DAHeightEvent) error {
+			return errors.New("sync failed")
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	event := &common.DAHeightEvent{
+		Header: &types.SignedHeader{
+			Header: types.Header{
+				BaseHeader: types.BaseHeader{
+					Height: 100,
+				},
+			},
+		},
+		DaHeight: 50,
+		Source:   common.SourceDA,
+	}
+
+	err := syncer.TrySyncNextBlock(ctx, event)
+	require.Error(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, codes.Error, span.Status().Code)
+	require.Equal(t, "sync failed", span.Status().Description)
+}
+
+func TestTracedBlockSyncer_ApplyBlock_Success(t *testing.T) {
+	mock := &mockBlockSyncer{
+		applyBlockFn: func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
+			return types.State{
+				AppHash: []byte{0xde, 0xad, 0xbe, 0xef},
+			}, nil
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	header := types.Header{
+		BaseHeader: types.BaseHeader{
+			Height: 50,
+		},
+	}
+	data := &types.Data{
+		Txs: types.Txs{[]byte("tx1"), []byte("tx2")},
+	}
+
+	state, err := syncer.ApplyBlock(ctx, header, data, types.State{})
+	require.NoError(t, err)
+	require.NotEmpty(t, state.AppHash)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, "BlockSyncer.ApplyBlock", span.Name())
+	require.Equal(t, codes.Unset, span.Status().Code)
+
+	attrs := span.Attributes()
+	requireAttribute(t, attrs, "block.height", int64(50))
+	requireAttribute(t, attrs, "tx.count", 2)
+	requireAttribute(t, attrs, "state_root", "deadbeef")
+}
+
+func TestTracedBlockSyncer_ApplyBlock_Error(t *testing.T) {
+	mock := &mockBlockSyncer{
+		applyBlockFn: func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) {
+			return types.State{}, errors.New("execution failed")
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	header := types.Header{
+		BaseHeader: types.BaseHeader{
+			Height: 50,
+		},
+	}
+
+	_, err := syncer.ApplyBlock(ctx, header, &types.Data{}, types.State{})
+	require.Error(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, codes.Error, span.Status().Code)
+	require.Equal(t, "execution failed", span.Status().Description)
+}
+
+func TestTracedBlockSyncer_ValidateBlock_Success(t *testing.T) {
+	mock := &mockBlockSyncer{
+		validateBlockFn: func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
+			return nil
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	header := &types.SignedHeader{
+		Header: types.Header{
+			BaseHeader: types.BaseHeader{
+				Height: 75,
+			},
+		},
+	}
+
+	err := syncer.ValidateBlock(ctx, types.State{}, &types.Data{}, header)
+	require.NoError(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, "BlockSyncer.ValidateBlock", span.Name())
+	require.Equal(t, codes.Unset, span.Status().Code)
+
+	attrs := span.Attributes()
+	requireAttribute(t, attrs, "block.height", int64(75))
+}
+
+func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) {
+	mock := &mockBlockSyncer{
+		validateBlockFn: func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error {
+			return errors.New("validation failed")
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	header := &types.SignedHeader{
+		Header: types.Header{
+			BaseHeader: types.BaseHeader{
+				Height: 75,
+			},
+		},
+	}
+
+	err := syncer.ValidateBlock(ctx, types.State{}, &types.Data{}, header)
+	require.Error(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, codes.Error, span.Status().Code)
+	require.Equal(t, "validation failed", span.Status().Description)
+}
+
+func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) {
+	mock := &mockBlockSyncer{
+		verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error {
+			return nil
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	data := &types.Data{
+		Metadata: &types.Metadata{
+			Height: 100,
+		},
+	}
+	state := types.State{
+		DAHeight: 50,
+	}
+
+	err := syncer.VerifyForcedInclusionTxs(ctx, state, data)
+	require.NoError(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, "BlockSyncer.VerifyForcedInclusion", span.Name())
+	require.Equal(t, codes.Unset, span.Status().Code)
+
+	attrs := span.Attributes()
+	requireAttribute(t, attrs, "block.height", int64(100))
+	requireAttribute(t, attrs, "da.height", int64(50))
+}
+
+func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) {
+	mock := &mockBlockSyncer{
+		verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error {
+			return errors.New("forced inclusion verification failed")
+		},
+	}
+	syncer, sr := setupBlockSyncerTrace(t, mock)
+	ctx := context.Background()
+
+	data := &types.Data{
+		Metadata: &types.Metadata{
+			Height: 100,
+		},
+	}
+	state := types.State{
+		DAHeight: 50,
+	}
+
+	err := syncer.VerifyForcedInclusionTxs(ctx, state, data)
+	require.Error(t, err)
+
+	spans := sr.Ended()
+	require.Len(t, spans, 1)
+	span := spans[0]
+	require.Equal(t, codes.Error, span.Status().Code)
+	require.Equal(t, "forced inclusion verification failed", span.Status().Description)
+}
+
+func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) {
+	t.Helper()
+	found := false
+	for _, attr := range attrs {
+		if string(attr.Key) == key {
+			found = true
+			switch v := expected.(type) {
+			case string:
+				require.Equal(t, v, attr.Value.AsString())
+			case int64:
+				require.Equal(t, v, attr.Value.AsInt64())
+			case int:
+				require.Equal(t, int64(v), attr.Value.AsInt64())
+			default:
+				t.Fatalf("unsupported attribute type: %T", expected)
+			}
+			break
+		}
+	}
+	require.True(t, found, "attribute %s not found", key)
+}
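With the inner calls in TrySyncNextBlock routed through s.blockSyncer, the ValidateBlock, ApplyBlock, and VerifyForcedInclusion spans become children of the SyncBlock span on a real sync. A hypothetical follow-up assertion for these tests (requireChildOf is an invented helper; it uses only what tracetest already records) could verify that nesting:

// Hypothetical sketch — asserting parent/child span relationships.
package syncing

import (
	"testing"

	"github.com/stretchr/testify/require"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// requireChildOf asserts that child was recorded in parent's trace, directly under parent.
func requireChildOf(t *testing.T, parent, child sdktrace.ReadOnlySpan) {
	t.Helper()
	require.Equal(t, parent.SpanContext().TraceID(), child.SpanContext().TraceID())
	require.Equal(t, parent.SpanContext().SpanID(), child.Parent().SpanID())
}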