Skip to content

Commit 2da9bbe

Browse files
committed
Merge branch 'main' into watermark-evmrpc
2 parents 055d08e + e8219bb commit 2da9bbe

16 files changed

Lines changed: 287 additions & 104 deletions

evmrpc/block.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@ type BlockAPI struct {
4444
includeShellReceipts bool
4545
includeBankTransfers bool
4646
watermarks *WatermarkManager
47+
globalBlockCache BlockCache
48+
cacheCreationMutex *sync.Mutex
4749
}
4850

4951
type SeiBlockAPI struct {
5052
*BlockAPI
5153
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error)
5254
}
5355

54-
func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, txConfigProvider func(int64) client.TxConfig, connectionType ConnectionType, watermarks *WatermarkManager) *BlockAPI {
56+
func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, txConfigProvider func(int64) client.TxConfig, connectionType ConnectionType, watermarks *WatermarkManager, globalBlockCache BlockCache, cacheCreationMutex *sync.Mutex) *BlockAPI {
5557
return &BlockAPI{
5658
tmClient: tmClient,
5759
keeper: k,
@@ -62,6 +64,8 @@ func NewBlockAPI(tmClient rpcclient.Client, k *keeper.Keeper, ctxProvider func(i
6264
includeBankTransfers: false,
6365
namespace: EthNamespace,
6466
watermarks: watermarks,
67+
globalBlockCache: globalBlockCache,
68+
cacheCreationMutex: cacheCreationMutex,
6569
}
6670
}
6771

@@ -73,6 +77,8 @@ func NewSeiBlockAPI(
7377
connectionType ConnectionType,
7478
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error),
7579
watermarks *WatermarkManager,
80+
globalBlockCache BlockCache,
81+
cacheCreationMutex *sync.Mutex,
7682
) *SeiBlockAPI {
7783
blockAPI := &BlockAPI{
7884
tmClient: tmClient,
@@ -84,6 +90,8 @@ func NewSeiBlockAPI(
8490
includeBankTransfers: false,
8591
namespace: SeiNamespace,
8692
watermarks: watermarks,
93+
globalBlockCache: globalBlockCache,
94+
cacheCreationMutex: cacheCreationMutex,
8795
}
8896
return &SeiBlockAPI{
8997
BlockAPI: blockAPI,
@@ -99,8 +107,10 @@ func NewSei2BlockAPI(
99107
connectionType ConnectionType,
100108
isPanicTx func(ctx context.Context, hash common.Hash) (bool, error),
101109
watermarks *WatermarkManager,
110+
globalBlockCache BlockCache,
111+
cacheCreationMutex *sync.Mutex,
102112
) *SeiBlockAPI {
103-
blockAPI := NewSeiBlockAPI(tmClient, k, ctxProvider, txConfigProvider, connectionType, isPanicTx, watermarks)
113+
blockAPI := NewSeiBlockAPI(tmClient, k, ctxProvider, txConfigProvider, connectionType, isPanicTx, watermarks, globalBlockCache, cacheCreationMutex)
104114
blockAPI.namespace = Sei2Namespace
105115
blockAPI.includeBankTransfers = true
106116
return blockAPI
@@ -163,7 +173,7 @@ func (a *BlockAPI) getBlockByHash(ctx context.Context, blockHash common.Hash, fu
163173
if err != nil {
164174
return nil, err
165175
}
166-
return EncodeTmBlock(a.ctxProvider, a.txConfigProvider, block, blockRes, a.keeper, fullTx, a.includeBankTransfers, includeSyntheticTxs, isPanicTx)
176+
return EncodeTmBlock(a.ctxProvider, a.txConfigProvider, block, blockRes, a.keeper, fullTx, a.includeBankTransfers, includeSyntheticTxs, isPanicTx, a.globalBlockCache, a.cacheCreationMutex)
167177
}
168178

169179
func (a *BlockAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (result map[string]interface{}, returnErr error) {
@@ -225,7 +235,7 @@ func (a *BlockAPI) getBlockByNumber(
225235
if err != nil {
226236
return nil, err
227237
}
228-
return EncodeTmBlock(a.ctxProvider, a.txConfigProvider, block, blockRes, a.keeper, fullTx, a.includeBankTransfers, includeSyntheticTxs, isPanicTx)
238+
return EncodeTmBlock(a.ctxProvider, a.txConfigProvider, block, blockRes, a.keeper, fullTx, a.includeBankTransfers, includeSyntheticTxs, isPanicTx, a.globalBlockCache, a.cacheCreationMutex)
229239
}
230240

231241
func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (result []map[string]interface{}, returnErr error) {
@@ -244,7 +254,9 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block
244254

245255
// Get all tx hashes for the block
246256
height := block.Block.Height
247-
txHashes := getTxHashesFromBlock(a.ctxProvider, a.txConfigProvider, a.keeper, block, shouldIncludeSynthetic(a.namespace))
257+
258+
txHashes := getTxHashesFromBlock(a.ctxProvider, a.txConfigProvider, a.keeper, block, shouldIncludeSynthetic(a.namespace), a.cacheCreationMutex, a.globalBlockCache)
259+
248260
// Get tx receipts for all hashes in parallel
249261
wg := sync.WaitGroup{}
250262
mtx := sync.Mutex{}
@@ -263,7 +275,7 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block
263275
mtx.Unlock()
264276
}
265277
} else {
266-
encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, receipt, a.keeper, block, a.includeShellReceipts)
278+
encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, receipt, a.keeper, block, a.includeShellReceipts, a.globalBlockCache, a.cacheCreationMutex)
267279
if err != nil {
268280
mtx.Lock()
269281
returnErr = err
@@ -299,6 +311,8 @@ func EncodeTmBlock(
299311
includeBankTransfers bool,
300312
includeSyntheticTxs bool,
301313
isPanicOrSynthetic func(ctx context.Context, hash common.Hash) (bool, error),
314+
globalBlockCache BlockCache,
315+
cacheCreationMutex *sync.Mutex,
302316
) (map[string]interface{}, error) {
303317
number := big.NewInt(block.Block.Height)
304318
blockhash := common.HexToHash(block.BlockID.Hash.String())
@@ -320,7 +334,7 @@ func EncodeTmBlock(
320334
transactions := []interface{}{}
321335
latestCtx := ctxProvider(LatestCtxHeight)
322336

323-
msgs := filterTransactions(k, ctxProvider, txConfigProvider, block, includeSyntheticTxs, includeBankTransfers)
337+
msgs := filterTransactions(k, ctxProvider, txConfigProvider, block, includeSyntheticTxs, includeBankTransfers, cacheCreationMutex, globalBlockCache)
324338

325339
blockBloom := make([]byte, ethtypes.BloomByteLength)
326340
for _, msg := range msgs {

evmrpc/block_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package evmrpc_test
33
import (
44
"crypto/sha256"
55
"math/big"
6+
"sync"
67
"testing"
78
"time"
89

@@ -49,7 +50,7 @@ func TestEncodeTmBlock_EmptyTransactions(t *testing.T) {
4950
}
5051

5152
// Call EncodeTmBlock with empty transactions
52-
result, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, block, blockRes, k, true, false, false, nil)
53+
result, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, block, blockRes, k, true, false, false, nil, evmrpc.NewBlockCache(3000), &sync.Mutex{})
5354
require.Nil(t, err)
5455

5556
// Assert txHash is equal to ethtypes.EmptyTxsHash
@@ -95,7 +96,7 @@ func TestEncodeBankMsg(t *testing.T) {
9596
},
9697
},
9798
}
98-
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, false, false, nil)
99+
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, false, false, nil, evmrpc.NewBlockCache(3000), &sync.Mutex{})
99100
require.Nil(t, err)
100101
txs := res["transactions"].([]interface{})
101102
require.Equal(t, 0, len(txs))
@@ -143,7 +144,7 @@ func TestEncodeWasmExecuteMsg(t *testing.T) {
143144
},
144145
},
145146
}
146-
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, false, true, nil)
147+
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, false, true, nil, evmrpc.NewBlockCache(3000), &sync.Mutex{})
147148
require.Nil(t, err)
148149
txs := res["transactions"].([]interface{})
149150
require.Equal(t, 1, len(txs))
@@ -204,7 +205,7 @@ func TestEncodeBankTransferMsg(t *testing.T) {
204205
},
205206
},
206207
}
207-
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, true, false, nil)
208+
res, err := evmrpc.EncodeTmBlock(func(i int64) sdk.Context { return ctx }, func(i int64) client.TxConfig { return TxConfig }, &resBlock, &resBlockRes, k, true, true, false, nil, evmrpc.NewBlockCache(3000), &sync.Mutex{})
208209
require.Nil(t, err)
209210
txs := res["transactions"].([]interface{})
210211
require.Equal(t, 1, len(txs))

evmrpc/filter.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ func getCachedReceipt(globalBlockCache BlockCache, blockHeight int64, txHash com
6868
return nil, false
6969
}
7070

71+
func getOrSetCachedReceipt(cacheCreationMutex *sync.Mutex, globalBlockCache BlockCache, ctx sdk.Context, k *keeper.Keeper, block *coretypes.ResultBlock, txHash common.Hash) (*evmtypes.Receipt, bool) {
72+
blockHeight := block.Block.Height
73+
receipt, found := getCachedReceipt(globalBlockCache, blockHeight, txHash)
74+
if found {
75+
return receipt, true
76+
}
77+
receipt, err := k.GetReceipt(ctx, txHash)
78+
if err != nil {
79+
return nil, false
80+
}
81+
setCachedReceipt(cacheCreationMutex, globalBlockCache, blockHeight, block, txHash, receipt)
82+
return receipt, true
83+
}
84+
7185
// LoadOrStore ensures atomic cache entry creation (like sync.Map.LoadOrStore)
7286
func loadOrStoreCacheEntry(cacheCreationMutex *sync.Mutex, globalBlockCache BlockCache, blockHeight int64, block *coretypes.ResultBlock) *BlockCacheEntry {
7387
// Fast path: try to get existing entry
@@ -261,6 +275,7 @@ func NewFilterAPI(
261275
namespace string,
262276
dbReadSemaphore chan struct{},
263277
globalBlockCache BlockCache,
278+
cacheCreationMutex *sync.Mutex,
264279
globalLogSlicePool *LogSlicePool,
265280
watermarks *WatermarkManager,
266281
) *FilterAPI {
@@ -281,6 +296,7 @@ func NewFilterAPI(
281296
includeSyntheticReceipts: shouldIncludeSynthetic(namespace),
282297
dbReadSemaphore: dbReadSemaphore,
283298
globalBlockCache: globalBlockCache,
299+
cacheCreationMutex: cacheCreationMutex,
284300
globalLogSlicePool: globalLogSlicePool,
285301
watermarks: watermarks,
286302
}
@@ -643,7 +659,7 @@ type LogFetcher struct {
643659
includeSyntheticReceipts bool
644660
dbReadSemaphore chan struct{}
645661
globalBlockCache BlockCache
646-
cacheCreationMutex sync.Mutex
662+
cacheCreationMutex *sync.Mutex
647663
globalLogSlicePool *LogSlicePool
648664
watermarks *WatermarkManager
649665
}
@@ -865,15 +881,11 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt
865881
totalLogs := uint(0)
866882
evmTxIndex := 0
867883

868-
for _, hash := range getTxHashesFromBlock(f.ctxProvider, f.txConfigProvider, f.k, block, f.includeSyntheticReceipts) {
869-
receipt, found := getCachedReceipt(f.globalBlockCache, block.Block.Height, hash.hash)
884+
for _, hash := range getTxHashesFromBlock(f.ctxProvider, f.txConfigProvider, f.k, block, f.includeSyntheticReceipts, f.cacheCreationMutex, f.globalBlockCache) {
885+
receipt, found := getOrSetCachedReceipt(f.cacheCreationMutex, f.globalBlockCache, ctx, f.k, block, hash.hash)
870886
if !found {
871-
var err error
872-
receipt, err = f.k.GetReceipt(ctx, hash.hash)
873-
if err != nil {
874-
continue
875-
}
876-
setCachedReceipt(&f.cacheCreationMutex, f.globalBlockCache, block.Block.Height, block, hash.hash, receipt)
887+
ctx.Logger().Error(fmt.Sprintf("collectLogs: unable to find receipt for hash %s", hash.hash.Hex()))
888+
continue
877889
}
878890

879891
txLogs := keeper.GetLogsForTx(receipt, totalLogs)
@@ -1073,7 +1085,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
10731085
}
10741086

10751087
// Use LoadOrStore to create/get cache entry atomically
1076-
entry := loadOrStoreCacheEntry(&f.cacheCreationMutex, f.globalBlockCache, height, block)
1088+
entry := loadOrStoreCacheEntry(f.cacheCreationMutex, f.globalBlockCache, height, block)
10771089
// Fill bloom if we have it and it's missing
10781090
if blockBloom != (ethtypes.Bloom{}) {
10791091
fillMissingFields(entry, block, blockBloom)

evmrpc/height_availability_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestBlockAPIEnsureHeightUnavailable(t *testing.T) {
9292
highHeight := latest + 5
9393
client := newHeightTestClient(highHeight, earliest, latest)
9494
watermarks := NewWatermarkManager(client, testCtxProvider, nil, nil)
95-
api := NewBlockAPI(client, nil, testCtxProvider, testTxConfigProvider, ConnectionTypeHTTP, watermarks)
95+
api := NewBlockAPI(client, nil, testCtxProvider, testTxConfigProvider, ConnectionTypeHTTP, watermarks, nil, nil)
9696

9797
_, err := api.GetBlockByHash(context.Background(), common.HexToHash(highBlockHashHex), false)
9898
require.Error(t, err)

evmrpc/send.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"github.com/cosmos/cosmos-sdk/baseapp"
@@ -38,16 +39,28 @@ type SendConfig struct {
3839
slow bool
3940
}
4041

41-
func NewSendAPI(tmClient rpcclient.Client, txConfigProvider func(int64) client.TxConfig, sendConfig *SendConfig, k *keeper.Keeper, ctxProvider func(int64) sdk.Context, homeDir string, simulateConfig *SimulateConfig, app *baseapp.BaseApp,
42-
antehandler sdk.AnteHandler, connectionType ConnectionType) *SendAPI {
42+
func NewSendAPI(
43+
tmClient rpcclient.Client,
44+
txConfigProvider func(int64) client.TxConfig,
45+
sendConfig *SendConfig,
46+
k *keeper.Keeper,
47+
ctxProvider func(int64) sdk.Context,
48+
homeDir string,
49+
simulateConfig *SimulateConfig,
50+
app *baseapp.BaseApp,
51+
antehandler sdk.AnteHandler,
52+
connectionType ConnectionType,
53+
globalBlockCache BlockCache,
54+
cacheCreationMutex *sync.Mutex,
55+
) *SendAPI {
4356
return &SendAPI{
4457
tmClient: tmClient,
4558
txConfigProvider: txConfigProvider,
4659
sendConfig: sendConfig,
4760
keeper: k,
4861
ctxProvider: ctxProvider,
4962
homeDir: homeDir,
50-
backend: NewBackend(ctxProvider, k, txConfigProvider, tmClient, simulateConfig, app, antehandler),
63+
backend: NewBackend(ctxProvider, k, txConfigProvider, tmClient, simulateConfig, app, antehandler, globalBlockCache, cacheCreationMutex),
5164
connectionType: connectionType,
5265
}
5366
}

0 commit comments

Comments
 (0)