diff --git a/block/components.go b/block/components.go index eea7691a0..9c1a9d17f 100644 --- a/block/components.go +++ b/block/components.go @@ -220,6 +220,10 @@ func NewAggregatorComponents( return nil, fmt.Errorf("failed to create executor: %w", err) } + if config.Instrumentation.IsTracingEnabled() { + executor.SetBlockProducer(executing.WithTracingBlockProducer(executor)) + } + reaper, err := reaping.NewReaper( exec, sequencer, diff --git a/block/internal/executing/block_producer.go b/block/internal/executing/block_producer.go new file mode 100644 index 000000000..f98c15c6a --- /dev/null +++ b/block/internal/executing/block_producer.go @@ -0,0 +1,27 @@ +package executing + +import ( + "context" + + "github.com/evstack/ev-node/types" +) + +// BlockProducer defines operations for block production that can be traced. +// The Executor implements this interface, and a tracing decorator can wrap it +// to add OpenTelemetry spans to each operation. +type BlockProducer interface { + // ProduceBlock creates, validates, and broadcasts a new block. + ProduceBlock(ctx context.Context) error + + // RetrieveBatch gets the next batch of transactions from the sequencer. + RetrieveBatch(ctx context.Context) (*BatchData, error) + + // CreateBlock constructs a new block from the given batch data. + CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) + + // ApplyBlock executes the block transactions and returns the new state. + ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) + + // ValidateBlock validates block structure and state transitions. + ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error +} diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 845e18727..621b507a3 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -25,6 +25,8 @@ import ( "github.com/evstack/ev-node/types" ) +var _ BlockProducer = (*Executor)(nil) + // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -60,6 +62,10 @@ type Executor struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + + // blockProducer is the interface used for block production operations. + // defaults to self, but can be wrapped with tracing. + blockProducer BlockProducer } // NewExecutor creates a new block executor. @@ -101,7 +107,7 @@ func NewExecutor( } } - return &Executor{ + e := &Executor{ store: store, exec: exec, sequencer: sequencer, @@ -117,7 +123,15 @@ func NewExecutor( txNotifyCh: make(chan struct{}, 1), errorCh: errorCh, logger: logger.With().Str("component", "executor").Logger(), - }, nil + } + e.blockProducer = e + return e, nil +} + +// SetBlockProducer sets the block producer interface, allowing injection of +// a tracing wrapper or other decorator. +func (e *Executor) SetBlockProducer(bp BlockProducer) { + e.blockProducer = bp } // Start begins the execution component @@ -279,7 +293,7 @@ func (e *Executor) executionLoop() { continue } - if err := e.produceBlock(); err != nil { + if err := e.blockProducer.ProduceBlock(e.ctx); err != nil { e.logger.Error().Err(err).Msg("failed to produce block") } txsAvailable = false @@ -288,7 +302,7 @@ func (e *Executor) executionLoop() { case <-lazyTimerCh: e.logger.Debug().Msg("Lazy timer triggered block production") - if err := e.produceBlock(); err != nil { + if err := e.blockProducer.ProduceBlock(e.ctx); err != nil { e.logger.Error().Err(err).Msg("failed to produce block from lazy timer") } // Reset lazy timer @@ -300,8 +314,8 @@ func (e *Executor) executionLoop() { } } -// produceBlock creates, validates, and stores a new block -func (e *Executor) produceBlock() error { +// ProduceBlock creates, validates, and stores a new block. +func (e *Executor) ProduceBlock(ctx context.Context) error { start := time.Now() defer func() { if e.metrics.OperationDuration["block_production"] != nil { @@ -338,7 +352,7 @@ func (e *Executor) produceBlock() error { // Check if there's an already stored block at the newHeight // If there is use that instead of creating a new block - pendingHeader, pendingData, err := e.store.GetBlockData(e.ctx, newHeight) + pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight) if err == nil { e.logger.Info().Uint64("height", newHeight).Msg("using pending block") header = pendingHeader @@ -347,7 +361,7 @@ func (e *Executor) produceBlock() error { return fmt.Errorf("failed to get block data: %w", err) } else { // get batch from sequencer - batchData, err = e.retrieveBatch(e.ctx) + batchData, err = e.blockProducer.RetrieveBatch(ctx) if errors.Is(err, common.ErrNoBatch) { e.logger.Debug().Msg("no batch available") return nil @@ -357,13 +371,13 @@ func (e *Executor) produceBlock() error { return fmt.Errorf("failed to retrieve batch: %w", err) } - header, data, err = e.createBlock(e.ctx, newHeight, batchData) + header, data, err = e.blockProducer.CreateBlock(ctx, newHeight, batchData) if err != nil { return fmt.Errorf("failed to create block: %w", err) } // saved early for crash recovery, will be overwritten later with the final signature - batch, err := e.store.NewBatch(e.ctx) + batch, err := e.store.NewBatch(ctx) if err != nil { return fmt.Errorf("failed to create batch for early save: %w", err) } @@ -378,12 +392,12 @@ func (e *Executor) produceBlock() error { // Pass force-included mask through context for execution optimization // Force-included txs (from DA) MUST be validated as they're from untrusted sources // Mempool txs can skip validation as they were validated when added to mempool - ctx := e.ctx + applyCtx := ctx if batchData != nil && batchData.Batch != nil && batchData.ForceIncludedMask != nil { - ctx = coreexecutor.WithForceIncludedMask(ctx, batchData.ForceIncludedMask) + applyCtx = coreexecutor.WithForceIncludedMask(applyCtx, batchData.ForceIncludedMask) } - newState, err := e.applyBlock(ctx, header.Header, data) + newState, err := e.blockProducer.ApplyBlock(applyCtx, header.Header, data) if err != nil { return fmt.Errorf("failed to apply block: %w", err) } @@ -400,13 +414,13 @@ func (e *Executor) produceBlock() error { } header.Signature = signature - if err := e.validateBlock(currentState, header, data); err != nil { + if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil { e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err)) e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production") return fmt.Errorf("failed to validate block: %w", err) } - batch, err := e.store.NewBatch(e.ctx) + batch, err := e.store.NewBatch(ctx) if err != nil { return fmt.Errorf("failed to create batch: %w", err) } @@ -431,9 +445,9 @@ func (e *Executor) produceBlock() error { e.setLastState(newState) // broadcast header and data to P2P network - g, ctx := errgroup.WithContext(e.ctx) - g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) }) - g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) }) + g, broadcastCtx := errgroup.WithContext(ctx) + g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, header) }) + g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, data) }) if err := g.Wait(); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast header and/data") // don't fail block production on broadcast error @@ -449,8 +463,8 @@ func (e *Executor) produceBlock() error { return nil } -// retrieveBatch gets the next batch of transactions from the sequencer -func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) { +// RetrieveBatch gets the next batch of transactions from the sequencer. +func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) { req := coresequencer.GetNextBatchRequest{ Id: []byte(e.genesis.ChainID), MaxBytes: common.DefaultMaxBlobSize, @@ -481,8 +495,8 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) { }, nil } -// createBlock creates a new block from the given batch -func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { +// CreateBlock creates a new block from the given batch. +func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) @@ -581,8 +595,8 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba return header, data, nil } -// applyBlock applies the block to get the new state -func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { +// ApplyBlock applies the block to get the new state. +func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { currentState := e.getLastState() // Prepare transactions @@ -654,8 +668,8 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea return nil, nil } -// validateBlock validates the created block -func (e *Executor) validateBlock(lastState types.State, header *types.SignedHeader, data *types.Data) error { +// ValidateBlock validates the created block. +func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { // Set custom verifier for aggregator node signature header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) diff --git a/block/internal/executing/executor_lazy_test.go b/block/internal/executing/executor_lazy_test.go index a11cf6a1c..fdc29bb86 100644 --- a/block/internal/executing/executor_lazy_test.go +++ b/block/internal/executing/executor_lazy_test.go @@ -94,8 +94,8 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) { mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - // Direct call to produceBlock should work (this is what lazy timer does) - err = exec.produceBlock() + // Direct call to ProduceBlock should work (this is what lazy timer does) + err = exec.ProduceBlock(exec.ctx) require.NoError(t, err) h1, err := memStore.Height(context.Background()) @@ -118,7 +118,7 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) { mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - err = exec.produceBlock() + err = exec.ProduceBlock(exec.ctx) require.NoError(t, err) h2, err := memStore.Height(context.Background()) @@ -209,7 +209,7 @@ func TestRegularMode_ProduceBlockLogic(t *testing.T) { mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - err = exec.produceBlock() + err = exec.ProduceBlock(exec.ctx) require.NoError(t, err) h1, err := memStore.Height(context.Background()) diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 6029186e8..fe5cfa098 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -117,7 +117,7 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) { mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() // produce one block - err = exec.produceBlock() + err = exec.ProduceBlock(exec.ctx) require.NoError(t, err) // Verify height and stored block @@ -202,14 +202,14 @@ func TestPendingLimit_SkipsProduction(t *testing.T) { mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - require.NoError(t, exec.produceBlock()) + require.NoError(t, exec.ProduceBlock(exec.ctx)) h1, err := memStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h1) // With limit=1 and lastSubmitted default 0, pending >= 1 so next production should be skipped - // No new expectations; produceBlock should return early before hitting sequencer - require.NoError(t, exec.produceBlock()) + // No new expectations; ProduceBlock should return early before hitting sequencer + require.NoError(t, exec.ProduceBlock(exec.ctx)) h2, err := memStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, h1, h2, "height should not change when production is skipped") diff --git a/block/internal/executing/executor_restart_test.go b/block/internal/executing/executor_restart_test.go index 14daccddc..0df53d8e9 100644 --- a/block/internal/executing/executor_restart_test.go +++ b/block/internal/executing/executor_restart_test.go @@ -95,7 +95,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once() - err = exec1.produceBlock() + err = exec1.ProduceBlock(exec1.ctx) require.NoError(t, err) // Verify first block was produced @@ -214,7 +214,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { // Note: mockSeq2 should NOT receive GetNextBatch calls because pending block should be used - err = exec2.produceBlock() + err = exec2.ProduceBlock(exec2.ctx) require.NoError(t, err) // Verify height advanced to 2 @@ -316,7 +316,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) { mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once() - err = exec1.produceBlock() + err = exec1.ProduceBlock(exec1.ctx) require.NoError(t, err) // Stop first executor @@ -372,7 +372,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) { mockSeq2.EXPECT().GetDAHeight().Return(uint64(0)).Once() - err = exec2.produceBlock() + err = exec2.ProduceBlock(exec2.ctx) require.NoError(t, err) // Verify normal operation diff --git a/block/internal/executing/tracing.go b/block/internal/executing/tracing.go new file mode 100644 index 000000000..fe5fae0ed --- /dev/null +++ b/block/internal/executing/tracing.go @@ -0,0 +1,121 @@ +package executing + +import ( + "context" + "encoding/hex" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/types" +) + +var _ BlockProducer = (*tracedBlockProducer)(nil) + +// tracedBlockProducer decorates a BlockProducer with OpenTelemetry spans. +type tracedBlockProducer struct { + inner BlockProducer + tracer trace.Tracer +} + +// WithTracingBlockProducer decorates the provided BlockProducer with tracing spans. +func WithTracingBlockProducer(inner BlockProducer) BlockProducer { + return &tracedBlockProducer{ + inner: inner, + tracer: otel.Tracer("ev-node/block-executor"), + } +} + +func (t *tracedBlockProducer) ProduceBlock(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "BlockExecutor.ProduceBlock") + defer span.End() + + err := t.inner.ProduceBlock(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err +} + +func (t *tracedBlockProducer) RetrieveBatch(ctx context.Context) (*BatchData, error) { + ctx, span := t.tracer.Start(ctx, "BlockExecutor.RetrieveBatch") + defer span.End() + + batchData, err := t.inner.RetrieveBatch(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if batchData != nil && batchData.Batch != nil { + span.SetAttributes( + attribute.Int("batch.tx_count", len(batchData.Transactions)), + ) + } + return batchData, nil +} + +func (t *tracedBlockProducer) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { + txCount := 0 + if batchData != nil && batchData.Batch != nil { + txCount = len(batchData.Transactions) + } + + ctx, span := t.tracer.Start(ctx, "BlockExecutor.CreateBlock", + trace.WithAttributes( + attribute.Int64("block.height", int64(height)), + attribute.Int("tx.count", txCount), + ), + ) + defer span.End() + + header, data, err := t.inner.CreateBlock(ctx, height, batchData) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, nil, err + } + return header, data, nil +} + +func (t *tracedBlockProducer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { + ctx, span := t.tracer.Start(ctx, "BlockExecutor.ApplyBlock", + trace.WithAttributes( + attribute.Int64("block.height", int64(header.Height())), + attribute.Int("tx.count", len(data.Txs)), + ), + ) + defer span.End() + + state, err := t.inner.ApplyBlock(ctx, header, data) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return types.State{}, err + } + + span.SetAttributes( + attribute.String("state_root", hex.EncodeToString(state.AppHash)), + ) + return state, nil +} + +func (t *tracedBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { + ctx, span := t.tracer.Start(ctx, "BlockExecutor.ValidateBlock", + trace.WithAttributes( + attribute.Int64("block.height", int64(header.Height())), + ), + ) + defer span.End() + + err := t.inner.ValidateBlock(ctx, lastState, header, data) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err +} diff --git a/block/internal/executing/tracing_test.go b/block/internal/executing/tracing_test.go new file mode 100644 index 000000000..c5c08ec74 --- /dev/null +++ b/block/internal/executing/tracing_test.go @@ -0,0 +1,344 @@ +package executing + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/types" +) + +// mockBlockProducer provides function hooks for testing the tracing decorator. +type mockBlockProducer struct { + produceBlockFn func(ctx context.Context) error + retrieveBatchFn func(ctx context.Context) (*BatchData, error) + createBlockFn func(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) + applyBlockFn func(ctx context.Context, header types.Header, data *types.Data) (types.State, error) + validateBlockFn func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error +} + +func (m *mockBlockProducer) ProduceBlock(ctx context.Context) error { + if m.produceBlockFn != nil { + return m.produceBlockFn(ctx) + } + return nil +} + +func (m *mockBlockProducer) RetrieveBatch(ctx context.Context) (*BatchData, error) { + if m.retrieveBatchFn != nil { + return m.retrieveBatchFn(ctx) + } + return nil, nil +} + +func (m *mockBlockProducer) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { + if m.createBlockFn != nil { + return m.createBlockFn(ctx, height, batchData) + } + return nil, nil, nil +} + +func (m *mockBlockProducer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { + if m.applyBlockFn != nil { + return m.applyBlockFn(ctx, header, data) + } + return types.State{}, nil +} + +func (m *mockBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { + if m.validateBlockFn != nil { + return m.validateBlockFn(ctx, lastState, header, data) + } + return nil +} + +func setupBlockProducerTrace(t *testing.T, inner BlockProducer) (BlockProducer, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingBlockProducer(inner), sr +} + +func TestTracedBlockProducer_ProduceBlock_Success(t *testing.T) { + mock := &mockBlockProducer{ + produceBlockFn: func(ctx context.Context) error { + return nil + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + err := producer.ProduceBlock(ctx) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "BlockExecutor.ProduceBlock", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) +} + +func TestTracedBlockProducer_ProduceBlock_Error(t *testing.T) { + mock := &mockBlockProducer{ + produceBlockFn: func(ctx context.Context) error { + return errors.New("production failed") + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + err := producer.ProduceBlock(ctx) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "production failed", span.Status().Description) +} + +func TestTracedBlockProducer_RetrieveBatch_Success(t *testing.T) { + mock := &mockBlockProducer{ + retrieveBatchFn: func(ctx context.Context) (*BatchData, error) { + return &BatchData{ + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("tx2")}, + }, + }, nil + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + batch, err := producer.RetrieveBatch(ctx) + require.NoError(t, err) + require.NotNil(t, batch) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "BlockExecutor.RetrieveBatch", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "batch.tx_count", 2) +} + +func TestTracedBlockProducer_RetrieveBatch_Error(t *testing.T) { + mock := &mockBlockProducer{ + retrieveBatchFn: func(ctx context.Context) (*BatchData, error) { + return nil, errors.New("no batch available") + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + _, err := producer.RetrieveBatch(ctx) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "no batch available", span.Status().Description) +} + +func TestTracedBlockProducer_CreateBlock_Success(t *testing.T) { + mock := &mockBlockProducer{ + createBlockFn: func(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { + return &types.SignedHeader{}, &types.Data{}, nil + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + batchData := &BatchData{ + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}, + }, + } + + header, data, err := producer.CreateBlock(ctx, 100, batchData) + require.NoError(t, err) + require.NotNil(t, header) + require.NotNil(t, data) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "BlockExecutor.CreateBlock", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "block.height", int64(100)) + requireAttribute(t, attrs, "tx.count", 3) +} + +func TestTracedBlockProducer_CreateBlock_Error(t *testing.T) { + mock := &mockBlockProducer{ + createBlockFn: func(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { + return nil, nil, errors.New("failed to create block") + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + _, _, err := producer.CreateBlock(ctx, 100, nil) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "failed to create block", span.Status().Description) +} + +func TestTracedBlockProducer_ApplyBlock_Success(t *testing.T) { + mock := &mockBlockProducer{ + applyBlockFn: func(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { + return types.State{ + AppHash: []byte{0xde, 0xad, 0xbe, 0xef}, + }, nil + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + header := types.Header{ + BaseHeader: types.BaseHeader{ + Height: 50, + }, + } + data := &types.Data{ + Txs: types.Txs{[]byte("tx1"), []byte("tx2")}, + } + + state, err := producer.ApplyBlock(ctx, header, data) + require.NoError(t, err) + require.NotEmpty(t, state.AppHash) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "BlockExecutor.ApplyBlock", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "block.height", int64(50)) + requireAttribute(t, attrs, "tx.count", 2) + requireAttribute(t, attrs, "state_root", "deadbeef") +} + +func TestTracedBlockProducer_ApplyBlock_Error(t *testing.T) { + mock := &mockBlockProducer{ + applyBlockFn: func(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { + return types.State{}, errors.New("execution failed") + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + header := types.Header{ + BaseHeader: types.BaseHeader{ + Height: 50, + }, + } + + _, err := producer.ApplyBlock(ctx, header, &types.Data{}) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "execution failed", span.Status().Description) +} + +func TestTracedBlockProducer_ValidateBlock_Success(t *testing.T) { + mock := &mockBlockProducer{ + validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { + return nil + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + header := &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{ + Height: 75, + }, + }, + } + + err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "BlockExecutor.ValidateBlock", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "block.height", int64(75)) +} + +func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { + mock := &mockBlockProducer{ + validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { + return errors.New("validation failed") + }, + } + producer, sr := setupBlockProducerTrace(t, mock) + ctx := context.Background() + + header := &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{ + Height: 75, + }, + }, + } + + err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "validation failed", span.Status().Description) +} + +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +}