Skip to content

Commit 24880f5

Browse files
committed
Backports
1 parent 52825bf commit 24880f5

File tree

10 files changed

+542
-54
lines changed

10 files changed

+542
-54
lines changed

.mockery.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ packages:
3030
dir: ./test/mocks
3131
pkgname: mocks
3232
filename: store.go
33+
Batch:
34+
config:
35+
dir: ./test/mocks
36+
pkgname: mocks
37+
filename: batch.go
3338
github.com/celestiaorg/go-header:
3439
interfaces:
3540
Store:

block/internal/cache/pending_data.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/rs/zerolog"
78

@@ -28,8 +29,17 @@ type PendingData struct {
2829
base *pendingBase[*types.Data]
2930
}
3031

32+
var errInFlightData = errors.New("inflight data")
33+
3134
func fetchData(ctx context.Context, store store.Store, height uint64) (*types.Data, error) {
3235
_, data, err := store.GetBlockData(ctx, height)
36+
if err != nil {
37+
return nil, err
38+
}
39+
// in the executor, WIP data is temporary stored. skip them until the process is completed
40+
if data.Height() == 0 {
41+
return nil, errInFlightData
42+
}
3343
return data, err
3444
}
3545

block/internal/cache/pending_headers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/rs/zerolog"
78

@@ -25,8 +26,17 @@ type PendingHeaders struct {
2526
base *pendingBase[*types.SignedHeader]
2627
}
2728

29+
var errInFlightHeader = errors.New("inflight header")
30+
2831
func fetchSignedHeader(ctx context.Context, store storepkg.Store, height uint64) (*types.SignedHeader, error) {
2932
header, err := store.GetHeader(ctx, height)
33+
if err != nil {
34+
return nil, err
35+
}
36+
// in the executor, WIP headers are temporary stored. skip them until the process is completed
37+
if header.Height() == 0 {
38+
return nil, errInFlightHeader
39+
}
3040
return header, err
3141
}
3242

block/internal/executing/executor.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ func (e *Executor) initializeState() error {
227227
if err := batch.SetHeight(state.LastBlockHeight); err != nil {
228228
return fmt.Errorf("failed to set store height: %w", err)
229229
}
230+
if err := batch.UpdateState(state); err != nil {
231+
return fmt.Errorf("failed to update state: %w", err)
232+
}
230233
if err := batch.Commit(); err != nil {
231234
return fmt.Errorf("failed to commit batch: %w", err)
232235
}
@@ -236,7 +239,8 @@ func (e *Executor) initializeState() error {
236239

237240
// Sync execution layer with store on startup
238241
execReplayer := common.NewReplayer(e.store, e.exec, e.genesis, e.logger)
239-
if err := execReplayer.SyncToHeight(e.ctx, state.LastBlockHeight); err != nil {
242+
syncTargetHeight := state.LastBlockHeight
243+
if err := execReplayer.SyncToHeight(e.ctx, syncTargetHeight); err != nil {
240244
e.sendCriticalError(fmt.Errorf("failed to sync execution layer: %w", err))
241245
return fmt.Errorf("failed to sync execution layer: %w", err)
242246
}
@@ -281,7 +285,7 @@ func (e *Executor) executionLoop() {
281285
}
282286
txsAvailable := false
283287

284-
for {
288+
for e.ctx.Err() == nil {
285289
select {
286290
case <-e.ctx.Done():
287291
return
@@ -316,6 +320,10 @@ func (e *Executor) executionLoop() {
316320

317321
// ProduceBlock creates, validates, and stores a new block.
318322
func (e *Executor) ProduceBlock(ctx context.Context) error {
323+
if ctx.Err() != nil {
324+
return ctx.Err()
325+
}
326+
319327
start := time.Now()
320328
defer func() {
321329
if e.metrics.OperationDuration["block_production"] != nil {
@@ -582,7 +590,7 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
582590
}
583591

584592
for i, tx := range batchData.Transactions {
585-
data.Txs[i] = types.Tx(tx)
593+
data.Txs[i] = tx
586594
}
587595

588596
// Set data hash

block/internal/submitting/submitter.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func (s *Submitter) Start(ctx context.Context) error {
108108

109109
// Start DA submission loop if signer is available (aggregator nodes only)
110110
if s.signer != nil {
111+
s.logger.Info().Msg("starting DA submission loop")
111112
s.wg.Add(1)
112113
go func() {
113114
defer s.wg.Done()
@@ -130,7 +131,18 @@ func (s *Submitter) Stop() error {
130131
if s.cancel != nil {
131132
s.cancel()
132133
}
133-
s.wg.Wait()
134+
// Wait for goroutines to finish with a timeout to prevent hanging
135+
done := make(chan struct{})
136+
go func() {
137+
s.wg.Wait()
138+
close(done)
139+
}()
140+
select {
141+
case <-done:
142+
// All goroutines finished cleanly
143+
case <-time.After(5 * time.Second):
144+
s.logger.Warn().Msg("submitter shutdown timed out waiting for goroutines, proceeding anyway")
145+
}
134146
s.logger.Info().Msg("submitter stopped")
135147
return nil
136148
}
@@ -153,7 +165,9 @@ func (s *Submitter) daSubmissionLoop() {
153165
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Submitting headers")
154166
if s.headerSubmissionMtx.TryLock() {
155167
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
168+
s.wg.Add(1)
156169
go func() {
170+
defer s.wg.Done()
157171
defer func() {
158172
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
159173
s.headerSubmissionMtx.Unlock()
@@ -177,7 +191,9 @@ func (s *Submitter) daSubmissionLoop() {
177191
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Submitting data")
178192
if s.dataSubmissionMtx.TryLock() {
179193
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
194+
s.wg.Add(1)
180195
go func() {
196+
defer s.wg.Done()
181197
defer func() {
182198
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
183199
s.dataSubmissionMtx.Unlock()

block/internal/syncing/assert.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package syncing
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/evstack/ev-node/pkg/genesis"
8+
"github.com/evstack/ev-node/types"
9+
)
10+
11+
func assertExpectedProposer(genesis genesis.Genesis, proposerAddr []byte) error {
12+
if string(proposerAddr) != string(genesis.ProposerAddress) {
13+
return fmt.Errorf("unexpected proposer: got %x, expected %x",
14+
proposerAddr, genesis.ProposerAddress)
15+
}
16+
return nil
17+
}
18+
19+
func assertValidSignedData(signedData *types.SignedData, genesis genesis.Genesis) error {
20+
if signedData == nil || signedData.Txs == nil {
21+
return errors.New("empty signed data")
22+
}
23+
24+
if err := assertExpectedProposer(genesis, signedData.Signer.Address); err != nil {
25+
return err
26+
}
27+
28+
dataBytes, err := signedData.Data.MarshalBinary()
29+
if err != nil {
30+
return fmt.Errorf("failed to get signed data payload: %w", err)
31+
}
32+
33+
valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
34+
if err != nil {
35+
return fmt.Errorf("failed to verify signature: %w", err)
36+
}
37+
38+
if !valid {
39+
return fmt.Errorf("invalid signature")
40+
}
41+
42+
return nil
43+
}

block/internal/syncing/da_retriever.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -346,38 +346,12 @@ func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
346346

347347
// assertExpectedProposer validates the proposer address
348348
func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error {
349-
if string(proposerAddr) != string(r.genesis.ProposerAddress) {
350-
return fmt.Errorf("unexpected proposer: got %x, expected %x",
351-
proposerAddr, r.genesis.ProposerAddress)
352-
}
353-
return nil
349+
return assertExpectedProposer(r.genesis, proposerAddr)
354350
}
355351

356352
// assertValidSignedData validates signed data using the configured signature provider
357353
func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error {
358-
if signedData == nil || signedData.Txs == nil {
359-
return errors.New("empty signed data")
360-
}
361-
362-
if err := r.assertExpectedProposer(signedData.Signer.Address); err != nil {
363-
return err
364-
}
365-
366-
dataBytes, err := signedData.Data.MarshalBinary()
367-
if err != nil {
368-
return fmt.Errorf("failed to get signed data payload: %w", err)
369-
}
370-
371-
valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
372-
if err != nil {
373-
return fmt.Errorf("failed to verify signature: %w", err)
374-
}
375-
376-
if !valid {
377-
return fmt.Errorf("invalid signature")
378-
}
379-
380-
return nil
354+
return assertValidSignedData(signedData, r.genesis)
381355
}
382356

383357
// isEmptyDataExpected checks if empty data is expected for a header

0 commit comments

Comments
 (0)