Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ func NewSyncComponents(
}

// Create submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
if config.Instrumentation.IsTracingEnabled() {
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
}
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down Expand Up @@ -256,7 +259,10 @@ func NewAggregatorComponents(
}, nil
}

daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
if config.Instrumentation.IsTracingEnabled() {
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
}
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down
97 changes: 97 additions & 0 deletions block/internal/submitting/da_submitter_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package submitting

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
"github.com/evstack/ev-node/types"
)

var _ DASubmitterAPI = (*tracedDASubmitter)(nil)

// tracedDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing.
type tracedDASubmitter struct {
inner DASubmitterAPI
tracer trace.Tracer
}

// WithTracingDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing.
func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI {
return &tracedDASubmitter{
inner: inner,
tracer: otel.Tracer("ev-node/da-submitter"),
}
}

func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders",
trace.WithAttributes(
attribute.Int("header.count", len(headers)),
),
)
defer span.End()

// calculate total size
var totalBytes int
for _, h := range marshalledHeaders {
totalBytes += len(h)
}
span.SetAttributes(attribute.Int("header.total_bytes", totalBytes))

// add height range if headers present
if len(headers) > 0 {
span.SetAttributes(
attribute.Int64("header.start_height", int64(headers[0].Height())),
attribute.Int64("header.end_height", int64(headers[len(headers)-1].Height())),
)
}

err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData",
trace.WithAttributes(
attribute.Int("data.count", len(signedDataList)),
),
)
defer span.End()

// calculate total size
var totalBytes int
for _, d := range marshalledData {
totalBytes += len(d)
}
span.SetAttributes(attribute.Int("data.total_bytes", totalBytes))

// add height range if data present
if len(signedDataList) > 0 {
span.SetAttributes(
attribute.Int64("data.start_height", int64(signedDataList[0].Height())),
attribute.Int64("data.end_height", int64(signedDataList[len(signedDataList)-1].Height())),
)
}

err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}
190 changes: 190 additions & 0 deletions block/internal/submitting/da_submitter_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package submitting

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
"github.com/evstack/ev-node/pkg/telemetry/testutil"
"github.com/evstack/ev-node/types"
)

type mockDASubmitterAPI struct {
submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
}

func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
if m.submitHeadersFn != nil {
return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer)
}
return nil
}

func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
if m.submitDataFn != nil {
return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis)
}
return nil
}

func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) {
t.Helper()
sr := tracetest.NewSpanRecorder()
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })
otel.SetTracerProvider(tp)
return WithTracingDASubmitter(inner), sr
}

func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
mock := &mockDASubmitterAPI{
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
return nil
},
}
submitter, sr := setupDASubmitterTrace(t, mock)
ctx := context.Background()

headers := []*types.SignedHeader{
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}},
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 101}}},
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 102}}},
}
marshalledHeaders := [][]byte{
[]byte("header1"),
[]byte("header2"),
[]byte("header3"),
}

err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
require.NoError(t, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, "DASubmitter.SubmitHeaders", span.Name())
require.Equal(t, codes.Unset, span.Status().Code)

attrs := span.Attributes()
testutil.RequireAttribute(t, attrs, "header.count", 3)
testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7
testutil.RequireAttribute(t, attrs, "header.start_height", int64(100))
testutil.RequireAttribute(t, attrs, "header.end_height", int64(102))
}

func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) {
expectedErr := errors.New("DA submission failed")
mock := &mockDASubmitterAPI{
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
return expectedErr
},
}
submitter, sr := setupDASubmitterTrace(t, mock)
ctx := context.Background()

headers := []*types.SignedHeader{
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}},
}
marshalledHeaders := [][]byte{[]byte("header1")}

err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
require.Error(t, err)
require.Equal(t, expectedErr, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, codes.Error, span.Status().Code)
require.Equal(t, expectedErr.Error(), span.Status().Description)
}

func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) {
mock := &mockDASubmitterAPI{
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
return nil
},
}
submitter, sr := setupDASubmitterTrace(t, mock)
ctx := context.Background()

err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil)
require.NoError(t, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]

attrs := span.Attributes()
testutil.RequireAttribute(t, attrs, "header.count", 0)
testutil.RequireAttribute(t, attrs, "header.total_bytes", 0)
}

func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
mock := &mockDASubmitterAPI{
submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
return nil
},
}
submitter, sr := setupDASubmitterTrace(t, mock)
ctx := context.Background()

signedDataList := []*types.SignedData{
{Data: types.Data{Metadata: &types.Metadata{Height: 100}}},
{Data: types.Data{Metadata: &types.Metadata{Height: 101}}},
}
marshalledData := [][]byte{
[]byte("data1data1"),
[]byte("data2data2"),
}

err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
require.NoError(t, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, "DASubmitter.SubmitData", span.Name())
require.Equal(t, codes.Unset, span.Status().Code)

attrs := span.Attributes()
testutil.RequireAttribute(t, attrs, "data.count", 2)
testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10
testutil.RequireAttribute(t, attrs, "data.start_height", int64(100))
testutil.RequireAttribute(t, attrs, "data.end_height", int64(101))
}

func TestTracedDASubmitter_SubmitData_Error(t *testing.T) {
expectedErr := errors.New("data submission failed")
mock := &mockDASubmitterAPI{
submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
return expectedErr
},
}
submitter, sr := setupDASubmitterTrace(t, mock)
ctx := context.Background()

signedDataList := []*types.SignedData{
{Data: types.Data{Metadata: &types.Metadata{Height: 100}}},
}
marshalledData := [][]byte{[]byte("data1")}

err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
require.Error(t, err)
require.Equal(t, expectedErr, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, codes.Error, span.Status().Code)
require.Equal(t, expectedErr.Error(), span.Status().Description)
}
8 changes: 4 additions & 4 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/evstack/ev-node/types"
)

// daSubmitterAPI defines minimal methods needed by Submitter for DA submissions.
type daSubmitterAPI interface {
// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions.
type DASubmitterAPI interface {
SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
}
Expand All @@ -43,7 +43,7 @@ type Submitter struct {
metrics *common.Metrics

// DA submitter
daSubmitter daSubmitterAPI
daSubmitter DASubmitterAPI

// Optional signer (only for aggregator nodes)
signer signer.Signer
Expand Down Expand Up @@ -80,7 +80,7 @@ func NewSubmitter(
metrics *common.Metrics,
config config.Config,
genesis genesis.Genesis,
daSubmitter daSubmitterAPI,
daSubmitter DASubmitterAPI,
sequencer coresequencer.Sequencer, // Can be nil for sync nodes
signer signer.Signer, // Can be nil for sync nodes
logger zerolog.Logger,
Expand Down
Loading