diff --git a/block/components.go b/block/components.go index a7a883bb3..17cbe018d 100644 --- a/block/components.go +++ b/block/components.go @@ -157,7 +157,10 @@ func NewSyncComponents( } // Create submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, @@ -256,7 +259,10 @@ func NewAggregatorComponents( }, nil } - daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) + if config.Instrumentation.IsTracingEnabled() { + daSubmitter = submitting.WithTracingDASubmitter(daSubmitter) + } submitter := submitting.NewSubmitter( store, exec, diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go new file mode 100644 index 000000000..e3c531fcf --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing.go @@ -0,0 +1,97 @@ +package submitting + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/types" +) + +var _ DASubmitterAPI = (*tracedDASubmitter)(nil) + +// tracedDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +type tracedDASubmitter struct { + inner DASubmitterAPI + tracer trace.Tracer +} + +// WithTracingDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing. +func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI { + return &tracedDASubmitter{ + inner: inner, + tracer: otel.Tracer("ev-node/da-submitter"), + } +} + +func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders", + trace.WithAttributes( + attribute.Int("header.count", len(headers)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, h := range marshalledHeaders { + totalBytes += len(h) + } + span.SetAttributes(attribute.Int("header.total_bytes", totalBytes)) + + // add height range if headers present + if len(headers) > 0 { + span.SetAttributes( + attribute.Int64("header.start_height", int64(headers[0].Height())), + attribute.Int64("header.end_height", int64(headers[len(headers)-1].Height())), + ) + } + + err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + +func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData", + trace.WithAttributes( + attribute.Int("data.count", len(signedDataList)), + ), + ) + defer span.End() + + // calculate total size + var totalBytes int + for _, d := range marshalledData { + totalBytes += len(d) + } + span.SetAttributes(attribute.Int("data.total_bytes", totalBytes)) + + // add height range if data present + if len(signedDataList) > 0 { + span.SetAttributes( + attribute.Int64("data.start_height", int64(signedDataList[0].Height())), + attribute.Int64("data.end_height", int64(signedDataList[len(signedDataList)-1].Height())), + ) + } + + err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go new file mode 100644 index 000000000..6edc5c5ec --- /dev/null +++ b/block/internal/submitting/da_submitter_tracing_test.go @@ -0,0 +1,190 @@ +package submitting + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/telemetry/testutil" + "github.com/evstack/ev-node/types" +) + +type mockDASubmitterAPI struct { + submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error + submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error +} + +func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + if m.submitHeadersFn != nil { + return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer) + } + return nil +} + +func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + if m.submitDataFn != nil { + return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis) + } + return nil +} + +func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingDASubmitter(inner), sr +} + +func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 101}}}, + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 102}}}, + } + marshalledHeaders := [][]byte{ + []byte("header1"), + []byte("header2"), + []byte("header3"), + } + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitHeaders", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 3) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7 + testutil.RequireAttribute(t, attrs, "header.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "header.end_height", int64(102)) +} + +func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) { + expectedErr := errors.New("DA submission failed") + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + headers := []*types.SignedHeader{ + {Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}}, + } + marshalledHeaders := [][]byte{[]byte("header1")} + + err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} + +func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "header.count", 0) + testutil.RequireAttribute(t, attrs, "header.total_bytes", 0) +} + +func TestTracedDASubmitter_SubmitData_Success(t *testing.T) { + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return nil + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + {Data: types.Data{Metadata: &types.Metadata{Height: 101}}}, + } + marshalledData := [][]byte{ + []byte("data1data1"), + []byte("data2data2"), + } + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.NoError(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DASubmitter.SubmitData", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "data.count", 2) + testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10 + testutil.RequireAttribute(t, attrs, "data.start_height", int64(100)) + testutil.RequireAttribute(t, attrs, "data.end_height", int64(101)) +} + +func TestTracedDASubmitter_SubmitData_Error(t *testing.T) { + expectedErr := errors.New("data submission failed") + mock := &mockDASubmitterAPI{ + submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error { + return expectedErr + }, + } + submitter, sr := setupDASubmitterTrace(t, mock) + ctx := context.Background() + + signedDataList := []*types.SignedData{ + {Data: types.Data{Metadata: &types.Metadata{Height: 100}}}, + } + marshalledData := [][]byte{[]byte("data1")} + + err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}) + require.Error(t, err) + require.Equal(t, expectedErr, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) +} diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index e6a2328e9..3e1d96777 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -23,8 +23,8 @@ import ( "github.com/evstack/ev-node/types" ) -// daSubmitterAPI defines minimal methods needed by Submitter for DA submissions. -type daSubmitterAPI interface { +// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions. +type DASubmitterAPI interface { SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error } @@ -43,7 +43,7 @@ type Submitter struct { metrics *common.Metrics // DA submitter - daSubmitter daSubmitterAPI + daSubmitter DASubmitterAPI // Optional signer (only for aggregator nodes) signer signer.Signer @@ -80,7 +80,7 @@ func NewSubmitter( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - daSubmitter daSubmitterAPI, + daSubmitter DASubmitterAPI, sequencer coresequencer.Sequencer, // Can be nil for sync nodes signer signer.Signer, // Can be nil for sync nodes logger zerolog.Logger,