Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions sei-cosmos/storev2/rootmulti/flatkv_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"github.com/sei-protocol/sei-chain/sei-cosmos/store/types"
"github.com/sei-protocol/sei-chain/sei-db/common/utils"
seidbconfig "github.com/sei-protocol/sei-chain/sei-db/config"
"github.com/sei-protocol/sei-chain/sei-db/proto"
"github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv"
"github.com/sei-protocol/sei-chain/sei-db/state_db/sc/migration"
sctypes "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types"
evmtypes "github.com/sei-protocol/sei-chain/x/evm/types"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -449,3 +451,64 @@ func TestRootMultiMigrateEVM_DoubleFlushAppHashStable(t *testing.T) {
blockIdx, cid.Version)
}
}

// applyCountingCommitter wraps the SC committer and counts ApplyChangeSets
// calls. It is used to assert that rootmulti's per-version flush memo forwards
// exactly one ApplyChangeSets to the SC store per commit cycle, despite both
// GetWorkingHash() and Commit() calling flush().
type applyCountingCommitter struct {
sctypes.Committer
applyCalls int
}

func (c *applyCountingCommitter) ApplyChangeSets(cs []*proto.NamedChangeSet) error {
c.applyCalls++
return c.Committer.ApplyChangeSets(cs)
}

// TestRootMultiFlushesSCStoreOncePerCommit verifies that the
// GetWorkingHash + Commit double flush results in exactly one
// ApplyChangeSets reaching the underlying SC store per commit cycle.
//
// The first flush (FinalizeBlock's GetWorkingHash) carries the block's
// complete change set; the follow-up flushes inside Commit must be
// suppressed by the flushedVersion memo rather than forwarding an empty
// ApplyChangeSets (which would trip flatKV's one-apply-per-commit guard and,
// in migration modes, drive a second flatKV write). Empty blocks must still
// forward their single (empty) apply so migration modes advance their
// boundary every block.
func TestRootMultiFlushesSCStoreOncePerCommit(t *testing.T) {
for _, tc := range []struct {
name string
cfg seidbconfig.StateCommitConfig
}{
{"migrate_evm", migrateEVMConfig(1024)},
{"evm_migrated", evmMigratedConfig()},
{"memiavl_only", memiavlOnlyConfig()},
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
store, storeKeys := newTestRootMulti(t, dir, tc.cfg)
defer func() { require.NoError(t, store.Close()) }()

spy := &applyCountingCommitter{Committer: store.scStore}
store.scStore = spy
evmData := newEVMTestData(0x42)

// Non-empty block: exactly one apply reaches the SC store.
spy.applyCalls = 0
simulateBlock(t, store, storeKeys, 1, evmData)
require.Equal(t, 1, spy.applyCalls,
"non-empty block must forward exactly one ApplyChangeSets "+
"(the FinalizeBlock flush), not the empty Commit-phase flush")

// Empty block (no caller writes): still exactly one apply so that
// empty blocks continue to advance migration boundaries.
spy.applyCalls = 0
finalizeBlock(t, store)
require.Equal(t, 1, spy.applyCalls,
"empty block must forward exactly one ApplyChangeSets so empty "+
"blocks still advance migration")
})
}
}
30 changes: 29 additions & 1 deletion sei-cosmos/storev2/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ type Store struct {
ckvStores map[types.StoreKey]types.CommitKVStore
gigaKeys []string

// flushedVersion records the pending version (lastCommitInfo.Version+1)
// whose change sets have already been forwarded to the SC store this
// commit cycle. It makes flush() idempotent per pending version so the
// SC store (and therefore flatKV's one-apply-per-commit guard, plus any
// migration advancement) sees exactly one ApplyChangeSets per commit,
// even though both GetWorkingHash() and Commit() call flush(). Reset to
// -1 on (re)load and rollback so the next flush always proceeds.
flushedVersion int64

histProofSem chan struct{}
histProofLimiter *rate.Limiter

Expand Down Expand Up @@ -113,6 +122,7 @@ func NewStore(
gigaKeys: gigaKeys,
histProofSem: make(chan struct{}, maxInFlight),
histProofLimiter: limiter,
flushedVersion: -1,
}
if ssConfig.Enable {
ssStore, err := ss.NewStateStore(homeDir, ssConfig)
Expand Down Expand Up @@ -182,6 +192,14 @@ func (rs *Store) Commit(bumpVersion bool) types.CommitID {
func (rs *Store) flush() error {
var changeSets []*proto.NamedChangeSet
currentVersion := rs.lastCommitInfo.Version + 1
// Forward to the SC store at most once per pending version. Both
// GetWorkingHash() (at FinalizeBlock) and Commit() call flush(); the
// first one carries the block's complete change set and the rest would
// otherwise forward an empty ApplyChangeSets, which trips flatKV's
// one-apply-per-commit guard and would double-drive migration.
if rs.flushedVersion == currentVersion {
return nil
}
for key := range rs.ckvStores {
// it'll unwrap the inter-block cache
store := rs.GetCommitKVStore(key)
Expand Down Expand Up @@ -218,7 +236,11 @@ func (rs *Store) flush() error {
telemetry.SetGauge(float32(currentVersion), "storeV2", "ss", "version")
}
}
return rs.scStore.ApplyChangeSets(changeSets)
if err := rs.scStore.ApplyChangeSets(changeSets); err != nil {
return fmt.Errorf("failed to apply change sets: %w", err)
}
rs.flushedVersion = currentVersion
return nil
}

func (rs *Store) Close() error {
Expand Down Expand Up @@ -536,6 +558,9 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra
rs.mtx.Lock()
defer rs.mtx.Unlock()
rs.ckvStores = newStores
// A reload changes the pending version baseline; clear the per-version
// flush memo so the next flush re-applies rather than being skipped.
rs.flushedVersion = -1
// to keep the root hash compatible with cosmos-sdk 0.46
if rs.scStore.Version() != 0 {
rs.lastCommitInfo = convertCommitInfo(rs.scStore.LastCommitInfo())
Expand Down Expand Up @@ -608,6 +633,9 @@ func (rs *Store) RollbackToVersion(target int64) error {
if err != nil {
return err
}
// Rollback moves the pending version baseline; clear the per-version
// flush memo so the next flush re-applies rather than being skipped.
rs.flushedVersion = -1
// We need to update the lastCommitInfo after rollback
if rs.scStore.Version() != 0 {
fmt.Printf("Rolled back CMS to version %d\n", rs.scStore.Version())
Expand Down
69 changes: 0 additions & 69 deletions sei-db/state_db/sc/composite/store_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,75 +297,6 @@ func reopenInMigrateEVM(t *testing.T, dir string, batch int) *CompositeCommitSto
return cs
}

func TestComposite_MigrateEVM_SecondNonEmptyFlushDoesNotAdvanceMigration(t *testing.T) {
dir := t.TempDir()
key1 := evmStorageTestKey(0x01)
key2 := evmStorageTestKey(0x02)

memCfg := config.DefaultStateCommitConfig()
memCfg.WriteMode = config.MemiavlOnly
memCfg.MemIAVLConfig.AsyncCommitBuffer = 0
cs, err := NewCompositeCommitStore(t.Context(), dir, memCfg)
require.NoError(t, err)
require.NoError(t, cs.Initialize([]string{keys.BankStoreKey, keys.EVMStoreKey}))
_, err = cs.LoadVersion(0, false)
require.NoError(t, err)

require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{
{Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{
{Key: key1, Value: evmStorageTestValue(0x11)},
{Key: key2, Value: evmStorageTestValue(0x22)},
}}},
}))
_, err = cs.Commit()
require.NoError(t, err)
require.NoError(t, cs.Close())

cs = reopenInMigrateEVM(t, dir, 1)
defer func() { _ = cs.Close() }()

require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{
{Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{
{Key: evmStorageTestKey(0x03), Value: evmStorageTestValue(0x33)},
}}},
}))
require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{
{Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{
{Key: evmStorageTestKey(0x04), Value: evmStorageTestValue(0x44)},
}}},
}))

boundaryBytes, ok := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationBoundaryKey))
require.True(t, ok)
boundary, err := migration.DeserializeMigrationBoundary(boundaryBytes)
require.NoError(t, err)
require.True(t, boundary.Equals(migration.NewMigrationBoundary(keys.EVMStoreKey, key1)),
"second non-empty ApplyChangeSets in the same block must not migrate key2")

_, ok = cs.flatKV.Get(keys.EVMStoreKey, key2)
require.False(t, ok, "key2 should remain unmigrated until the next block")
}

func evmStorageTestKey(seed byte) []byte {
addr := make([]byte, keys.AddressLen)
slot := make([]byte, 32)
for i := range addr {
addr[i] = seed
}
for i := range slot {
slot[i] = seed
}
return keys.BuildEVMKey(keys.EVMKeyStorage, append(addr, slot...))
}

func evmStorageTestValue(seed byte) []byte {
value := make([]byte, 32)
for i := range value {
value[i] = seed
}
return value
}

// runUntilMigrationComplete drives the workload through commits until
// the flatkv migration-version key reaches Version1_MigrateEVM. Fails
// the test if completion takes more than maxBlocks (guards against a
Expand Down
Loading
Loading