Skip to content

Commit 7d30f97

Browse files
feat(submitting): add posting strategies (#2973)
* feat(submitting): add posting strategies * updates * don't use hardcoded da time * simplify * cleanups * updates * lint * fix race * remove double marshalling * bring back concurrent marhalling but in submitter * marshal in cache * lower max blob * updates * add cl * feedback * fix defaults * fix e2e * fix submit --------- Co-authored-by: Marko <marko@baricevic.me>
1 parent 52825bf commit 7d30f97

26 files changed

+924
-215
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
- Added `post-tx` command and force inclusion server to submit transaction directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888))
1818
Additionally, modified the core package to support marking transactions as forced included transactions.
1919
The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool.
20+
- Add batching stategies (default stay time-based, unchanged with previous betas). Currently available strategies are `time`, `size`, `immediate` and `adaptive`.
2021

2122
### Changed
2223

block/internal/cache/bench_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func BenchmarkManager_GetPendingHeaders(b *testing.B) {
7272
b.ReportAllocs()
7373
b.ResetTimer()
7474
for b.Loop() {
75-
hs, err := m.GetPendingHeaders(ctx)
75+
hs, _, err := m.GetPendingHeaders(ctx)
7676
if err != nil {
7777
b.Fatal(err)
7878
}
@@ -93,7 +93,7 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
9393
b.ReportAllocs()
9494
b.ResetTimer()
9595
for b.Loop() {
96-
ds, err := m.GetPendingData(ctx)
96+
ds, _, err := m.GetPendingData(ctx)
9797
if err != nil {
9898
b.Fatal(err)
9999
}

block/internal/cache/manager.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ type CacheManager interface {
7878

7979
// PendingManager provides operations for managing pending headers and data
8080
type PendingManager interface {
81-
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
82-
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
81+
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error)
82+
GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error)
8383
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
8484
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
8585
NumPendingHeaders() uint64
@@ -318,20 +318,21 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
318318
}
319319

320320
// Pending operations
321-
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
321+
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
322322
return m.pendingHeaders.GetPendingHeaders(ctx)
323323
}
324324

325-
func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, error) {
326-
// Get pending raw data
327-
dataList, err := m.pendingData.GetPendingData(ctx)
325+
func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) {
326+
// Get pending raw data with marshalled bytes
327+
dataList, marshalledData, err := m.pendingData.GetPendingData(ctx)
328328
if err != nil {
329-
return nil, err
329+
return nil, nil, err
330330
}
331331

332332
// Convert to SignedData (this logic was in manager.go)
333333
signedDataList := make([]*types.SignedData, 0, len(dataList))
334-
for _, data := range dataList {
334+
marshalledSignedData := make([][]byte, 0, len(dataList))
335+
for i, data := range dataList {
335336
if len(data.Txs) == 0 {
336337
continue // Skip empty data
337338
}
@@ -342,9 +343,10 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat
342343
Data: *data,
343344
// Signature and Signer will be set by executing component
344345
})
346+
marshalledSignedData = append(marshalledSignedData, marshalledData[i])
345347
}
346348

347-
return signedDataList, nil
349+
return signedDataList, marshalledSignedData, nil
348350
}
349351

350352
func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) {

block/internal/cache/manager_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,14 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
183183
require.NoError(t, err)
184184

185185
// headers: all 3 should be pending initially
186-
headers, err := cm.GetPendingHeaders(ctx)
186+
headers, _, err := cm.GetPendingHeaders(ctx)
187187
require.NoError(t, err)
188188
require.Len(t, headers, 3)
189189
assert.Equal(t, uint64(1), headers[0].Height())
190190
assert.Equal(t, uint64(3), headers[2].Height())
191191

192192
// data: empty one filtered, so 2 and 3 only
193-
signedData, err := cm.GetPendingData(ctx)
193+
signedData, _, err := cm.GetPendingData(ctx)
194194
require.NoError(t, err)
195195
require.Len(t, signedData, 2)
196196
assert.Equal(t, uint64(2), signedData[0].Height())
@@ -200,12 +200,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
200200
cm.SetLastSubmittedHeaderHeight(ctx, 1)
201201
cm.SetLastSubmittedDataHeight(ctx, 2)
202202

203-
headers, err = cm.GetPendingHeaders(ctx)
203+
headers, _, err = cm.GetPendingHeaders(ctx)
204204
require.NoError(t, err)
205205
require.Len(t, headers, 2)
206206
assert.Equal(t, uint64(2), headers[0].Height())
207207

208-
signedData, err = cm.GetPendingData(ctx)
208+
signedData, _, err = cm.GetPendingData(ctx)
209209
require.NoError(t, err)
210210
require.Len(t, signedData, 1)
211211
assert.Equal(t, uint64(3), signedData[0].Height())

block/internal/cache/pending_base.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"sync"
89
"sync/atomic"
910

1011
ds "github.com/ipfs/go-datastore"
@@ -22,6 +23,9 @@ type pendingBase[T any] struct {
2223
metaKey string
2324
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
2425
lastHeight atomic.Uint64
26+
27+
// Marshalling cache to avoid redundant marshalling
28+
marshalledCache sync.Map // key: uint64 (height), value: []byte
2529
}
2630

2731
// newPendingBase constructs a new pendingBase for a given type.
@@ -80,6 +84,9 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub
8084
if err != nil {
8185
pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA")
8286
}
87+
88+
// Clear marshalled cache for submitted heights
89+
pb.clearMarshalledCacheUpTo(newLastSubmittedHeight)
8390
}
8491
}
8592

@@ -101,3 +108,26 @@ func (pb *pendingBase[T]) init() error {
101108
pb.lastHeight.CompareAndSwap(0, lsh)
102109
return nil
103110
}
111+
112+
// getMarshalledForHeight returns cached marshalled bytes for a height, or nil if not cached
113+
func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte {
114+
if val, ok := pb.marshalledCache.Load(height); ok {
115+
return val.([]byte)
116+
}
117+
return nil
118+
}
119+
120+
// setMarshalledForHeight caches marshalled bytes for a height
121+
func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) {
122+
pb.marshalledCache.Store(height, marshalled)
123+
}
124+
125+
// clearMarshalledCacheUpTo removes cached marshalled bytes up to and including the given height
126+
func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) {
127+
pb.marshalledCache.Range(func(key, _ any) bool {
128+
if h := key.(uint64); h <= height {
129+
pb.marshalledCache.Delete(h)
130+
}
131+
return true
132+
})
133+
}

block/internal/cache/pending_base_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) {
3535
// ensure store height stays lower (0)
3636
ph, err := NewPendingHeaders(st, logger)
3737
require.NoError(t, err)
38-
pending, err := ph.GetPendingHeaders(ctx)
38+
pending, _, err := ph.GetPendingHeaders(ctx)
3939
assert.Error(t, err)
4040
assert.Len(t, pending, 0)
4141

block/internal/cache/pending_data.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/rs/zerolog"
78

@@ -46,9 +47,42 @@ func (pd *PendingData) init() error {
4647
return pd.base.init()
4748
}
4849

49-
// GetPendingData returns a sorted slice of pending Data.
50-
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) {
51-
return pd.base.getPending(ctx)
50+
// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes.
51+
// It uses an internal cache to avoid re-marshalling data on subsequent calls.
52+
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) {
53+
dataList, err := pd.base.getPending(ctx)
54+
if err != nil {
55+
return nil, nil, err
56+
}
57+
58+
if len(dataList) == 0 {
59+
return nil, nil, nil
60+
}
61+
62+
marshalled := make([][]byte, len(dataList))
63+
lastSubmitted := pd.base.lastHeight.Load()
64+
65+
for i, data := range dataList {
66+
height := lastSubmitted + uint64(i) + 1
67+
68+
// Try to get from cache first
69+
if cached := pd.base.getMarshalledForHeight(height); cached != nil {
70+
marshalled[i] = cached
71+
continue
72+
}
73+
74+
// Marshal if not in cache
75+
dataBytes, err := data.MarshalBinary()
76+
if err != nil {
77+
return nil, nil, fmt.Errorf("failed to marshal data at height %d: %w", height, err)
78+
}
79+
marshalled[i] = dataBytes
80+
81+
// Store in cache
82+
pd.base.setMarshalledForHeight(height, dataBytes)
83+
}
84+
85+
return dataList, marshalled, nil
5286
}
5387

5488
func (pd *PendingData) NumPendingData() uint64 {

block/internal/cache/pending_data_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
3939

4040
// initially all 3 data items are pending, incl. empty
4141
require.Equal(t, uint64(3), pendingData.NumPendingData())
42-
pendingDataList, err := pendingData.GetPendingData(ctx)
42+
pendingDataList, _, err := pendingData.GetPendingData(ctx)
4343
require.NoError(t, err)
4444
require.Len(t, pendingDataList, 3)
4545
require.Equal(t, uint64(1), pendingDataList[0].Height())
@@ -53,7 +53,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
5353
require.Equal(t, uint64(1), binary.LittleEndian.Uint64(metadataRaw))
5454

5555
require.Equal(t, uint64(2), pendingData.NumPendingData())
56-
pendingDataList, err = pendingData.GetPendingData(ctx)
56+
pendingDataList, _, err = pendingData.GetPendingData(ctx)
5757
require.NoError(t, err)
5858
require.Len(t, pendingDataList, 2)
5959
require.Equal(t, uint64(2), pendingDataList[0].Height())
@@ -97,7 +97,7 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
9797
require.NoError(t, err)
9898

9999
// fetching pending should propagate the not-found error from store
100-
pending, err := pendingData.GetPendingData(ctx)
100+
pending, _, err := pendingData.GetPendingData(ctx)
101101
require.Error(t, err)
102102
require.Empty(t, pending)
103103
}

block/internal/cache/pending_headers.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/rs/zerolog"
78

@@ -39,9 +40,42 @@ func NewPendingHeaders(store storepkg.Store, logger zerolog.Logger) (*PendingHea
3940
return &PendingHeaders{base: base}, nil
4041
}
4142

42-
// GetPendingHeaders returns a sorted slice of pending headers.
43-
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
44-
return ph.base.getPending(ctx)
43+
// GetPendingHeaders returns a sorted slice of pending headers along with their marshalled bytes.
44+
// It uses an internal cache to avoid re-marshalling headers on subsequent calls.
45+
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
46+
headers, err := ph.base.getPending(ctx)
47+
if err != nil {
48+
return nil, nil, err
49+
}
50+
51+
if len(headers) == 0 {
52+
return nil, nil, nil
53+
}
54+
55+
marshalled := make([][]byte, len(headers))
56+
lastSubmitted := ph.base.lastHeight.Load()
57+
58+
for i, header := range headers {
59+
height := lastSubmitted + uint64(i) + 1
60+
61+
// Try to get from cache first
62+
if cached := ph.base.getMarshalledForHeight(height); cached != nil {
63+
marshalled[i] = cached
64+
continue
65+
}
66+
67+
// Marshal if not in cache
68+
data, err := header.MarshalBinary()
69+
if err != nil {
70+
return nil, nil, fmt.Errorf("failed to marshal header at height %d: %w", height, err)
71+
}
72+
marshalled[i] = data
73+
74+
// Store in cache
75+
ph.base.setMarshalledForHeight(height, data)
76+
}
77+
78+
return headers, marshalled, nil
4579
}
4680

4781
func (ph *PendingHeaders) NumPendingHeaders() uint64 {

block/internal/cache/pending_headers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
3939

4040
// initially all three are pending
4141
require.Equal(t, uint64(3), pendingHeaders.NumPendingHeaders())
42-
headers, err := pendingHeaders.GetPendingHeaders(ctx)
42+
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
4343
require.NoError(t, err)
4444
require.Len(t, headers, 3)
4545
require.Equal(t, uint64(1), headers[0].Height())
@@ -53,7 +53,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
5353
require.Equal(t, uint64(2), binary.LittleEndian.Uint64(metadataRaw))
5454

5555
require.Equal(t, uint64(1), pendingHeaders.NumPendingHeaders())
56-
headers, err = pendingHeaders.GetPendingHeaders(ctx)
56+
headers, _, err = pendingHeaders.GetPendingHeaders(ctx)
5757
require.NoError(t, err)
5858
require.Len(t, headers, 1)
5959
require.Equal(t, uint64(3), headers[0].Height())
@@ -82,7 +82,7 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
8282
// set last submitted to the current height, so nothing pending
8383
pendingHeaders.SetLastSubmittedHeaderHeight(ctx, 1)
8484
require.Equal(t, uint64(0), pendingHeaders.NumPendingHeaders())
85-
headers, err := pendingHeaders.GetPendingHeaders(ctx)
85+
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
8686
require.NoError(t, err)
8787
require.Empty(t, headers)
8888
}

0 commit comments

Comments
 (0)