Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
78b54f2
wip: adding tracing
chatton Jan 6, 2026
ae87a59
chore: only having the first tracing decorator
chatton Jan 6, 2026
fdd943b
chore: remove comment
chatton Jan 6, 2026
74d4a5f
deps: adding pin to genproto version
chatton Jan 6, 2026
d796589
chore: ensuring errors reported, adding unit tests
chatton Jan 7, 2026
d52f187
chore: add check to validate basic
chatton Jan 7, 2026
b2b3218
chore: modified default
chatton Jan 7, 2026
41bce54
chore: adding logging of possible error
chatton Jan 7, 2026
fd34073
chore: updated flag test
chatton Jan 7, 2026
fd7425a
chore: bump endpoint to correct port
chatton Jan 7, 2026
f30a577
wip: adding propagating client to engine and eth client
chatton Jan 7, 2026
570509b
chore: simplify construction of rpc opts
chatton Jan 7, 2026
caa0684
chore: address PR feedback
chatton Jan 7, 2026
c154f23
chore: ensure consistent propagation settings
chatton Jan 8, 2026
607f4a3
chore: adding interface for engine client and tracing implementation
chatton Jan 8, 2026
c5d7c41
chore: mrege main
chatton Jan 8, 2026
80e2b17
chore: refactored wiring to use bool
chatton Jan 8, 2026
07a45b6
chore: tidy all fix
chatton Jan 8, 2026
423bb15
chore: fix go mod conflicts
chatton Jan 8, 2026
3e373ce
chore: addressing PR feedback
chatton Jan 8, 2026
ed217d7
chore: adding eth client tracing
chatton Jan 8, 2026
17eb5aa
chore: merge main
chatton Jan 8, 2026
931d2ac
chore: add payload id as attribute
chatton Jan 8, 2026
ee2d158
chore: handle merge conflicts
chatton Jan 8, 2026
8d7fc84
Merge branch 'cian/add-tracing-part-3' into cian/add-tracing-part-4
chatton Jan 8, 2026
32db6c8
chore: merge main
chatton Jan 12, 2026
a3fa329
chore: adding tracing for DA client
chatton Jan 12, 2026
776a2ea
chore: add *.test to gitignore
chatton Jan 12, 2026
d3bdb1e
chore: updated test
chatton Jan 12, 2026
4d1d3ff
chore: adding hex encoded namespace
chatton Jan 13, 2026
ed675a8
feat: add Phase 1 RPC server tracing instrumentation
chatton Jan 13, 2026
b1a827e
chore: make tidy all
chatton Jan 13, 2026
30edc0c
chore: tidy all and add wrappers
chatton Jan 13, 2026
1ff13f3
chore: merge main
chatton Jan 13, 2026
9d726af
chore: merge part 5
chatton Jan 13, 2026
db319d2
chore: removed unused tracer
chatton Jan 13, 2026
2edad3e
chore: remove unused attribute
chatton Jan 13, 2026
b1aaa84
Merge branch 'main' into tracing-part-6
chatton Jan 14, 2026
3ab312c
chore: tidy all
chatton Jan 14, 2026
fecea22
Merge branch 'main' into tracing-part-6
chatton Jan 14, 2026
d63f852
chore: addressing PR feedback
chatton Jan 14, 2026
0bc03f0
chore: adding block production tracing
chatton Jan 14, 2026
b1246eb
chore: removing accidental file
chatton Jan 15, 2026
cb10d69
Merge branch 'main' into tracing-part-6
chatton Jan 15, 2026
dd7ad70
chore: remove da.test
chatton Jan 15, 2026
d5fde15
Merge branch 'tracing-part-6' into tracing-part-7
chatton Jan 15, 2026
8666164
chore: addressing PR feedback
chatton Jan 15, 2026
4577312
Merge branch 'main' into tracing-part-7
chatton Jan 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func NewAggregatorComponents(
return nil, fmt.Errorf("failed to create executor: %w", err)
}

if config.Instrumentation.IsTracingEnabled() {
executor.SetBlockProducer(executing.WithTracingBlockProducer(executor))
}

reaper, err := reaping.NewReaper(
exec,
sequencer,
Expand Down
27 changes: 27 additions & 0 deletions block/internal/executing/block_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package executing

import (
"context"

"github.com/evstack/ev-node/types"
)

// BlockProducer defines operations for block production that can be traced.
// The Executor implements this interface, and a tracing decorator can wrap it
// to add OpenTelemetry spans to each operation.
type BlockProducer interface {
// ProduceBlock creates, validates, and broadcasts a new block.
ProduceBlock(ctx context.Context) error

// RetrieveBatch gets the next batch of transactions from the sequencer.
RetrieveBatch(ctx context.Context) (*BatchData, error)

// CreateBlock constructs a new block from the given batch data.
CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error)

// ApplyBlock executes the block transactions and returns the new state.
ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error)

// ValidateBlock validates block structure and state transitions.
ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error
}
66 changes: 40 additions & 26 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/evstack/ev-node/types"
)

var _ BlockProducer = (*Executor)(nil)

// Executor handles block production, transaction processing, and state management
type Executor struct {
// Core components
Expand Down Expand Up @@ -60,6 +62,10 @@ type Executor struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

// blockProducer is the interface used for block production operations.
// defaults to self, but can be wrapped with tracing.
blockProducer BlockProducer
}

// NewExecutor creates a new block executor.
Expand Down Expand Up @@ -101,7 +107,7 @@ func NewExecutor(
}
}

return &Executor{
e := &Executor{
store: store,
exec: exec,
sequencer: sequencer,
Expand All @@ -117,7 +123,15 @@ func NewExecutor(
txNotifyCh: make(chan struct{}, 1),
errorCh: errorCh,
logger: logger.With().Str("component", "executor").Logger(),
}, nil
}
e.blockProducer = e
Copy link
Contributor Author

@chatton chatton Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit "weird" since the non tracing implemention of the interface is the Executor itself, but I didn't want to create one giant Executor interface, we can maybe do that in a followup where we have

type Executor interface {
BlockProducer
BlockSycner
....
}

and we don't need to do this, but for now it can keep the scope of the PR smaller.

return e, nil
}

// SetBlockProducer sets the block producer interface, allowing injection of
// a tracing wrapper or other decorator.
func (e *Executor) SetBlockProducer(bp BlockProducer) {
e.blockProducer = bp
}

// Start begins the execution component
Expand Down Expand Up @@ -279,7 +293,7 @@ func (e *Executor) executionLoop() {
continue
}

if err := e.produceBlock(); err != nil {
if err := e.blockProducer.ProduceBlock(e.ctx); err != nil {
e.logger.Error().Err(err).Msg("failed to produce block")
}
txsAvailable = false
Expand All @@ -288,7 +302,7 @@ func (e *Executor) executionLoop() {

case <-lazyTimerCh:
e.logger.Debug().Msg("Lazy timer triggered block production")
if err := e.produceBlock(); err != nil {
if err := e.blockProducer.ProduceBlock(e.ctx); err != nil {
e.logger.Error().Err(err).Msg("failed to produce block from lazy timer")
}
// Reset lazy timer
Expand All @@ -300,8 +314,8 @@ func (e *Executor) executionLoop() {
}
}

// produceBlock creates, validates, and stores a new block
func (e *Executor) produceBlock() error {
// ProduceBlock creates, validates, and stores a new block.
func (e *Executor) ProduceBlock(ctx context.Context) error {
start := time.Now()
defer func() {
if e.metrics.OperationDuration["block_production"] != nil {
Expand Down Expand Up @@ -338,7 +352,7 @@ func (e *Executor) produceBlock() error {

// Check if there's an already stored block at the newHeight
// If there is use that instead of creating a new block
pendingHeader, pendingData, err := e.store.GetBlockData(e.ctx, newHeight)
pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight)
if err == nil {
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
header = pendingHeader
Expand All @@ -347,7 +361,7 @@ func (e *Executor) produceBlock() error {
return fmt.Errorf("failed to get block data: %w", err)
} else {
// get batch from sequencer
batchData, err = e.retrieveBatch(e.ctx)
batchData, err = e.blockProducer.RetrieveBatch(ctx)
if errors.Is(err, common.ErrNoBatch) {
e.logger.Debug().Msg("no batch available")
return nil
Expand All @@ -357,13 +371,13 @@ func (e *Executor) produceBlock() error {
return fmt.Errorf("failed to retrieve batch: %w", err)
}

header, data, err = e.createBlock(e.ctx, newHeight, batchData)
header, data, err = e.blockProducer.CreateBlock(ctx, newHeight, batchData)
if err != nil {
return fmt.Errorf("failed to create block: %w", err)
}

// saved early for crash recovery, will be overwritten later with the final signature
batch, err := e.store.NewBatch(e.ctx)
batch, err := e.store.NewBatch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch for early save: %w", err)
}
Expand All @@ -378,12 +392,12 @@ func (e *Executor) produceBlock() error {
// Pass force-included mask through context for execution optimization
// Force-included txs (from DA) MUST be validated as they're from untrusted sources
// Mempool txs can skip validation as they were validated when added to mempool
ctx := e.ctx
applyCtx := ctx
if batchData != nil && batchData.Batch != nil && batchData.ForceIncludedMask != nil {
ctx = coreexecutor.WithForceIncludedMask(ctx, batchData.ForceIncludedMask)
applyCtx = coreexecutor.WithForceIncludedMask(applyCtx, batchData.ForceIncludedMask)
}

newState, err := e.applyBlock(ctx, header.Header, data)
newState, err := e.blockProducer.ApplyBlock(applyCtx, header.Header, data)
if err != nil {
return fmt.Errorf("failed to apply block: %w", err)
}
Expand All @@ -400,13 +414,13 @@ func (e *Executor) produceBlock() error {
}
header.Signature = signature

if err := e.validateBlock(currentState, header, data); err != nil {
if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil {
e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err))
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
return fmt.Errorf("failed to validate block: %w", err)
}

batch, err := e.store.NewBatch(e.ctx)
batch, err := e.store.NewBatch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}
Expand All @@ -431,9 +445,9 @@ func (e *Executor) produceBlock() error {
e.setLastState(newState)

// broadcast header and data to P2P network
g, ctx := errgroup.WithContext(e.ctx)
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) })
g, broadcastCtx := errgroup.WithContext(ctx)
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, header) })
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, data) })
if err := g.Wait(); err != nil {
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
// don't fail block production on broadcast error
Expand All @@ -449,8 +463,8 @@ func (e *Executor) produceBlock() error {
return nil
}

// retrieveBatch gets the next batch of transactions from the sequencer
func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
// RetrieveBatch gets the next batch of transactions from the sequencer.
func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
req := coresequencer.GetNextBatchRequest{
Id: []byte(e.genesis.ChainID),
MaxBytes: common.DefaultMaxBlobSize,
Expand Down Expand Up @@ -481,8 +495,8 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
}, nil
}

// createBlock creates a new block from the given batch
func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
// CreateBlock creates a new block from the given batch.
func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
currentState := e.getLastState()
headerTime := uint64(e.genesis.StartTime.UnixNano())

Expand Down Expand Up @@ -581,8 +595,8 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba
return header, data, nil
}

// applyBlock applies the block to get the new state
func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
// ApplyBlock applies the block to get the new state.
func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
currentState := e.getLastState()

// Prepare transactions
Expand Down Expand Up @@ -654,8 +668,8 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
return nil, nil
}

// validateBlock validates the created block
func (e *Executor) validateBlock(lastState types.State, header *types.SignedHeader, data *types.Data) error {
// ValidateBlock validates the created block.
func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error {
// Set custom verifier for aggregator node signature
header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider)

Expand Down
8 changes: 4 additions & 4 deletions block/internal/executing/executor_lazy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {

mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()

// Direct call to produceBlock should work (this is what lazy timer does)
err = exec.produceBlock()
// Direct call to ProduceBlock should work (this is what lazy timer does)
err = exec.ProduceBlock(exec.ctx)
require.NoError(t, err)

h1, err := memStore.Height(context.Background())
Expand All @@ -118,7 +118,7 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {

mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()

err = exec.produceBlock()
err = exec.ProduceBlock(exec.ctx)
require.NoError(t, err)

h2, err := memStore.Height(context.Background())
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestRegularMode_ProduceBlockLogic(t *testing.T) {

mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()

err = exec.produceBlock()
err = exec.ProduceBlock(exec.ctx)
require.NoError(t, err)

h1, err := memStore.Height(context.Background())
Expand Down
8 changes: 4 additions & 4 deletions block/internal/executing/executor_logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) {
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()

// produce one block
err = exec.produceBlock()
err = exec.ProduceBlock(exec.ctx)
require.NoError(t, err)

// Verify height and stored block
Expand Down Expand Up @@ -202,14 +202,14 @@ func TestPendingLimit_SkipsProduction(t *testing.T) {

mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()

require.NoError(t, exec.produceBlock())
require.NoError(t, exec.ProduceBlock(exec.ctx))
h1, err := memStore.Height(context.Background())
require.NoError(t, err)
assert.Equal(t, uint64(1), h1)

// With limit=1 and lastSubmitted default 0, pending >= 1 so next production should be skipped
// No new expectations; produceBlock should return early before hitting sequencer
require.NoError(t, exec.produceBlock())
// No new expectations; ProduceBlock should return early before hitting sequencer
require.NoError(t, exec.ProduceBlock(exec.ctx))
h2, err := memStore.Height(context.Background())
require.NoError(t, err)
assert.Equal(t, h1, h2, "height should not change when production is skipped")
Expand Down
8 changes: 4 additions & 4 deletions block/internal/executing/executor_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {

mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once()

err = exec1.produceBlock()
err = exec1.ProduceBlock(exec1.ctx)
require.NoError(t, err)

// Verify first block was produced
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {

// Note: mockSeq2 should NOT receive GetNextBatch calls because pending block should be used

err = exec2.produceBlock()
err = exec2.ProduceBlock(exec2.ctx)
require.NoError(t, err)

// Verify height advanced to 2
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {

mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once()

err = exec1.produceBlock()
err = exec1.ProduceBlock(exec1.ctx)
require.NoError(t, err)

// Stop first executor
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {

mockSeq2.EXPECT().GetDAHeight().Return(uint64(0)).Once()

err = exec2.produceBlock()
err = exec2.ProduceBlock(exec2.ctx)
require.NoError(t, err)

// Verify normal operation
Expand Down
Loading
Loading