Skip to content

Commit 6e4554d

Browse files
committed
Merge branch 'main' into alex/2609_hints
* main: chore: update calculator for strategies (#2995) chore: adding tracing for da submitter (#2993) feat(tracing): part 10 da retriever tracing (#2991) chore: add da posting strategy to docs (#2992)
2 parents 4907c92 + 9ad4016 commit 6e4554d

File tree

14 files changed

+973
-171
lines changed

14 files changed

+973
-171
lines changed

.github/workflows/docs_deploy.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ on:
1313
# Allows you to run this workflow manually from the Actions tab
1414
workflow_dispatch:
1515

16-
# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
17-
permissions: write-all
18-
1916
# Allow only one concurrent deployment, skipping runs queued between the run in-progress and latest queued.
2017
# However, do NOT cancel in-progress runs as we want to allow these production deployments to complete.
2118
concurrency:
@@ -49,6 +46,6 @@ jobs:
4946
- name: Deploy to GitHub Pages
5047
uses: peaceiris/actions-gh-pages@v4
5148
with:
52-
github_token: ${{ secrets.GITHUB_TOKEN }}
49+
github_token: ${{ secrets.DOCS_DEPLOY_TOKEN }}
5350
publish_dir: ./docs/.vitepress/dist
5451
cname: ev.xyz

block/components.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ func NewSyncComponents(
156156
}
157157

158158
// Create submitter for sync nodes (no signer, only DA inclusion processing)
159-
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerStore, dataStore)
159+
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerStore, dataStore)
160+
if config.Instrumentation.IsTracingEnabled() {
161+
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
162+
}
160163
submitter := submitting.NewSubmitter(
161164
store,
162165
exec,
@@ -255,7 +258,10 @@ func NewAggregatorComponents(
255258
}, nil
256259
}
257260

258-
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerBroadcaster, dataBroadcaster)
261+
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerBroadcaster, dataBroadcaster)
262+
if config.Instrumentation.IsTracingEnabled() {
263+
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
264+
}
259265
submitter := submitting.NewSubmitter(
260266
store,
261267
exec,
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package submitting
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/otel"
7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/codes"
9+
"go.opentelemetry.io/otel/trace"
10+
11+
"github.com/evstack/ev-node/block/internal/cache"
12+
"github.com/evstack/ev-node/pkg/genesis"
13+
"github.com/evstack/ev-node/pkg/signer"
14+
"github.com/evstack/ev-node/types"
15+
)
16+
17+
var _ DASubmitterAPI = (*tracedDASubmitter)(nil)
18+
19+
// tracedDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing.
20+
type tracedDASubmitter struct {
21+
inner DASubmitterAPI
22+
tracer trace.Tracer
23+
}
24+
25+
// WithTracingDASubmitter wraps a DASubmitterAPI with OpenTelemetry tracing.
26+
func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI {
27+
return &tracedDASubmitter{
28+
inner: inner,
29+
tracer: otel.Tracer("ev-node/da-submitter"),
30+
}
31+
}
32+
33+
func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
34+
ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders",
35+
trace.WithAttributes(
36+
attribute.Int("header.count", len(headers)),
37+
),
38+
)
39+
defer span.End()
40+
41+
// calculate total size
42+
var totalBytes int
43+
for _, h := range marshalledHeaders {
44+
totalBytes += len(h)
45+
}
46+
span.SetAttributes(attribute.Int("header.total_bytes", totalBytes))
47+
48+
// add height range if headers present
49+
if len(headers) > 0 {
50+
span.SetAttributes(
51+
attribute.Int64("header.start_height", int64(headers[0].Height())),
52+
attribute.Int64("header.end_height", int64(headers[len(headers)-1].Height())),
53+
)
54+
}
55+
56+
err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer)
57+
if err != nil {
58+
span.RecordError(err)
59+
span.SetStatus(codes.Error, err.Error())
60+
return err
61+
}
62+
63+
return nil
64+
}
65+
66+
func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
67+
ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData",
68+
trace.WithAttributes(
69+
attribute.Int("data.count", len(signedDataList)),
70+
),
71+
)
72+
defer span.End()
73+
74+
// calculate total size
75+
var totalBytes int
76+
for _, d := range marshalledData {
77+
totalBytes += len(d)
78+
}
79+
span.SetAttributes(attribute.Int("data.total_bytes", totalBytes))
80+
81+
// add height range if data present
82+
if len(signedDataList) > 0 {
83+
span.SetAttributes(
84+
attribute.Int64("data.start_height", int64(signedDataList[0].Height())),
85+
attribute.Int64("data.end_height", int64(signedDataList[len(signedDataList)-1].Height())),
86+
)
87+
}
88+
89+
err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis)
90+
if err != nil {
91+
span.RecordError(err)
92+
span.SetStatus(codes.Error, err.Error())
93+
return err
94+
}
95+
96+
return nil
97+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package submitting
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/codes"
11+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
12+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
13+
14+
"github.com/evstack/ev-node/block/internal/cache"
15+
"github.com/evstack/ev-node/pkg/genesis"
16+
"github.com/evstack/ev-node/pkg/signer"
17+
"github.com/evstack/ev-node/pkg/telemetry/testutil"
18+
"github.com/evstack/ev-node/types"
19+
)
20+
21+
type mockDASubmitterAPI struct {
22+
submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
23+
submitDataFn func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
24+
}
25+
26+
func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
27+
if m.submitHeadersFn != nil {
28+
return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer)
29+
}
30+
return nil
31+
}
32+
33+
func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
34+
if m.submitDataFn != nil {
35+
return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis)
36+
}
37+
return nil
38+
}
39+
40+
func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) {
41+
t.Helper()
42+
sr := tracetest.NewSpanRecorder()
43+
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
44+
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })
45+
otel.SetTracerProvider(tp)
46+
return WithTracingDASubmitter(inner), sr
47+
}
48+
49+
func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
50+
mock := &mockDASubmitterAPI{
51+
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
52+
return nil
53+
},
54+
}
55+
submitter, sr := setupDASubmitterTrace(t, mock)
56+
ctx := context.Background()
57+
58+
headers := []*types.SignedHeader{
59+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}},
60+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 101}}},
61+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 102}}},
62+
}
63+
marshalledHeaders := [][]byte{
64+
[]byte("header1"),
65+
[]byte("header2"),
66+
[]byte("header3"),
67+
}
68+
69+
err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
70+
require.NoError(t, err)
71+
72+
spans := sr.Ended()
73+
require.Len(t, spans, 1)
74+
span := spans[0]
75+
require.Equal(t, "DASubmitter.SubmitHeaders", span.Name())
76+
require.Equal(t, codes.Unset, span.Status().Code)
77+
78+
attrs := span.Attributes()
79+
testutil.RequireAttribute(t, attrs, "header.count", 3)
80+
testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7
81+
testutil.RequireAttribute(t, attrs, "header.start_height", int64(100))
82+
testutil.RequireAttribute(t, attrs, "header.end_height", int64(102))
83+
}
84+
85+
func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) {
86+
expectedErr := errors.New("DA submission failed")
87+
mock := &mockDASubmitterAPI{
88+
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
89+
return expectedErr
90+
},
91+
}
92+
submitter, sr := setupDASubmitterTrace(t, mock)
93+
ctx := context.Background()
94+
95+
headers := []*types.SignedHeader{
96+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 100}}},
97+
}
98+
marshalledHeaders := [][]byte{[]byte("header1")}
99+
100+
err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
101+
require.Error(t, err)
102+
require.Equal(t, expectedErr, err)
103+
104+
spans := sr.Ended()
105+
require.Len(t, spans, 1)
106+
span := spans[0]
107+
require.Equal(t, codes.Error, span.Status().Code)
108+
require.Equal(t, expectedErr.Error(), span.Status().Description)
109+
}
110+
111+
func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) {
112+
mock := &mockDASubmitterAPI{
113+
submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
114+
return nil
115+
},
116+
}
117+
submitter, sr := setupDASubmitterTrace(t, mock)
118+
ctx := context.Background()
119+
120+
err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil)
121+
require.NoError(t, err)
122+
123+
spans := sr.Ended()
124+
require.Len(t, spans, 1)
125+
span := spans[0]
126+
127+
attrs := span.Attributes()
128+
testutil.RequireAttribute(t, attrs, "header.count", 0)
129+
testutil.RequireAttribute(t, attrs, "header.total_bytes", 0)
130+
}
131+
132+
func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
133+
mock := &mockDASubmitterAPI{
134+
submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
135+
return nil
136+
},
137+
}
138+
submitter, sr := setupDASubmitterTrace(t, mock)
139+
ctx := context.Background()
140+
141+
signedDataList := []*types.SignedData{
142+
{Data: types.Data{Metadata: &types.Metadata{Height: 100}}},
143+
{Data: types.Data{Metadata: &types.Metadata{Height: 101}}},
144+
}
145+
marshalledData := [][]byte{
146+
[]byte("data1data1"),
147+
[]byte("data2data2"),
148+
}
149+
150+
err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
151+
require.NoError(t, err)
152+
153+
spans := sr.Ended()
154+
require.Len(t, spans, 1)
155+
span := spans[0]
156+
require.Equal(t, "DASubmitter.SubmitData", span.Name())
157+
require.Equal(t, codes.Unset, span.Status().Code)
158+
159+
attrs := span.Attributes()
160+
testutil.RequireAttribute(t, attrs, "data.count", 2)
161+
testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10
162+
testutil.RequireAttribute(t, attrs, "data.start_height", int64(100))
163+
testutil.RequireAttribute(t, attrs, "data.end_height", int64(101))
164+
}
165+
166+
func TestTracedDASubmitter_SubmitData_Error(t *testing.T) {
167+
expectedErr := errors.New("data submission failed")
168+
mock := &mockDASubmitterAPI{
169+
submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
170+
return expectedErr
171+
},
172+
}
173+
submitter, sr := setupDASubmitterTrace(t, mock)
174+
ctx := context.Background()
175+
176+
signedDataList := []*types.SignedData{
177+
{Data: types.Data{Metadata: &types.Metadata{Height: 100}}},
178+
}
179+
marshalledData := [][]byte{[]byte("data1")}
180+
181+
err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
182+
require.Error(t, err)
183+
require.Equal(t, expectedErr, err)
184+
185+
spans := sr.Ended()
186+
require.Len(t, spans, 1)
187+
span := spans[0]
188+
require.Equal(t, codes.Error, span.Status().Code)
189+
require.Equal(t, expectedErr.Error(), span.Status().Description)
190+
}

block/internal/submitting/submitter.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
"github.com/evstack/ev-node/types"
2424
)
2525

26-
// daSubmitterAPI defines minimal methods needed by Submitter for DA submissions.
27-
type daSubmitterAPI interface {
26+
// DASubmitterAPI defines minimal methods needed by Submitter for DA submissions.
27+
type DASubmitterAPI interface {
2828
SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
2929
SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
3030
}
@@ -43,7 +43,7 @@ type Submitter struct {
4343
metrics *common.Metrics
4444

4545
// DA submitter
46-
daSubmitter daSubmitterAPI
46+
daSubmitter DASubmitterAPI
4747

4848
// Optional signer (only for aggregator nodes)
4949
signer signer.Signer
@@ -80,7 +80,7 @@ func NewSubmitter(
8080
metrics *common.Metrics,
8181
config config.Config,
8282
genesis genesis.Genesis,
83-
daSubmitter daSubmitterAPI,
83+
daSubmitter DASubmitterAPI,
8484
sequencer coresequencer.Sequencer, // Can be nil for sync nodes
8585
signer signer.Signer, // Can be nil for sync nodes
8686
logger zerolog.Logger,

0 commit comments

Comments
 (0)