From 200f4d6ff1688c4342549fa531188a3f3d511d20 Mon Sep 17 00:00:00 2001 From: chatton Date: Wed, 14 Jan 2026 16:20:52 +0000 Subject: [PATCH 1/5] feat: add sequencer tracing instrumentation Add OpenTelemetry tracing for the core Sequencer interface. This traces all three main operations: - SubmitBatchTxs: tracks tx count and batch size - GetNextBatch: tracks tx count, forced inclusion count, batch size - VerifyBatch: tracks batch data count and verification result The tracing wrapper can be used with any Sequencer implementation (single, based, etc.) via WithTracingSequencer(). --- block/components.go | 6 + pkg/telemetry/sequencer_tracing.go | 128 +++++++++++ pkg/telemetry/sequencer_tracing_test.go | 293 ++++++++++++++++++++++++ 3 files changed, 427 insertions(+) create mode 100644 pkg/telemetry/sequencer_tracing.go create mode 100644 pkg/telemetry/sequencer_tracing_test.go diff --git a/block/components.go b/block/components.go index 3d276206b..f37c10737 100644 --- a/block/components.go +++ b/block/components.go @@ -17,6 +17,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" @@ -205,6 +206,11 @@ func NewAggregatorComponents( // error channel for critical failures errorCh := make(chan error, 1) + // wrap sequencer with tracing if enabled + if config.Instrumentation.IsTracingEnabled() { + sequencer = telemetry.WithTracingSequencer(sequencer) + } + executor, err := executing.NewExecutor( store, exec, diff --git a/pkg/telemetry/sequencer_tracing.go b/pkg/telemetry/sequencer_tracing.go new file mode 100644 index 000000000..ce6902e67 --- /dev/null +++ b/pkg/telemetry/sequencer_tracing.go @@ -0,0 +1,128 @@ +package telemetry + +import ( + "context" + "encoding/hex" + + coresequencer "github.com/evstack/ev-node/core/sequencer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +var _ coresequencer.Sequencer = (*tracedSequencer)(nil) + +// tracedSequencer decorates a Sequencer with OpenTelemetry spans. +type tracedSequencer struct { + inner coresequencer.Sequencer + tracer trace.Tracer +} + +// WithTracingSequencer decorates the provided Sequencer with tracing spans. +func WithTracingSequencer(inner coresequencer.Sequencer) coresequencer.Sequencer { + return &tracedSequencer{ + inner: inner, + tracer: otel.Tracer("ev-node/sequencer"), + } +} + +func (t *tracedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + txCount := 0 + totalBytes := 0 + if req.Batch != nil { + txCount = len(req.Batch.Transactions) + for _, tx := range req.Batch.Transactions { + totalBytes += len(tx) + } + } + + ctx, span := t.tracer.Start(ctx, "Sequencer.SubmitBatchTxs", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int("tx.count", txCount), + attribute.Int("batch.size_bytes", totalBytes), + ), + ) + defer span.End() + + res, err := t.inner.SubmitBatchTxs(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + return res, nil +} + +func (t *tracedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + ctx, span := t.tracer.Start(ctx, "Sequencer.GetNextBatch", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int64("max_bytes", int64(req.MaxBytes)), + ), + ) + defer span.End() + + res, err := t.inner.GetNextBatch(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if res.Batch != nil { + txCount := len(res.Batch.Transactions) + forcedCount := 0 + for _, forced := range res.Batch.ForceIncludedMask { + if forced { + forcedCount++ + } + } + totalBytes := 0 + for _, tx := range res.Batch.Transactions { + totalBytes += len(tx) + } + + span.SetAttributes( + attribute.Int("tx.count", txCount), + attribute.Int("forced_inclusion.count", forcedCount), + attribute.Int("batch.size_bytes", totalBytes), + attribute.Int64("timestamp", res.Timestamp.Unix()), + ) + } + + return res, nil +} + +func (t *tracedSequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + ctx, span := t.tracer.Start(ctx, "Sequencer.VerifyBatch", + trace.WithAttributes( + attribute.String("chain.id", hex.EncodeToString(req.Id)), + attribute.Int("batch_data.count", len(req.BatchData)), + ), + ) + defer span.End() + + res, err := t.inner.VerifyBatch(ctx, req) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Bool("verified", res.Status), + ) + + return res, nil +} + +func (t *tracedSequencer) SetDAHeight(height uint64) { + t.inner.SetDAHeight(height) +} + +func (t *tracedSequencer) GetDAHeight() uint64 { + return t.inner.GetDAHeight() +} diff --git a/pkg/telemetry/sequencer_tracing_test.go b/pkg/telemetry/sequencer_tracing_test.go new file mode 100644 index 000000000..244024075 --- /dev/null +++ b/pkg/telemetry/sequencer_tracing_test.go @@ -0,0 +1,293 @@ +package telemetry + +import ( + "context" + "errors" + "testing" + "time" + + coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type mockSequencer struct { + submitBatchTxsFn func(context.Context, coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) + getNextBatchFn func(context.Context, coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) + verifyBatchFn func(context.Context, coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) + daHeight uint64 +} + +func (m *mockSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + if m.submitBatchTxsFn != nil { + return m.submitBatchTxsFn(ctx, req) + } + return &coresequencer.SubmitBatchTxsResponse{}, nil +} + +func (m *mockSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + if m.getNextBatchFn != nil { + return m.getNextBatchFn(ctx, req) + } + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{}, + Timestamp: time.Now(), + }, nil +} + +func (m *mockSequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + if m.verifyBatchFn != nil { + return m.verifyBatchFn(ctx, req) + } + return &coresequencer.VerifyBatchResponse{Status: true}, nil +} + +func (m *mockSequencer) SetDAHeight(height uint64) { + m.daHeight = height +} + +func (m *mockSequencer) GetDAHeight() uint64 { + return m.daHeight +} + +var _ coresequencer.Sequencer = (*mockSequencer)(nil) + +func setupSequencerTrace(t *testing.T, inner coresequencer.Sequencer) (coresequencer.Sequencer, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingSequencer(inner), sr +} + +func TestTracedSequencer_SubmitBatchTxs_Success(t *testing.T) { + mock := &mockSequencer{ + submitBatchTxsFn: func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return &coresequencer.SubmitBatchTxsResponse{}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}, + }, + } + + res, err := seq.SubmitBatchTxs(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.SubmitBatchTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "tx.count", 3) + requireSequencerAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes +} + +func TestTracedSequencer_SubmitBatchTxs_Error(t *testing.T) { + mock := &mockSequencer{ + submitBatchTxsFn: func(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) { + return nil, errors.New("queue full") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("tx1")}}, + } + + _, err := seq.SubmitBatchTxs(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_GetNextBatch_Success(t *testing.T) { + mock := &mockSequencer{ + getNextBatchFn: func(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + return &coresequencer.GetNextBatchResponse{ + Batch: &coresequencer.Batch{ + Transactions: [][]byte{[]byte("tx1"), []byte("forced-tx")}, + ForceIncludedMask: []bool{false, true}, + }, + Timestamp: time.Unix(1700000000, 0), + }, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000, + } + + res, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.GetNextBatch", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "tx.count", 2) + requireSequencerAttribute(t, attrs, "forced_inclusion.count", 1) + requireSequencerAttribute(t, attrs, "max_bytes", int64(1000)) +} + +func TestTracedSequencer_GetNextBatch_Error(t *testing.T) { + mock := &mockSequencer{ + getNextBatchFn: func(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { + return nil, errors.New("failed to fetch from DA") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000, + } + + _, err := seq.GetNextBatch(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_VerifyBatch_Success(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return &coresequencer.VerifyBatchResponse{Status: true}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("proof1"), []byte("proof2")}, + } + + res, err := seq.VerifyBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + require.True(t, res.Status) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "Sequencer.VerifyBatch", span.Name()) + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "batch_data.count", 2) + requireSequencerAttribute(t, attrs, "verified", true) +} + +func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return &coresequencer.VerifyBatchResponse{Status: false}, nil + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("invalid-proof")}, + } + + res, err := seq.VerifyBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, res) + require.False(t, res.Status) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + requireSequencerAttribute(t, attrs, "verified", false) +} + +func TestTracedSequencer_VerifyBatch_Error(t *testing.T) { + mock := &mockSequencer{ + verifyBatchFn: func(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) { + return nil, errors.New("failed to get proofs") + }, + } + seq, sr := setupSequencerTrace(t, mock) + ctx := context.Background() + + req := coresequencer.VerifyBatchRequest{ + Id: []byte("test-chain"), + BatchData: [][]byte{[]byte("proof")}, + } + + _, err := seq.VerifyBatch(ctx, req) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) +} + +func TestTracedSequencer_DAHeightPassthrough(t *testing.T) { + mock := &mockSequencer{} + seq, _ := setupSequencerTrace(t, mock) + + seq.SetDAHeight(100) + require.Equal(t, uint64(100), seq.GetDAHeight()) + + seq.SetDAHeight(200) + require.Equal(t, uint64(200), seq.GetDAHeight()) +} + +func requireSequencerAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + case bool: + require.Equal(t, v, attr.Value.AsBool()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} From 1ef693a891c38fa9f3e8efd31d1a23fcff7fb220 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 08:47:49 +0000 Subject: [PATCH 2/5] chore: using helper fn instead of having it duplicated --- block/components.go | 2 +- block/internal/da/tracing_test.go | 37 +++------------ block/internal/executing/tracing_test.go | 38 ++++------------ block/internal/syncing/tracing_test.go | 42 ++++------------- execution/evm/eth_rpc_tracing_test.go | 48 ++++++-------------- pkg/rpc/server/tracing_test.go | 58 +++++++----------------- pkg/telemetry/executor_tracing_test.go | 46 +++++-------------- pkg/telemetry/sequencer_tracing_test.go | 42 ++++------------- pkg/telemetry/testutil/attributes.go | 34 ++++++++++++++ 9 files changed, 110 insertions(+), 237 deletions(-) create mode 100644 pkg/telemetry/testutil/attributes.go diff --git a/block/components.go b/block/components.go index f37c10737..a7a883bb3 100644 --- a/block/components.go +++ b/block/components.go @@ -17,10 +17,10 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/pkg/telemetry" "github.com/evstack/ev-node/types" ) diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ca288770c..ea01c9e42 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/pkg/telemetry/testutil" ) // mockFullClient provides function hooks for testing the tracing decorator. @@ -87,8 +87,8 @@ func TestTracedDA_Submit_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "blob.count", 2) - requireAttribute(t, attrs, "blob.total_size_bytes", 3) + testutil.RequireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "blob.total_size_bytes", 3) // namespace hex string length assertion // 2 bytes = 4 hex characters foundNS := false @@ -134,8 +134,8 @@ func TestTracedDA_Retrieve_Success(t *testing.T) { span := spans[0] require.Equal(t, "DA.Retrieve", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "ns.length", 1) - requireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "ns.length", 1) + testutil.RequireAttribute(t, attrs, "blob.count", 2) } func TestTracedDA_Retrieve_Error(t *testing.T) { @@ -174,8 +174,8 @@ func TestTracedDA_Get_Success(t *testing.T) { span := spans[0] require.Equal(t, "DA.Get", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "id.count", 2) - requireAttribute(t, attrs, "blob.count", 2) + testutil.RequireAttribute(t, attrs, "id.count", 2) + testutil.RequireAttribute(t, attrs, "blob.count", 2) } func TestTracedDA_Get_Error(t *testing.T) { @@ -197,26 +197,3 @@ func TestTracedDA_Get_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "get failed", span.Status().Description) } - -// helper copied from eth tracing tests -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/block/internal/executing/tracing_test.go b/block/internal/executing/tracing_test.go index c5c08ec74..fa5f4bebc 100644 --- a/block/internal/executing/tracing_test.go +++ b/block/internal/executing/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/evstack/ev-node/types" ) @@ -131,7 +131,7 @@ func TestTracedBlockProducer_RetrieveBatch_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "batch.tx_count", 2) + testutil.RequireAttribute(t, attrs, "batch.tx_count", 2) } func TestTracedBlockProducer_RetrieveBatch_Error(t *testing.T) { @@ -180,8 +180,8 @@ func TestTracedBlockProducer_CreateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "tx.count", 3) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "tx.count", 3) } func TestTracedBlockProducer_CreateBlock_Error(t *testing.T) { @@ -234,9 +234,9 @@ func TestTracedBlockProducer_ApplyBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(50)) - requireAttribute(t, attrs, "tx.count", 2) - requireAttribute(t, attrs, "state_root", "deadbeef") + testutil.RequireAttribute(t, attrs, "block.height", int64(50)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "state_root", "deadbeef") } func TestTracedBlockProducer_ApplyBlock_Error(t *testing.T) { @@ -291,7 +291,7 @@ func TestTracedBlockProducer_ValidateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(75)) + testutil.RequireAttribute(t, attrs, "block.height", int64(75)) } func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { @@ -320,25 +320,3 @@ func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "validation failed", span.Status().Description) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go index d0d398301..679f3f7a3 100644 --- a/block/internal/syncing/tracing_test.go +++ b/block/internal/syncing/tracing_test.go @@ -7,12 +7,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/evstack/ev-node/types" ) @@ -92,9 +92,9 @@ func TestTracedBlockSyncer_TrySyncNextBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "da.height", int64(50)) - requireAttribute(t, attrs, "source", string(common.SourceDA)) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "source", string(common.SourceDA)) } func TestTracedBlockSyncer_TrySyncNextBlock_Error(t *testing.T) { @@ -159,9 +159,9 @@ func TestTracedBlockSyncer_ApplyBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(50)) - requireAttribute(t, attrs, "tx.count", 2) - requireAttribute(t, attrs, "state_root", "deadbeef") + testutil.RequireAttribute(t, attrs, "block.height", int64(50)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "state_root", "deadbeef") } func TestTracedBlockSyncer_ApplyBlock_Error(t *testing.T) { @@ -216,7 +216,7 @@ func TestTracedBlockSyncer_ValidateBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(75)) + testutil.RequireAttribute(t, attrs, "block.height", int64(75)) } func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) { @@ -274,8 +274,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(100)) - requireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "block.height", int64(100)) + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) } func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { @@ -305,25 +305,3 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { require.Equal(t, codes.Error, span.Status().Code) require.Equal(t, "forced inclusion verification failed", span.Status().Description) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/execution/evm/eth_rpc_tracing_test.go b/execution/evm/eth_rpc_tracing_test.go index 832a03fef..4d33e899b 100644 --- a/execution/evm/eth_rpc_tracing_test.go +++ b/execution/evm/eth_rpc_tracing_test.go @@ -9,10 +9,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/pkg/telemetry/testutil" ) // setupTestEthRPCTracing creates a traced eth RPC client with an in-memory span recorder @@ -90,13 +91,13 @@ func TestTracedEthRPCClient_HeaderByNumber_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "method", "eth_getBlockByNumber") - requireAttribute(t, attrs, "block_number", "100") - requireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) - requireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) - requireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) - requireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) - requireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) + testutil.RequireAttribute(t, attrs, "method", "eth_getBlockByNumber") + testutil.RequireAttribute(t, attrs, "block_number", "100") + testutil.RequireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) + testutil.RequireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) + testutil.RequireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) + testutil.RequireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) + testutil.RequireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) } func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { @@ -131,7 +132,7 @@ func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { // verify block_number is "latest" when nil attrs := span.Attributes() - requireAttribute(t, attrs, "block_number", "latest") + testutil.RequireAttribute(t, attrs, "block_number", "latest") } func TestTracedEthRPCClient_HeaderByNumber_Error(t *testing.T) { @@ -206,8 +207,8 @@ func TestTracedEthRPCClient_GetTxs_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "method", "txpoolExt_getTxs") - requireAttribute(t, attrs, "tx_count", len(expectedTxs)) + testutil.RequireAttribute(t, attrs, "method", "txpoolExt_getTxs") + testutil.RequireAttribute(t, attrs, "tx_count", len(expectedTxs)) } func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { @@ -235,7 +236,7 @@ func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { // verify tx_count is 0 attrs := span.Attributes() - requireAttribute(t, attrs, "tx_count", 0) + testutil.RequireAttribute(t, attrs, "tx_count", 0) } func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { @@ -276,26 +277,3 @@ func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { require.NotEqual(t, "tx_count", string(attr.Key)) } } - -// requireAttribute is a helper to check span attributes -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/rpc/server/tracing_test.go b/pkg/rpc/server/tracing_test.go index f950f9e28..a64180d9f 100644 --- a/pkg/rpc/server/tracing_test.go +++ b/pkg/rpc/server/tracing_test.go @@ -9,13 +9,13 @@ import ( "connectrpc.com/connect" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/evstack/ev-node/pkg/telemetry/testutil" pb "github.com/evstack/ev-node/types/pb/evnode/v1" "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" ) @@ -174,9 +174,9 @@ func TestTracedStoreService_GetBlock_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "height", int64(10)) - requireAttribute(t, attrs, "found", true) - requireAttribute(t, attrs, "tx_count", 2) + testutil.RequireAttribute(t, attrs, "height", int64(10)) + testutil.RequireAttribute(t, attrs, "found", true) + testutil.RequireAttribute(t, attrs, "tx_count", 2) } func TestTracedStoreService_GetBlock_Error(t *testing.T) { @@ -228,9 +228,9 @@ func TestTracedStoreService_GetState_Success(t *testing.T) { require.Equal(t, "StoreService.GetState", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "height", int64(100)) - requireAttribute(t, attrs, "app_hash", "aabb") - requireAttribute(t, attrs, "da_height", int64(50)) + testutil.RequireAttribute(t, attrs, "height", int64(100)) + testutil.RequireAttribute(t, attrs, "app_hash", "aabb") + testutil.RequireAttribute(t, attrs, "da_height", int64(50)) } func TestTracedStoreService_GetMetadata_Success(t *testing.T) { @@ -258,8 +258,8 @@ func TestTracedStoreService_GetMetadata_Success(t *testing.T) { require.Equal(t, "StoreService.GetMetadata", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "key", "test_key") - requireAttribute(t, attrs, "value_size_bytes", 14) + testutil.RequireAttribute(t, attrs, "key", "test_key") + testutil.RequireAttribute(t, attrs, "value_size_bytes", 14) } func TestTracedStoreService_GetGenesisDaHeight_Success(t *testing.T) { @@ -284,7 +284,7 @@ func TestTracedStoreService_GetGenesisDaHeight_Success(t *testing.T) { require.Equal(t, "StoreService.GetGenesisDaHeight", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "genesis_da_height", int64(1000)) + testutil.RequireAttribute(t, attrs, "genesis_da_height", int64(1000)) } func TestTracedStoreService_GetP2PStoreInfo_Success(t *testing.T) { @@ -312,7 +312,7 @@ func TestTracedStoreService_GetP2PStoreInfo_Success(t *testing.T) { require.Equal(t, "StoreService.GetP2PStoreInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "store_count", 2) + testutil.RequireAttribute(t, attrs, "store_count", 2) } // P2PService tests @@ -342,7 +342,7 @@ func TestTracedP2PService_GetPeerInfo_Success(t *testing.T) { require.Equal(t, "P2PService.GetPeerInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "peer_count", 2) + testutil.RequireAttribute(t, attrs, "peer_count", 2) } func TestTracedP2PService_GetPeerInfo_Error(t *testing.T) { @@ -389,8 +389,8 @@ func TestTracedP2PService_GetNetInfo_Success(t *testing.T) { require.Equal(t, "P2PService.GetNetInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "node_id", "node123") - requireAttribute(t, attrs, "listen_address_count", 1) + testutil.RequireAttribute(t, attrs, "node_id", "node123") + testutil.RequireAttribute(t, attrs, "listen_address_count", 1) } // ConfigService tests @@ -418,8 +418,8 @@ func TestTracedConfigService_GetNamespace_Success(t *testing.T) { require.Equal(t, "ConfigService.GetNamespace", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "header_namespace", "0x0001020304050607") - requireAttribute(t, attrs, "data_namespace", "0x08090a0b0c0d0e0f") + testutil.RequireAttribute(t, attrs, "header_namespace", "0x0001020304050607") + testutil.RequireAttribute(t, attrs, "data_namespace", "0x08090a0b0c0d0e0f") } func TestTracedConfigService_GetNamespace_Error(t *testing.T) { @@ -463,7 +463,7 @@ func TestTracedConfigService_GetSignerInfo_Success(t *testing.T) { require.Equal(t, "ConfigService.GetSignerInfo", span.Name()) attrs := span.Attributes() - requireAttribute(t, attrs, "signer_address", "01020304") + testutil.RequireAttribute(t, attrs, "signer_address", "01020304") } func TestTracedConfigService_GetSignerInfo_Error(t *testing.T) { @@ -484,27 +484,3 @@ func TestTracedConfigService_GetSignerInfo_Error(t *testing.T) { span := spans[0] require.Equal(t, codes.Error, span.Status().Code) } - -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - case bool: - require.Equal(t, v, attr.Value.AsBool()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/executor_tracing_test.go b/pkg/telemetry/executor_tracing_test.go index 9a79ba2f6..472ba9852 100644 --- a/pkg/telemetry/executor_tracing_test.go +++ b/pkg/telemetry/executor_tracing_test.go @@ -6,16 +6,15 @@ import ( "testing" "time" + coreexec "github.com/evstack/ev-node/core/execution" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/test/mocks" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - - coreexec "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/test/mocks" ) // setupTestTracing creates a traced executor with an in-memory span recorder for testing @@ -72,9 +71,9 @@ func TestWithTracingExecutor_InitChain_Success(t *testing.T) { // verify attributes attrs := span.Attributes() require.Len(t, attrs, 3) - requireAttribute(t, attrs, "chain.id", chainID) - requireAttribute(t, attrs, "initial.height", int64(initialHeight)) - requireAttribute(t, attrs, "genesis.time_unix", genesisTime.Unix()) + testutil.RequireAttribute(t, attrs, "chain.id", chainID) + testutil.RequireAttribute(t, attrs, "initial.height", int64(initialHeight)) + testutil.RequireAttribute(t, attrs, "genesis.time_unix", genesisTime.Unix()) } func TestWithTracingExecutor_InitChain_Error(t *testing.T) { @@ -137,7 +136,7 @@ func TestWithTracingExecutor_GetTxs_Success(t *testing.T) { // verify tx.count attribute attrs := span.Attributes() - requireAttribute(t, attrs, "tx.count", len(expectedTxs)) + testutil.RequireAttribute(t, attrs, "tx.count", len(expectedTxs)) } func TestWithTracingExecutor_GetTxs_Error(t *testing.T) { @@ -202,9 +201,9 @@ func TestWithTracingExecutor_ExecuteTxs_Success(t *testing.T) { // verify attributes attrs := span.Attributes() - requireAttribute(t, attrs, "tx.count", len(txs)) - requireAttribute(t, attrs, "block.height", int64(blockHeight)) - requireAttribute(t, attrs, "timestamp", timestamp.Unix()) + testutil.RequireAttribute(t, attrs, "tx.count", len(txs)) + testutil.RequireAttribute(t, attrs, "block.height", int64(blockHeight)) + testutil.RequireAttribute(t, attrs, "timestamp", timestamp.Unix()) } func TestWithTracingExecutor_ExecuteTxs_Error(t *testing.T) { @@ -260,7 +259,7 @@ func TestWithTracingExecutor_SetFinal_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireAttribute(t, attrs, "block.height", int64(blockHeight)) + testutil.RequireAttribute(t, attrs, "block.height", int64(blockHeight)) } func TestWithTracingExecutor_SetFinal_Error(t *testing.T) { @@ -371,26 +370,3 @@ type mockExecutorWithHeight struct { func (m *mockExecutorWithHeight) GetLatestHeight(ctx context.Context) (uint64, error) { return m.height, m.err } - -// requireAttribute is a helper to check span attributes -func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/sequencer_tracing_test.go b/pkg/telemetry/sequencer_tracing_test.go index 244024075..bea229f74 100644 --- a/pkg/telemetry/sequencer_tracing_test.go +++ b/pkg/telemetry/sequencer_tracing_test.go @@ -7,9 +7,9 @@ import ( "time" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -92,8 +92,8 @@ func TestTracedSequencer_SubmitBatchTxs_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "tx.count", 3) - requireSequencerAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes + testutil.RequireAttribute(t, attrs, "tx.count", 3) + testutil.RequireAttribute(t, attrs, "batch.size_bytes", 9) // "tx1" + "tx2" + "tx3" = 9 bytes } func TestTracedSequencer_SubmitBatchTxs_Error(t *testing.T) { @@ -150,9 +150,9 @@ func TestTracedSequencer_GetNextBatch_Success(t *testing.T) { require.Equal(t, codes.Unset, span.Status().Code) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "tx.count", 2) - requireSequencerAttribute(t, attrs, "forced_inclusion.count", 1) - requireSequencerAttribute(t, attrs, "max_bytes", int64(1000)) + testutil.RequireAttribute(t, attrs, "tx.count", 2) + testutil.RequireAttribute(t, attrs, "forced_inclusion.count", 1) + testutil.RequireAttribute(t, attrs, "max_bytes", int64(1000)) } func TestTracedSequencer_GetNextBatch_Error(t *testing.T) { @@ -203,8 +203,8 @@ func TestTracedSequencer_VerifyBatch_Success(t *testing.T) { require.Equal(t, "Sequencer.VerifyBatch", span.Name()) attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "batch_data.count", 2) - requireSequencerAttribute(t, attrs, "verified", true) + testutil.RequireAttribute(t, attrs, "batch_data.count", 2) + testutil.RequireAttribute(t, attrs, "verified", true) } func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { @@ -231,7 +231,7 @@ func TestTracedSequencer_VerifyBatch_Failure(t *testing.T) { span := spans[0] attrs := span.Attributes() - requireSequencerAttribute(t, attrs, "verified", false) + testutil.RequireAttribute(t, attrs, "verified", false) } func TestTracedSequencer_VerifyBatch_Error(t *testing.T) { @@ -267,27 +267,3 @@ func TestTracedSequencer_DAHeightPassthrough(t *testing.T) { seq.SetDAHeight(200) require.Equal(t, uint64(200), seq.GetDAHeight()) } - -func requireSequencerAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { - t.Helper() - found := false - for _, attr := range attrs { - if string(attr.Key) == key { - found = true - switch v := expected.(type) { - case string: - require.Equal(t, v, attr.Value.AsString()) - case int64: - require.Equal(t, v, attr.Value.AsInt64()) - case int: - require.Equal(t, int64(v), attr.Value.AsInt64()) - case bool: - require.Equal(t, v, attr.Value.AsBool()) - default: - t.Fatalf("unsupported attribute type: %T", expected) - } - break - } - } - require.True(t, found, "attribute %s not found", key) -} diff --git a/pkg/telemetry/testutil/attributes.go b/pkg/telemetry/testutil/attributes.go new file mode 100644 index 000000000..6a6ede4cd --- /dev/null +++ b/pkg/telemetry/testutil/attributes.go @@ -0,0 +1,34 @@ +// Package testutil provides test utilities for OpenTelemetry tracing tests. +package testutil + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" +) + +// RequireAttribute asserts that an attribute with the given key exists and has the expected value. +func RequireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + case bool: + require.Equal(t, v, attr.Value.AsBool()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} From ef202f428ade529e6f0db1cf8602b533d5cc697c Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 09:50:50 +0000 Subject: [PATCH 3/5] chore: adding da retreiver syncing --- .../internal/syncing/da_retriever_tracing.go | 57 ++++++++ .../syncing/da_retriever_tracing_test.go | 123 ++++++++++++++++++ block/internal/syncing/syncer.go | 3 + 3 files changed, 183 insertions(+) create mode 100644 block/internal/syncing/da_retriever_tracing.go create mode 100644 block/internal/syncing/da_retriever_tracing_test.go diff --git a/block/internal/syncing/da_retriever_tracing.go b/block/internal/syncing/da_retriever_tracing.go new file mode 100644 index 000000000..894fc67ba --- /dev/null +++ b/block/internal/syncing/da_retriever_tracing.go @@ -0,0 +1,57 @@ +package syncing + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/common" +) + +var _ DARetriever = (*tracedDARetriever)(nil) + +// tracedDARetriever wraps a DARetriever with OpenTelemetry tracing. +type tracedDARetriever struct { + inner DARetriever + tracer trace.Tracer +} + +// WithTracingDARetriever wraps a DARetriever with OpenTelemetry tracing. +func WithTracingDARetriever(inner DARetriever) DARetriever { + return &tracedDARetriever{ + inner: inner, + tracer: otel.Tracer("ev-node/da-retriever"), + } +} + +func (t *tracedDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + ctx, span := t.tracer.Start(ctx, "DARetriever.RetrieveFromDA", + trace.WithAttributes( + attribute.Int64("da.height", int64(daHeight)), + ), + ) + defer span.End() + + events, err := t.inner.RetrieveFromDA(ctx, daHeight) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return events, err + } + + span.SetAttributes(attribute.Int("event.count", len(events))) + + // add block heights from events + if len(events) > 0 { + heights := make([]int64, len(events)) + for i, event := range events { + heights[i] = int64(event.Header.Height()) + } + span.SetAttributes(attribute.Int64Slice("block.heights", heights)) + } + + return events, nil +} diff --git a/block/internal/syncing/da_retriever_tracing_test.go b/block/internal/syncing/da_retriever_tracing_test.go new file mode 100644 index 000000000..d83ed99d2 --- /dev/null +++ b/block/internal/syncing/da_retriever_tracing_test.go @@ -0,0 +1,123 @@ +package syncing + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type mockDARetriever struct { + retrieveFromDAFn func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) +} + +func (m *mockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + if m.retrieveFromDAFn != nil { + return m.retrieveFromDAFn(ctx, daHeight) + } + return nil, nil +} + +func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingDARetriever(inner), sr +} + +func TestTracedDARetriever_RetrieveFromDA_Success(t *testing.T) { + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return []common.DAHeightEvent{ + { + Header: &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{Height: 100}, + }, + }, + DaHeight: daHeight, + Source: common.SourceDA, + }, + { + Header: &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{Height: 101}, + }, + }, + DaHeight: daHeight, + Source: common.SourceDA, + }, + }, nil + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + events, err := retriever.RetrieveFromDA(ctx, 50) + require.NoError(t, err) + require.Len(t, events, 2) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DARetriever.RetrieveFromDA", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "da.height", int64(50)) + testutil.RequireAttribute(t, attrs, "event.count", 2) +} + +func TestTracedDARetriever_RetrieveFromDA_NoEvents(t *testing.T) { + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return []common.DAHeightEvent{}, nil + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + events, err := retriever.RetrieveFromDA(ctx, 50) + require.NoError(t, err) + require.Empty(t, events) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "event.count", 0) +} + +func TestTracedDARetriever_RetrieveFromDA_Error(t *testing.T) { + expectedErr := errors.New("DA retrieval failed") + mock := &mockDARetriever{ + retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + return nil, expectedErr + }, + } + retriever, sr := setupDARetrieverTrace(t, mock) + ctx := context.Background() + + _, err := retriever.RetrieveFromDA(ctx, 50) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index a00c2f4fc..6365c548b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -201,6 +201,9 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) + if s.config.Instrumentation.IsTracingEnabled() { + s.daRetriever = WithTracingDARetriever(s.daRetriever) + } s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { From eeea8b66dae6c70aa245ef92535adc27de591923 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 10:34:47 +0000 Subject: [PATCH 4/5] chore: bump sonic version to work with 1.25 --- test/docker-e2e/go.mod | 7 ++++--- test/docker-e2e/go.sum | 20 +++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/test/docker-e2e/go.mod b/test/docker-e2e/go.mod index 099fdcbb3..e6ea03df7 100644 --- a/test/docker-e2e/go.mod +++ b/test/docker-e2e/go.mod @@ -16,10 +16,11 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/bcp-innovations/hyperlane-cosmos v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.20.0 // indirect - github.com/bytedance/sonic v1.13.1 // indirect - github.com/bytedance/sonic/loader v0.2.4 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.14.2 // indirect + github.com/bytedance/sonic/loader v0.4.0 // indirect github.com/celestiaorg/go-square/v3 v3.0.2 // indirect - github.com/cloudwego/base64x v0.1.5 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/consensys/gnark-crypto v0.18.1 // indirect github.com/containerd/continuity v0.4.5 // indirect github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect diff --git a/test/docker-e2e/go.sum b/test/docker-e2e/go.sum index 000946024..92268cc0b 100644 --- a/test/docker-e2e/go.sum +++ b/test/docker-e2e/go.sum @@ -126,11 +126,12 @@ github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/ github.com/btcsuite/btcd/btcutil v1.1.6/go.mod h1:9dFymx8HpuLqBnsPELrImQeTQfKBQqzqGbbV3jK55aE= github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/FBatYVw= github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= -github.com/bytedance/sonic v1.13.1 h1:Jyd5CIvdFnkOWuKXr+wm4Nyk2h0yAFsr8ucJgEasO3g= -github.com/bytedance/sonic v1.13.1/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= -github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= -github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.14.2 h1:k1twIoe97C1DtYUo+fZQy865IuHia4PR5RPiuGPPIIE= +github.com/bytedance/sonic v1.14.2/go.mod h1:T80iDELeHiHKSc0C9tubFygiuXoGzrkjKzX2quAx980= +github.com/bytedance/sonic/loader v0.4.0 h1:olZ7lEqcxtZygCK9EKYKADnpQoYkRQxaeY2NYzevs+o= +github.com/bytedance/sonic/loader v0.4.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/celestiaorg/celestia-core v0.39.4 h1:h0WaG8KsP0JyiAVhHipoIgvBP0CYLG/9whUccy1lDlY= github.com/celestiaorg/celestia-core v0.39.4/go.mod h1:t7cSYwLFmpz5RjIBpC3QjpbRoa+RfQ0ULdh+LciKuq8= @@ -162,9 +163,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= -github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= -github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= @@ -561,12 +561,10 @@ github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/klauspost/reedsolomon v1.12.5 h1:4cJuyH926If33BeDgiZpI5OU0pE+wUHZvMSyNGqN73Y= github.com/klauspost/reedsolomon v1.12.5/go.mod h1:LkXRjLYGM8K/iQfujYnaPeDmhZLqkrGUyG9p7zs5L68= -github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -853,6 +851,7 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -1233,7 +1232,6 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= -nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= From 19add9cc2cb97ba4fdd79744fb51007249de0d53 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 19 Jan 2026 11:23:52 +0000 Subject: [PATCH 5/5] chore: adding tracing for da submitter --- block/components.go | 10 +- .../submitting/da_submitter_tracing.go | 97 +++++++++ .../submitting/da_submitter_tracing_test.go | 190 ++++++++++++++++++ block/internal/submitting/submitter.go | 8 +- 4 files changed, 299 insertions(+), 6 deletions(-) create mode 100644 block/internal/submitting/da_submitter_tracing.go create mode 100644 block/internal/submitting/da_submitter_tracing_test.go diff --git a/block/components.go b/block/components.go index a7a883bb3..17cbe018d 100644 --- a/block/components.go +++ b/block/components.go @@ -157,7 +157,10 @@ func NewSyncComponents( } // Create submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, @@ -256,7 +259,10 @@ func NewAggregatorComponents( }, nil } - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go new file mode 100644 index 000000000..e3c531fcf --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing.go @@ -0,0 +1,97 @@ +package submitting + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/types" +) + +var _ DASubmitterAPI = (*tracedDASubmitter)(nil) + +// tracedDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +type tracedDASubmitter struct { + inner DASubmitterAPI + tracer trace.Tracer +} + +// WithTracingDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI { + return &tracedDASubmitter{ + inner: inner, + tracer: otel.Tracer("ev-node/da-submitter"), + } +} + +func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders", + trace.WithAttributes( + attribute.Int("header.count", len(headers)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, h := range marshalledHeaders { + totalBytes += len(h) + } + span.SetAttributes(attribute.Int("header.total_bytes", totalBytes)) + + // add height range if headers present + if len(headers) > 0 { + span.SetAttributes( + attribute.Int64("header.start_height", int64(headers[0].Height())), + attribute.Int64("header.end_height", int64(headers[len(headers)-1].Height())), + ) + } + + err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData", + trace.WithAttributes( + attribute.Int("data.count", len(signedDataList)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, d := range marshalledData { + totalBytes += len(d) + } + span.SetAttributes(attribute.Int("data.total_bytes", totalBytes)) + + // add height range if data present + if len(signedDataList) > 0 { + span.SetAttributes( + attribute.Int64("data.start_height", int64(signedDataList[0].Height())), + attribute.Int64("data.end_height", int64(signedDataList[len(signedDataList)-1].Height())), + ) + } + + err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go new file mode 100644 index 000000000..6edc5c5ec --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing_test.go @@ -0,0 +1,190 @@ +package submitting + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type mockDASubmitterAPI struct { + submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error + submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error +} + +func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + if m.submitHeadersFn != nil { + return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer) + } + return nil +} + +func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + if m.submitDataFn != nil { + return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis) + } + return nil +} + +func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingDASubmitter(inner), sr +} + +func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 101}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 102}}}, + } + marshalledHeaders := [][]byte{ + []byte("header1"), + []byte("header2"), + []byte("header3"), + } + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitHeaders", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 3) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7 + testutil.RequireAttribute(t, attrs, "header.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "header.end_height", int64(102)) +} + +func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) { + expectedErr := errors.New("DA submission failed") + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + } + marshalledHeaders := [][]byte{[]byte("header1")} + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} + +func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 0) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 0) +} + +func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + {Data: types.Data{Metadata: &types.Metadata{Height: 101}}}, + } + marshalledData := [][]byte{ + []byte("data1data1"), + []byte("data2data2"), + } + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitData", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "data.count", 2) + testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10 + testutil.RequireAttribute(t, attrs, "data.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "data.end_height", int64(101)) +} + +func TestTracedDASubmitter_SubmitData_Error(t *testing.T) { + expectedErr := errors.New("data submission failed") + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + } + marshalledData := [][]byte{[]byte("data1")} + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index e6a2328e9..3e1d96777 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -23,8 +23,8 @@ import ( "github.com/evstack/ev-node/types" ) -// daSubmitterAPI defines minimal methods needed by Submitter for DA submissions. -type daSubmitterAPI interface { +// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions. +type DASubmitterAPI interface { SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error } @@ -43,7 +43,7 @@ type Submitter struct { metrics *common.Metrics // DA submitter - daSubmitter daSubmitterAPI + daSubmitter DASubmitterAPI // Optional signer (only for aggregator nodes) signer signer.Signer @@ -80,7 +80,7 @@ func NewSubmitter( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - daSubmitter daSubmitterAPI, + daSubmitter DASubmitterAPI, sequencer coresequencer.Sequencer, // Can be nil for sync nodes signer signer.Signer, // Can be nil for sync nodes logger zerolog.Logger,