diff --git a/sei-cosmos/storev2/rootmulti/flatkv_migration_test.go b/sei-cosmos/storev2/rootmulti/flatkv_migration_test.go index f96ea39443..333fbea7ac 100644 --- a/sei-cosmos/storev2/rootmulti/flatkv_migration_test.go +++ b/sei-cosmos/storev2/rootmulti/flatkv_migration_test.go @@ -20,8 +20,10 @@ import ( "github.com/sei-protocol/sei-chain/sei-cosmos/store/types" "github.com/sei-protocol/sei-chain/sei-db/common/utils" seidbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/proto" "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv" "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/migration" + sctypes "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" "github.com/stretchr/testify/require" ) @@ -449,3 +451,64 @@ func TestRootMultiMigrateEVM_DoubleFlushAppHashStable(t *testing.T) { blockIdx, cid.Version) } } + +// applyCountingCommitter wraps the SC committer and counts ApplyChangeSets +// calls. It is used to assert that rootmulti's per-version flush memo forwards +// exactly one ApplyChangeSets to the SC store per commit cycle, despite both +// GetWorkingHash() and Commit() calling flush(). +type applyCountingCommitter struct { + sctypes.Committer + applyCalls int +} + +func (c *applyCountingCommitter) ApplyChangeSets(cs []*proto.NamedChangeSet) error { + c.applyCalls++ + return c.Committer.ApplyChangeSets(cs) +} + +// TestRootMultiFlushesSCStoreOncePerCommit verifies that the +// GetWorkingHash + Commit double flush results in exactly one +// ApplyChangeSets reaching the underlying SC store per commit cycle. +// +// The first flush (FinalizeBlock's GetWorkingHash) carries the block's +// complete change set; the follow-up flushes inside Commit must be +// suppressed by the flushedVersion memo rather than forwarding an empty +// ApplyChangeSets (which would trip flatKV's one-apply-per-commit guard and, +// in migration modes, drive a second flatKV write). Empty blocks must still +// forward their single (empty) apply so migration modes advance their +// boundary every block. +func TestRootMultiFlushesSCStoreOncePerCommit(t *testing.T) { + for _, tc := range []struct { + name string + cfg seidbconfig.StateCommitConfig + }{ + {"migrate_evm", migrateEVMConfig(1024)}, + {"evm_migrated", evmMigratedConfig()}, + {"memiavl_only", memiavlOnlyConfig()}, + } { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + store, storeKeys := newTestRootMulti(t, dir, tc.cfg) + defer func() { require.NoError(t, store.Close()) }() + + spy := &applyCountingCommitter{Committer: store.scStore} + store.scStore = spy + evmData := newEVMTestData(0x42) + + // Non-empty block: exactly one apply reaches the SC store. + spy.applyCalls = 0 + simulateBlock(t, store, storeKeys, 1, evmData) + require.Equal(t, 1, spy.applyCalls, + "non-empty block must forward exactly one ApplyChangeSets "+ + "(the FinalizeBlock flush), not the empty Commit-phase flush") + + // Empty block (no caller writes): still exactly one apply so that + // empty blocks continue to advance migration boundaries. + spy.applyCalls = 0 + finalizeBlock(t, store) + require.Equal(t, 1, spy.applyCalls, + "empty block must forward exactly one ApplyChangeSets so empty "+ + "blocks still advance migration") + }) + } +} diff --git a/sei-cosmos/storev2/rootmulti/store.go b/sei-cosmos/storev2/rootmulti/store.go index 6284d4def7..e15c89f71b 100644 --- a/sei-cosmos/storev2/rootmulti/store.go +++ b/sei-cosmos/storev2/rootmulti/store.go @@ -57,6 +57,15 @@ type Store struct { ckvStores map[types.StoreKey]types.CommitKVStore gigaKeys []string + // flushedVersion records the pending version (lastCommitInfo.Version+1) + // whose change sets have already been forwarded to the SC store this + // commit cycle. It makes flush() idempotent per pending version so the + // SC store (and therefore flatKV's one-apply-per-commit guard, plus any + // migration advancement) sees exactly one ApplyChangeSets per commit, + // even though both GetWorkingHash() and Commit() call flush(). Reset to + // -1 on (re)load and rollback so the next flush always proceeds. + flushedVersion int64 + histProofSem chan struct{} histProofLimiter *rate.Limiter @@ -113,6 +122,7 @@ func NewStore( gigaKeys: gigaKeys, histProofSem: make(chan struct{}, maxInFlight), histProofLimiter: limiter, + flushedVersion: -1, } if ssConfig.Enable { ssStore, err := ss.NewStateStore(homeDir, ssConfig) @@ -182,6 +192,14 @@ func (rs *Store) Commit(bumpVersion bool) types.CommitID { func (rs *Store) flush() error { var changeSets []*proto.NamedChangeSet currentVersion := rs.lastCommitInfo.Version + 1 + // Forward to the SC store at most once per pending version. Both + // GetWorkingHash() (at FinalizeBlock) and Commit() call flush(); the + // first one carries the block's complete change set and the rest would + // otherwise forward an empty ApplyChangeSets, which trips flatKV's + // one-apply-per-commit guard and would double-drive migration. + if rs.flushedVersion == currentVersion { + return nil + } for key := range rs.ckvStores { // it'll unwrap the inter-block cache store := rs.GetCommitKVStore(key) @@ -218,7 +236,11 @@ func (rs *Store) flush() error { telemetry.SetGauge(float32(currentVersion), "storeV2", "ss", "version") } } - return rs.scStore.ApplyChangeSets(changeSets) + if err := rs.scStore.ApplyChangeSets(changeSets); err != nil { + return fmt.Errorf("failed to apply change sets: %w", err) + } + rs.flushedVersion = currentVersion + return nil } func (rs *Store) Close() error { @@ -536,6 +558,9 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra rs.mtx.Lock() defer rs.mtx.Unlock() rs.ckvStores = newStores + // A reload changes the pending version baseline; clear the per-version + // flush memo so the next flush re-applies rather than being skipped. + rs.flushedVersion = -1 // to keep the root hash compatible with cosmos-sdk 0.46 if rs.scStore.Version() != 0 { rs.lastCommitInfo = convertCommitInfo(rs.scStore.LastCommitInfo()) @@ -608,6 +633,9 @@ func (rs *Store) RollbackToVersion(target int64) error { if err != nil { return err } + // Rollback moves the pending version baseline; clear the per-version + // flush memo so the next flush re-applies rather than being skipped. + rs.flushedVersion = -1 // We need to update the lastCommitInfo after rollback if rs.scStore.Version() != 0 { fmt.Printf("Rolled back CMS to version %d\n", rs.scStore.Version()) diff --git a/sei-db/state_db/sc/composite/store_migration_test.go b/sei-db/state_db/sc/composite/store_migration_test.go index 4a647281c5..416ed3aa46 100644 --- a/sei-db/state_db/sc/composite/store_migration_test.go +++ b/sei-db/state_db/sc/composite/store_migration_test.go @@ -297,75 +297,6 @@ func reopenInMigrateEVM(t *testing.T, dir string, batch int) *CompositeCommitSto return cs } -func TestComposite_MigrateEVM_SecondNonEmptyFlushDoesNotAdvanceMigration(t *testing.T) { - dir := t.TempDir() - key1 := evmStorageTestKey(0x01) - key2 := evmStorageTestKey(0x02) - - memCfg := config.DefaultStateCommitConfig() - memCfg.WriteMode = config.MemiavlOnly - memCfg.MemIAVLConfig.AsyncCommitBuffer = 0 - cs, err := NewCompositeCommitStore(t.Context(), dir, memCfg) - require.NoError(t, err) - require.NoError(t, cs.Initialize([]string{keys.BankStoreKey, keys.EVMStoreKey})) - _, err = cs.LoadVersion(0, false) - require.NoError(t, err) - - require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{ - {Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ - {Key: key1, Value: evmStorageTestValue(0x11)}, - {Key: key2, Value: evmStorageTestValue(0x22)}, - }}}, - })) - _, err = cs.Commit() - require.NoError(t, err) - require.NoError(t, cs.Close()) - - cs = reopenInMigrateEVM(t, dir, 1) - defer func() { _ = cs.Close() }() - - require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{ - {Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ - {Key: evmStorageTestKey(0x03), Value: evmStorageTestValue(0x33)}, - }}}, - })) - require.NoError(t, cs.ApplyChangeSets([]*proto.NamedChangeSet{ - {Name: keys.EVMStoreKey, Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ - {Key: evmStorageTestKey(0x04), Value: evmStorageTestValue(0x44)}, - }}}, - })) - - boundaryBytes, ok := cs.flatKV.Get(migration.MigrationStore, []byte(migration.MigrationBoundaryKey)) - require.True(t, ok) - boundary, err := migration.DeserializeMigrationBoundary(boundaryBytes) - require.NoError(t, err) - require.True(t, boundary.Equals(migration.NewMigrationBoundary(keys.EVMStoreKey, key1)), - "second non-empty ApplyChangeSets in the same block must not migrate key2") - - _, ok = cs.flatKV.Get(keys.EVMStoreKey, key2) - require.False(t, ok, "key2 should remain unmigrated until the next block") -} - -func evmStorageTestKey(seed byte) []byte { - addr := make([]byte, keys.AddressLen) - slot := make([]byte, 32) - for i := range addr { - addr[i] = seed - } - for i := range slot { - slot[i] = seed - } - return keys.BuildEVMKey(keys.EVMKeyStorage, append(addr, slot...)) -} - -func evmStorageTestValue(seed byte) []byte { - value := make([]byte, 32) - for i := range value { - value[i] = seed - } - return value -} - // runUntilMigrationComplete drives the workload through commits until // the flatkv migration-version key reaches Version1_MigrateEVM. Fails // the test if completion takes more than maxBlocks (guards against a diff --git a/sei-db/state_db/sc/flatkv/lthash_correctness_test.go b/sei-db/state_db/sc/flatkv/lthash_correctness_test.go index 0aa2a98aaf..fdcd363e82 100644 --- a/sei-db/state_db/sc/flatkv/lthash_correctness_test.go +++ b/sei-db/state_db/sc/flatkv/lthash_correctness_test.go @@ -2,7 +2,6 @@ package flatkv import ( "bytes" - "encoding/binary" "errors" "fmt" @@ -143,21 +142,17 @@ func TestLtHashIncrementalEqualsFullScan(t *testing.T) { } verifyLtHashAtHeight(t, s, 60) - // ── Blocks 61-70: Multiple ApplyChangeSets per block (Bug 2) ──── - // Same account is modified in two separate ApplyChangeSets calls - // within a single block. + // ── Blocks 61-70: Nonce + codehash for the same account in one block ─ + // A block's changes for an account must be coalesced into a single + // ApplyChangeSets call (the one-apply-per-commit contract). for i := 61; i <= 70; i++ { addr := addrN(byte(i - 60)) - - // First call: update nonce - cs1 := namedCS(noncePair(addr, uint64(i*1000))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - // Second call: update codehash (same account, same block) ch := codeHashN(byte(i + 100)) - cs2 := namedCS(codeHashPair(addr, ch)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - + cs := namedCS( + noncePair(addr, uint64(i*1000)), + codeHashPair(addr, ch), + ) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) commitAndCheck(t, s) } verifyLtHashAtHeight(t, s, 70) @@ -205,20 +200,17 @@ func TestLtHashIncrementalEqualsFullScan(t *testing.T) { } verifyLtHashAtHeight(t, s, 90) - // ── Blocks 91-95: Triple ApplyChangeSets per block ────────────── - // Account gets nonce in call 1, codehash in call 2, storage in call 3. + // ── Blocks 91-95: Nonce + codehash + storage for an account in one block ─ + // All of a block's changes for the account are coalesced into a single + // ApplyChangeSets call. for i := 91; i <= 95; i++ { addr := addrN(byte(i - 90)) - - cs1 := namedCS(noncePair(addr, uint64(i*77777))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(codeHashPair(addr, codeHashN(byte(i+200)))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - cs3 := namedCS(storagePair(addr, slotN(byte(i+200)), []byte{byte(i)})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs3})) - + cs := namedCS( + noncePair(addr, uint64(i*77777)), + codeHashPair(addr, codeHashN(byte(i+200))), + storagePair(addr, slotN(byte(i+200)), []byte{byte(i)}), + ) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) commitAndCheck(t, s) } @@ -244,11 +236,12 @@ func TestLtHashIncrementalEqualsFullScan(t *testing.T) { })) commitAndCheck(t, s) - // Block 99: update addr 20 nonce + delete its storage in separate calls - cs99a := namedCS(noncePair(addr20, 2)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs99a})) - cs99b := namedCS(storageDeletePair(addr20, slotN(1))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs99b})) + // Block 99: update addr 20 nonce + delete its storage in one changeset + cs99 := namedCS( + noncePair(addr20, 2), + storageDeletePair(addr20, slotN(1)), + ) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs99})) commitAndCheck(t, s) // Block 100: big mixed batch @@ -290,33 +283,6 @@ func TestLtHashNewAccountNoPhantomMixOut(t *testing.T) { verifyLtHashAtHeight(t, s, 1) } -// TestLtHashMultiApplyPerBlock is a focused regression test for Bug 2: -// calling ApplyChangeSets twice for the same account in one block must not -// double-MixOut the committed DB value. -func TestLtHashMultiApplyPerBlock(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x77) - - // Block 1: create account - cs := namedCS(noncePair(addr, 1)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: two separate ApplyChangeSets for same account - cs1 := namedCS(noncePair(addr, 10)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - ch := codeHashN(0xAB) - cs2 := namedCS(codeHashPair(addr, ch)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) -} - // TestLtHashStorageAddUpdateDelete verifies that storage operations // (add → update → delete → re-add) produce correct LtHash at each step. func TestLtHashStorageAddUpdateDelete(t *testing.T) { @@ -612,227 +578,6 @@ func TestFullScanLtHashIncludesLegacy(t *testing.T) { "full scan including legacyDB should match incremental LtHash") } -// ============================================================================= -// Cross-ApplyChangeSets Same-Key Overwrite LtHash Verification -// ============================================================================= - -// TestLtHashCrossApplyAccountSameFieldOverwrite verifies that overwriting the -// same account field (nonce→nonce) across two ApplyChangeSets calls in the same -// block produces a correct LtHash. This is distinct from write→delete→write; -// here the second call simply overwrites the pending value. -func TestLtHashCrossApplyAccountSameFieldOverwrite(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x50) - - // Block 1: create account with nonce=1 - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(noncePair(addr, 1)), - })) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: two ApplyChangeSets overwriting the same nonce field - cs1 := namedCS(noncePair(addr, 10)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(noncePair(addr, 20)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - // Verify final value - key := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found) - require.Equal(t, uint64(20), binary.BigEndian.Uint64(val)) -} - -// TestLtHashCrossApplyStorageOverwrite verifies that overwriting the same -// storage key across two ApplyChangeSets calls in the same block produces -// a correct LtHash (full-scan verified). -func TestLtHashCrossApplyStorageOverwrite(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x51) - slot := slotN(0x01) - - // Block 1: create storage entry - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(storagePair(addr, slot, []byte{0x11})), - })) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: overwrite same key in two separate ApplyChangeSets calls - cs1 := namedCS(storagePair(addr, slot, []byte{0x22})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(storagePair(addr, slot, []byte{0x33})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - // Verify final value - key := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found) - require.Equal(t, padLeft32(0x33), val) -} - -// TestLtHashCrossApplyCodeOverwrite verifies that overwriting the same code -// key across two ApplyChangeSets calls in the same block produces a correct -// LtHash (full-scan verified). -func TestLtHashCrossApplyCodeOverwrite(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x52) - - // Block 1: deploy code - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS( - noncePair(addr, 1), - codePair(addr, []byte{0x60, 0x80}), - codeHashPair(addr, codeHashN(0xAA)), - ), - })) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: overwrite code in two separate ApplyChangeSets calls - cs1 := namedCS(codePair(addr, []byte{0x60, 0x40, 0x01})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(codePair(addr, []byte{0x60, 0x40, 0x02, 0x03})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - // Verify final value - key := keys.BuildEVMKey(keys.EVMKeyCode, addr[:]) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found) - require.Equal(t, []byte{0x60, 0x40, 0x02, 0x03}, val) -} - -// TestLtHashCrossApplyLegacyOverwrite verifies that overwriting the same -// legacy key across two ApplyChangeSets calls in the same block produces -// a correct LtHash (full-scan verified). -func TestLtHashCrossApplyLegacyOverwrite(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x53) - legacyKey := append([]byte{0x09}, addr[:]...) - - // Block 1: create legacy entry - cs0 := makeChangeSet(legacyKey, []byte{0x00, 0x10}, false) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs0})) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: overwrite same legacy key in two separate ApplyChangeSets calls - cs1 := makeChangeSet(legacyKey, []byte{0x00, 0x20}, false) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := makeChangeSet(legacyKey, []byte{0x00, 0x30}, false) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - // Verify final value - val, found := s.Get(keys.EVMStoreKey, legacyKey) - require.True(t, found) - require.Equal(t, []byte{0x00, 0x30}, val) -} - -// TestLtHashCrossApplyMixedOverwrite is a comprehensive test that exercises -// cross-Apply overwrites for ALL key types simultaneously in the same block, -// verifying that the incremental LtHash remains correct. -func TestLtHashCrossApplyMixedOverwrite(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x54) - slot := slotN(0x01) - legacyKey := append([]byte{0x09}, addr[:]...) - - // Block 1: create initial state for all key types - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS( - noncePair(addr, 1), - codeHashPair(addr, codeHashN(0x10)), - codePair(addr, []byte{0x60, 0x80}), - storagePair(addr, slot, []byte{0x11}), - ), - })) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - makeChangeSet(legacyKey, []byte{0x00, 0x01}, false), - })) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2: first Apply — update all key types - cs1a := namedCS( - noncePair(addr, 10), - codeHashPair(addr, codeHashN(0x20)), - codePair(addr, []byte{0x60, 0x40}), - storagePair(addr, slot, []byte{0x22}), - ) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1a})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - makeChangeSet(legacyKey, []byte{0x00, 0x02}, false), - })) - - // Block 2: second Apply — overwrite all key types again - cs2a := namedCS( - noncePair(addr, 100), - codeHashPair(addr, codeHashN(0x30)), - codePair(addr, []byte{0x60, 0x60, 0x01}), - storagePair(addr, slot, []byte{0x33}), - ) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2a})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - makeChangeSet(legacyKey, []byte{0x00, 0x03}, false), - })) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - // Verify all final values - nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) - require.True(t, found) - require.Equal(t, uint64(100), binary.BigEndian.Uint64(nonceVal)) - - chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - chVal, found := s.Get(keys.EVMStoreKey, chKey) - require.True(t, found) - expected := codeHashN(0x30) - require.Equal(t, expected[:], chVal) - - codeKey := keys.BuildEVMKey(keys.EVMKeyCode, addr[:]) - codeVal, found := s.Get(keys.EVMStoreKey, codeKey) - require.True(t, found) - require.Equal(t, []byte{0x60, 0x60, 0x01}, codeVal) - - storageKey := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) - storageVal, found := s.Get(keys.EVMStoreKey, storageKey) - require.True(t, found) - require.Equal(t, padLeft32(0x33), storageVal) - - legacyVal, found := s.Get(keys.EVMStoreKey, legacyKey) - require.True(t, found) - require.Equal(t, []byte{0x00, 0x03}, legacyVal) -} - // ---------- Account Row GC LtHash tests ---------- func TestLtHashAccountRowDelete(t *testing.T) { @@ -857,56 +602,6 @@ func TestLtHashAccountRowDelete(t *testing.T) { require.Error(t, err, "accountDB row should be physically absent") } -// TestLtHashAccountDeleteThenRecreate is the critical cross-apply LtHash -// regression test. It starts with a contract account (72-byte encoding), -// deletes all fields in one ApplyChangeSets (paw.isDelete=true), then -// recreates as a nonce-only EOA (40-byte encoding) in a second -// ApplyChangeSets within the same block. The LtHash baseline for the -// second apply must be nil (row logically gone), not the 72-byte encoding. -func TestLtHashAccountDeleteThenRecreate(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0xD2) - - // Block 1: create contract account (72-byte encoding) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(noncePair(addr, 10), codeHashPair(addr, codeHashN(0xBB))), - })) - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 1) - - // Block 2, apply 1: delete nonce + codehash → paw.isDelete = true - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(nonceDeletePair(addr), codeHashDeletePair(addr)), - })) - - // Block 2, apply 2: write nonce only → paw.isDelete = false, 40-byte EOA - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(noncePair(addr, 99)), - })) - - commitAndCheck(t, s) - verifyLtHashAtHeight(t, s, 2) - - nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) - require.True(t, found) - require.Equal(t, nonceBytes(99), nonceVal) - - chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - _, found = s.Get(keys.EVMStoreKey, chKey) - require.False(t, found, "codehash should be zero (EOA)") - - raw, err := s.accountDB.Get(accountPhysKey(addr)) - require.NoError(t, err) - ad, err := vtype.DeserializeAccountData(raw) - require.NoError(t, err) - require.Equal(t, uint64(99), ad.GetNonce()) - var zeroHash vtype.CodeHash - require.Equal(t, &zeroHash, ad.GetCodeHash(), "codehash should be zero (EOA)") -} - func TestLtHashAccountPartialDeletePreservesRow(t *testing.T) { s := setupTestStore(t) defer s.Close() @@ -934,89 +629,89 @@ func TestLtHashAccountPartialDeletePreservesRow(t *testing.T) { require.Equal(t, &zeroHash, ad.GetCodeHash(), "codehash should be zero after delete") } -// TestAccountPendingReadPartialDelete verifies that the isDelete guard in -// Get() only fires when all fields are zero, not on partial deletes. -func TestAccountPendingReadPartialDelete(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0xD4) - nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - - // Apply 1: write nonce + codehash (not committed yet) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(noncePair(addr, 42), codeHashPair(addr, codeHashN(0xDD))), - })) +// TestAccountRowDeletePartialVsFull verifies the isDelete row-GC semantics +// in a single ApplyChangeSets call: a partial delete (codehash only, nonce +// preserved) keeps the row, while deleting all fields marks the row for +// deletion. Pending reads before Commit must reflect the merged state. +func TestAccountRowDeletePartialVsFull(t *testing.T) { + t.Run("partial-delete-preserves-row", func(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + addr := addrN(0xD4) + nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) + chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) + + // Seed nonce + codehash, then within a single block write nonce and + // delete codehash (a partial delete) before committing. + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + namedCS(noncePair(addr, 1), codeHashPair(addr, codeHashN(0xDD))), + })) + commitAndCheck(t, s) - // Apply 2: delete only codehash (not committed yet) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(codeHashDeletePair(addr)), - })) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + namedCS(noncePair(addr, 42), codeHashDeletePair(addr)), + })) - // Pending reads before commit - nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) - require.True(t, found, "nonce should be readable from pending writes") - require.Equal(t, nonceBytes(42), nonceVal) + // Pending reads before commit + nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) + require.True(t, found, "nonce should be readable from pending writes") + require.Equal(t, nonceBytes(42), nonceVal) - chVal, found := s.Get(keys.EVMStoreKey, chKey) - require.False(t, found, "codehash should be not-found after pending delete") - require.Nil(t, chVal) + chVal, found := s.Get(keys.EVMStoreKey, chKey) + require.False(t, found, "codehash should be not-found after pending delete") + require.Nil(t, chVal) - paw := s.accountWrites[string(accountPhysKey(addr))] - require.NotNil(t, paw) - require.False(t, paw.IsDelete(), "row should NOT be marked for deletion (partial delete)") -} + paw := s.accountWrites[string(accountPhysKey(addr))] + require.NotNil(t, paw) + require.False(t, paw.IsDelete(), "row should NOT be marked for deletion (partial delete)") + }) -// TestAccountRowDeleteGetBeforeCommit verifies the core behavioral change: -// after deleting all account fields within a block, Get() returns (nil, false) -// for both nonce and codehash BEFORE commit. -func TestAccountRowDeleteGetBeforeCommit(t *testing.T) { - s := setupTestStore(t) - defer s.Close() + t.Run("full-delete-marks-row", func(t *testing.T) { + s := setupTestStore(t) + defer s.Close() - addr := addrN(0xD5) - nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) + addr := addrN(0xD5) + nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) + chKey := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - // Write nonce + codehash (not committed yet) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(noncePair(addr, 10), codeHashPair(addr, codeHashN(0xEE))), - })) + // Seed nonce + codehash in a prior block. + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + namedCS(noncePair(addr, 10), codeHashPair(addr, codeHashN(0xEE))), + })) + commitAndCheck(t, s) - // Verify both fields are readable before commit - nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) - require.True(t, found, "nonce should be readable from pending writes") - require.Equal(t, nonceBytes(10), nonceVal) + // Verify both fields are readable from the committed store. + nonceVal, found := s.Get(keys.EVMStoreKey, nonceKey) + require.True(t, found, "nonce should be readable") + require.Equal(t, nonceBytes(10), nonceVal) - chVal, found := s.Get(keys.EVMStoreKey, chKey) - require.True(t, found, "codehash should be readable from pending writes") - expected := codeHashN(0xEE) - require.Equal(t, expected[:], chVal) + chVal, found := s.Get(keys.EVMStoreKey, chKey) + require.True(t, found, "codehash should be readable") + expected := codeHashN(0xEE) + require.Equal(t, expected[:], chVal) - // Delete both fields (still before commit) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ - namedCS(nonceDeletePair(addr), codeHashDeletePair(addr)), - })) + // Delete both fields in a single ApplyChangeSets (still before commit). + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + namedCS(nonceDeletePair(addr), codeHashDeletePair(addr)), + })) - // Verify both fields return not-found BEFORE commit (the core semantic change) - nonceVal, found = s.Get(keys.EVMStoreKey, nonceKey) - require.False(t, found, "nonce should not be found after pending full-delete") - require.Nil(t, nonceVal) + // Verify both fields return not-found BEFORE commit. + nonceVal, found = s.Get(keys.EVMStoreKey, nonceKey) + require.False(t, found, "nonce should not be found after pending full-delete") + require.Nil(t, nonceVal) - chVal, found = s.Get(keys.EVMStoreKey, chKey) - require.False(t, found, "codehash should not be found after pending full-delete") - require.Nil(t, chVal) + chVal, found = s.Get(keys.EVMStoreKey, chKey) + require.False(t, found, "codehash should not be found after pending full-delete") + require.Nil(t, chVal) - hasNonce := s.Has(keys.EVMStoreKey, nonceKey) - require.False(t, hasNonce, "Has(nonce) should be false after pending full-delete") - hasCodeHash := s.Has(keys.EVMStoreKey, chKey) - require.False(t, hasCodeHash, "Has(codehash) should be false after pending full-delete") + require.False(t, s.Has(keys.EVMStoreKey, nonceKey), "Has(nonce) should be false after pending full-delete") + require.False(t, s.Has(keys.EVMStoreKey, chKey), "Has(codehash) should be false after pending full-delete") - // Verify isDelete is set - paw := s.accountWrites[string(accountPhysKey(addr))] - require.NotNil(t, paw) - require.True(t, paw.IsDelete(), "row should be marked for deletion (all fields zero)") + paw := s.accountWrites[string(accountPhysKey(addr))] + require.NotNil(t, paw) + require.True(t, paw.IsDelete(), "row should be marked for deletion (all fields zero)") + }) } // TestLtHashAccountWriteZeroGC verifies that writing a zero value (not a diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index c59a690ada..661ec4990c 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -130,6 +130,13 @@ type CommitStore struct { // Changes to feed into the WAL at the next commit. pendingChangeSets []*proto.NamedChangeSet + // appliedSinceCommit guards the one-apply-per-commit invariant: at most + // one successful ApplyChangeSets call is permitted per commit cycle. It is + // set when an ApplyChangeSets call completes successfully and cleared by + // clearPendingWrites (called at the end of Commit and after each replayed + // WAL entry in catchup). + appliedSinceCommit bool + lastSnapshotTime time.Time // File lock prevents multiple processes from opening the same DB. diff --git a/sei-db/state_db/sc/flatkv/store_apply.go b/sei-db/state_db/sc/flatkv/store_apply.go index cd8967628f..12245a4825 100644 --- a/sei-db/state_db/sc/flatkv/store_apply.go +++ b/sei-db/state_db/sc/flatkv/store_apply.go @@ -22,6 +22,13 @@ func (s *CommitStore) ApplyChangeSets(changeSets []*proto.NamedChangeSet) (err e return errReadOnly } + // Enforce the one-apply-per-commit invariant: a second ApplyChangeSets + // call without an intervening Commit is a programming error (the caller + // must coalesce all of a block's changes into a single call). + if s.appliedSinceCommit { + return fmt.Errorf("flatkv: ApplyChangeSets called more than once without an intervening Commit") + } + /////////// // Setup // /////////// @@ -133,6 +140,11 @@ func (s *CommitStore) ApplyChangeSets(changeSets []*proto.NamedChangeSet) (err e // Now that we've made it through the batch without errors, we can add the change sets to the pending change sets. s.pendingChangeSets = append(s.pendingChangeSets, changeSets...) + // Mark that a successful apply has occurred in this commit cycle. A + // failed apply (early return above) intentionally does not set this, so + // the caller may retry. Cleared by clearPendingWrites. + s.appliedSinceCommit = true + s.phaseTimer.SetPhase("apply_change_done") logger.Debug("FlatKV ApplyChangeSets complete", "changesets", len(changeSets), diff --git a/sei-db/state_db/sc/flatkv/store_test.go b/sei-db/state_db/sc/flatkv/store_test.go index 7ff3b1191e..862c06c1b7 100644 --- a/sei-db/state_db/sc/flatkv/store_test.go +++ b/sei-db/state_db/sc/flatkv/store_test.go @@ -686,12 +686,19 @@ func TestPersistenceAllKeyTypes(t *testing.T) { nonceKey := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) codeKey := keys.BuildEVMKey(keys.EVMKeyCode, addr[:]) - cs := makeChangeSet(storageKey, padLeft32(0x11), false) + // Coalesce all of the block's writes into a single ApplyChangeSets call + // (the one-apply-per-commit contract) so the store commits at version 1. + cs := &proto.NamedChangeSet{ + Name: "evm", + Changeset: proto.ChangeSet{ + Pairs: []*proto.KVPair{ + {Key: storageKey, Value: padLeft32(0x11)}, + {Key: nonceKey, Value: []byte{0, 0, 0, 0, 0, 0, 0, 5}}, + {Key: codeKey, Value: []byte{0x60, 0x80}}, + }, + }, + } require.NoError(t, s1.ApplyChangeSets([]*proto.NamedChangeSet{cs})) - cs2 := makeChangeSet(nonceKey, []byte{0, 0, 0, 0, 0, 0, 0, 5}, false) - require.NoError(t, s1.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - cs3 := makeChangeSet(codeKey, []byte{0x60, 0x80}, false) - require.NoError(t, s1.ApplyChangeSets([]*proto.NamedChangeSet{cs3})) commitAndCheck(t, s1) hash := s1.RootHash() diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index 8d495b6bb2..afcd6691af 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -141,6 +141,7 @@ func (s *CommitStore) clearPendingWrites() { s.storageWrites = make(map[string]*vtype.StorageData, len(s.storageWrites)) s.legacyWrites = make(map[string]*vtype.LegacyData, len(s.legacyWrites)) s.pendingChangeSets = make([]*proto.NamedChangeSet, 0, len(s.pendingChangeSets)) + s.appliedSinceCommit = false } // commitBatches commits pending writes to their respective DBs atomically. diff --git a/sei-db/state_db/sc/flatkv/store_write_test.go b/sei-db/state_db/sc/flatkv/store_write_test.go index 78b8719ef4..6c7fd52d83 100644 --- a/sei-db/state_db/sc/flatkv/store_write_test.go +++ b/sei-db/state_db/sc/flatkv/store_write_test.go @@ -36,11 +36,11 @@ func TestStoreNonStorageKeys(t *testing.T) { // Write nonce (8 bytes) cs1 := makeChangeSet(nonceKey, []byte{0, 0, 0, 0, 0, 0, 0, 17}, false) require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) + commitAndCheck(t, s) // Write codehash (32 bytes) cs2 := makeChangeSet(codeHashKey, codeHash[:], false) require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - commitAndCheck(t, s) // Nonce should be found @@ -625,38 +625,6 @@ func TestAutoSnapshotDisabledWhenIntervalZero(t *testing.T) { require.Equal(t, countBefore, countAfter, "no new auto-snapshot when interval=0") } -// ============================================================================= -// Multiple ApplyChangeSets before Commit -// ============================================================================= - -func TestMultipleApplyChangeSetsBeforeCommit(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := ktype.Address{0xAA} - slot1 := ktype.Slot{0x01} - slot2 := ktype.Slot{0x02} - - key1 := evmStorageKey(addr, slot1) - key2 := evmStorageKey(addr, slot2) - - cs1 := makeChangeSet(key1, padLeft32(0x11), false) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := makeChangeSet(key2, padLeft32(0x22), false) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - v1, ok := s.Get(keys.EVMStoreKey, key1) - require.True(t, ok) - require.Equal(t, padLeft32(0x11), v1) - - v2, ok := s.Get(keys.EVMStoreKey, key2) - require.True(t, ok) - require.Equal(t, padLeft32(0x22), v2) -} - func TestMultipleApplyAccountFieldsPreservesOther(t *testing.T) { s := setupTestStore(t) defer s.Close() @@ -936,58 +904,6 @@ func TestDeleteSemanticsCodehashAsymmetry(t *testing.T) { require.Error(t, err, "accountDB row should be physically deleted when all fields are zero") } -// ============================================================================= -// Cross-ApplyChangeSets Ordering (W-P0-5) -// ============================================================================= - -func TestCrossApplyChangeSetsOrdering(t *testing.T) { - t.Run("write-then-delete", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := ktype.Address{0x01} - slot := ktype.Slot{0x01} - - cs1 := namedCS(storagePair(addr, slot, []byte{0xAA})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(storageDeletePair(addr, slot)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - key := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) - _, found := s.Get(keys.EVMStoreKey, key) - require.False(t, found, "write-then-delete: key should be gone") - }) - - t.Run("delete-then-write", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := ktype.Address{0x02} - slot := ktype.Slot{0x02} - - cs0 := namedCS(storagePair(addr, slot, []byte{0x11})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs0})) - commitAndCheck(t, s) - - cs1 := namedCS(storageDeletePair(addr, slot)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(storagePair(addr, slot, []byte{0xBB})) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - key := keys.BuildEVMKey(keys.EVMKeyStorage, ktype.StorageKey(addr, slot)) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found, "delete-then-write: key should exist") - require.Equal(t, padLeft32(0xBB), val) - }) - -} - // ============================================================================= // Empty Commit WAL Payload Distinction (W-P0-6) // ============================================================================= @@ -1122,100 +1038,46 @@ func TestApplyChangeSetsInvalidCodehashLength(t *testing.T) { } // ============================================================================= -// Cross-ApplyChangeSets Account Field Ordering +// One-Apply-Per-Commit Invariant // ============================================================================= -func TestCrossApplyChangeSetsAccountOrdering(t *testing.T) { - t.Run("nonce-write-then-delete", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x01) - cs1 := namedCS(noncePair(addr, 42)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(nonceDeletePair(addr)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - // With Account Row GC, nonce-only account becomes all-zero → row deleted - key := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - _, found := s.Get(keys.EVMStoreKey, key) - require.False(t, found, "nonce-only account should be deleted after nonce delete") - }) - - t.Run("nonce-delete-then-write", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x02) - cs0 := namedCS(noncePair(addr, 10)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs0})) - commitAndCheck(t, s) - - cs1 := namedCS(nonceDeletePair(addr)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(noncePair(addr, 99)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - key := keys.BuildEVMKey(keys.EVMKeyNonce, addr[:]) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found) - require.Equal(t, uint64(99), bytesToNonce(val)) - }) - - t.Run("codehash-write-then-delete", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x03) - cs1 := namedCS(codeHashPair(addr, codeHashN(0xFF))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) +// TestApplyChangeSetsTwiceWithoutCommitFails verifies the one-apply-per-commit +// contract: a second ApplyChangeSets call without an intervening Commit is a +// programming error and must be rejected. Callers must coalesce all of a +// block's changes into a single ApplyChangeSets call. +func TestApplyChangeSetsTwiceWithoutCommitFails(t *testing.T) { + s := setupTestStore(t) + defer s.Close() - cs2 := namedCS(codeHashDeletePair(addr)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) + addr := ktype.Address{0xAA} + key1 := evmStorageKey(addr, ktype.Slot{0x01}) + key2 := evmStorageKey(addr, ktype.Slot{0x02}) - commitAndCheck(t, s) + // First apply succeeds. + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + makeChangeSet(key1, padLeft32(0x11), false), + })) - key := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - _, found := s.Get(keys.EVMStoreKey, key) - require.False(t, found, "codehash-only account: delete → all-zero → row deleted") + // Second apply without a Commit in between is rejected. + err := s.ApplyChangeSets([]*proto.NamedChangeSet{ + makeChangeSet(key2, padLeft32(0x22), false), }) + require.Error(t, err) + require.Contains(t, err.Error(), "more than once without an intervening Commit") - t.Run("codehash-delete-then-write", func(t *testing.T) { - s := setupTestStore(t) - defer s.Close() - - addr := addrN(0x04) - cs0 := namedCS(codeHashPair(addr, codeHashN(0xAA))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs0})) - commitAndCheck(t, s) - - cs1 := namedCS(codeHashDeletePair(addr)) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1})) - - cs2 := namedCS(codeHashPair(addr, codeHashN(0xBB))) - require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs2})) - - commitAndCheck(t, s) - - key := keys.BuildEVMKey(keys.EVMKeyCodeHash, addr[:]) - val, found := s.Get(keys.EVMStoreKey, key) - require.True(t, found, "codehash should be restored after delete-then-write") - expected := codeHashN(0xBB) - require.Equal(t, expected[:], val) - }) -} + // After Commit clears the guard, applying again is allowed. + commitAndCheck(t, s) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{ + makeChangeSet(key2, padLeft32(0x22), false), + })) + commitAndCheck(t, s) -func bytesToNonce(b []byte) uint64 { - if len(b) != vtype.NonceLen { - return 0 - } - return binary.BigEndian.Uint64(b) + v1, ok := s.Get(keys.EVMStoreKey, key1) + require.True(t, ok) + require.Equal(t, padLeft32(0x11), v1) + v2, ok := s.Get(keys.EVMStoreKey, key2) + require.True(t, ok) + require.Equal(t, padLeft32(0x22), v2) } // ============================================================================= diff --git a/sei-db/state_db/sc/migration/accumulating_writer.go b/sei-db/state_db/sc/migration/accumulating_writer.go new file mode 100644 index 0000000000..cd93a86200 --- /dev/null +++ b/sei-db/state_db/sc/migration/accumulating_writer.go @@ -0,0 +1,64 @@ +package migration + +import "github.com/sei-protocol/sei-chain/sei-db/proto" + +// accumulatingWriter wraps a leaf DBWriter, buffering the change sets from one +// or more Apply calls and forwarding them to the wrapped writer as a single +// ApplyChangeSets batch when Flush is called. +// +// It exists to coalesce the fan-out a single dispatch can produce: when several +// routes (e.g. a direct flatKV route and MigrationManager.newDBWriter) target +// the same backend, they share one accumulatingWriter so the backend observes +// exactly one ApplyChangeSets per dispatch instead of one per route. +// +// The wrapped writer is a leaf backend writer (buildFlatKVWriter / +// buildMemIAVLWriter) that ignores firstBatchInBlock, so Apply ignores it too; +// any migration-boundary semantics have already been consumed upstream by the +// MigrationManager before its physical change set reaches this writer. +// +// Not safe for concurrent use; callers must serialize Apply/Flush (the router +// tree is serialized by threadSafeRouter in migration modes). +type accumulatingWriter struct { + wrapped DBWriter + + // buffered holds the change sets accumulated since the last Flush, in the + // order they were supplied to Apply. + buffered []*proto.NamedChangeSet + + // pending is true once at least one Apply call has occurred since the last + // Flush, even if that call carried no change sets. This lets an empty apply + // still be forwarded on Flush, preserving the "every dispatch forwards once" + // behavior the wrapped leaf writer saw before coalescing. + pending bool +} + +// newAccumulatingWriter wraps writer in an accumulator. +func newAccumulatingWriter(writer DBWriter) *accumulatingWriter { + return &accumulatingWriter{wrapped: writer} +} + +// Apply buffers changesets for a later Flush. It never writes to the wrapped +// writer and so always returns nil; downstream write errors surface from Flush +// instead. Its signature matches DBWriter so it can be used as a route writer. +func (a *accumulatingWriter) Apply(changesets []*proto.NamedChangeSet, _ bool) error { + a.buffered = append(a.buffered, changesets...) + a.pending = true + return nil +} + +// Flush forwards all change sets buffered since the last Flush to the wrapped +// writer as a single ApplyChangeSets call, then resets the buffer. If no Apply +// call has occurred since the last Flush, Flush is a no-op. +// +// The buffer is reset before the downstream call so the writer is left in a +// clean state regardless of the outcome; per the Router contract an +// ApplyChangeSets error is fatal and must not be retried. +func (a *accumulatingWriter) Flush() error { + if !a.pending { + return nil + } + changesets := a.buffered + a.buffered = nil + a.pending = false + return a.wrapped(changesets, false) +} diff --git a/sei-db/state_db/sc/migration/accumulating_writer_test.go b/sei-db/state_db/sc/migration/accumulating_writer_test.go new file mode 100644 index 0000000000..ae541063a6 --- /dev/null +++ b/sei-db/state_db/sc/migration/accumulating_writer_test.go @@ -0,0 +1,104 @@ +package migration + +import ( + "errors" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/stretchr/testify/require" +) + +// recordedWrite captures a single call made to the wrapped leaf DBWriter. +type recordedWrite struct { + changesets []*proto.NamedChangeSet + firstBatchInBlock bool +} + +// recordingWriter is a leaf DBWriter that records every call and can be +// configured to return an error. +type recordingWriter struct { + calls []recordedWrite + err error +} + +func (w *recordingWriter) write(changesets []*proto.NamedChangeSet, firstBatchInBlock bool) error { + w.calls = append(w.calls, recordedWrite{changesets: changesets, firstBatchInBlock: firstBatchInBlock}) + return w.err +} + +func TestAccumulatingWriter_ApplyDoesNotWriteUntilFlush(t *testing.T) { + leaf := &recordingWriter{} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("evm", kv("k", "v"))}, true)) + require.Empty(t, leaf.calls, "Apply must not touch the wrapped writer before Flush") + + require.NoError(t, a.Flush()) + require.Len(t, leaf.calls, 1) + require.Equal(t, []*proto.NamedChangeSet{namedCS("evm", kv("k", "v"))}, leaf.calls[0].changesets) +} + +func TestAccumulatingWriter_CoalescesMultipleAppliesInOrder(t *testing.T) { + leaf := &recordingWriter{} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("evm", kv("a", "1"))}, true)) + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("bank", kv("b", "2")), namedCS("evm", kv("c", "3"))}, false)) + require.NoError(t, a.Flush()) + + require.Len(t, leaf.calls, 1, "all accumulated applies must collapse into one downstream call") + require.Equal(t, []*proto.NamedChangeSet{ + namedCS("evm", kv("a", "1")), + namedCS("bank", kv("b", "2")), + namedCS("evm", kv("c", "3")), + }, leaf.calls[0].changesets, "changesets must be forwarded in accumulation order") +} + +func TestAccumulatingWriter_EmptyApplyStillForwardsOnFlush(t *testing.T) { + leaf := &recordingWriter{} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Apply(nil, false)) + require.NoError(t, a.Flush()) + + require.Len(t, leaf.calls, 1, "an apply with no changesets must still forward once on Flush") + require.Empty(t, leaf.calls[0].changesets) +} + +func TestAccumulatingWriter_FlushWithoutApplyIsNoOp(t *testing.T) { + leaf := &recordingWriter{} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Flush()) + require.Empty(t, leaf.calls, "Flush with nothing accumulated must not touch the wrapped writer") +} + +func TestAccumulatingWriter_ResetsBetweenFlushes(t *testing.T) { + leaf := &recordingWriter{} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("evm", kv("a", "1"))}, true)) + require.NoError(t, a.Flush()) + + // A no-op Flush in between must not re-emit the first batch. + require.NoError(t, a.Flush()) + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("evm", kv("b", "2"))}, false)) + require.NoError(t, a.Flush()) + + require.Len(t, leaf.calls, 2) + require.Equal(t, []*proto.NamedChangeSet{namedCS("evm", kv("a", "1"))}, leaf.calls[0].changesets) + require.Equal(t, []*proto.NamedChangeSet{namedCS("evm", kv("b", "2"))}, leaf.calls[1].changesets) +} + +func TestAccumulatingWriter_FlushPropagatesDownstreamError(t *testing.T) { + wantErr := errors.New("downstream boom") + leaf := &recordingWriter{err: wantErr} + a := newAccumulatingWriter(leaf.write) + + require.NoError(t, a.Apply([]*proto.NamedChangeSet{namedCS("evm", kv("k", "v"))}, true)) + require.ErrorIs(t, a.Flush(), wantErr) + + // Buffer is cleared even on error; a subsequent Flush is a clean no-op. + require.NoError(t, a.Flush()) + require.Len(t, leaf.calls, 1, "failed batch must not be retried by a later Flush") +} diff --git a/sei-db/state_db/sc/migration/flushing_router.go b/sei-db/state_db/sc/migration/flushing_router.go new file mode 100644 index 0000000000..537e797055 --- /dev/null +++ b/sei-db/state_db/sc/migration/flushing_router.go @@ -0,0 +1,60 @@ +package migration + +import ( + "errors" + "fmt" + + ics23 "github.com/confio/ics23/go" + "github.com/sei-protocol/sei-chain/sei-db/proto" + db "github.com/tendermint/tm-db" +) + +var _ Router = (*flushingRouter)(nil) + +// flushingRouter wraps an inner Router and, after each successful +// ApplyChangeSets, runs a fixed list of post-apply callbacks. All other Router +// methods delegate straight to the inner router. +type flushingRouter struct { + inner Router + + // afterApply runs in order after a successful inner.ApplyChangeSets. + afterApply []func() error +} + +// newFlushingRouter wraps inner so that afterApply callbacks run, in order, +// after each successful ApplyChangeSets. +func newFlushingRouter(inner Router, afterApply ...func() error) *flushingRouter { + return &flushingRouter{inner: inner, afterApply: afterApply} +} + +// ApplyChangeSets dispatches to the inner router and, on success, runs the +// post-apply callbacks in order. If the inner dispatch fails the callbacks are +// not run: an ApplyChangeSets error is fatal per the Router contract, and the +// accumulators' buffers are discarded with the process on the ensuing shutdown. +func (f *flushingRouter) ApplyChangeSets(changesets []*proto.NamedChangeSet, firstBatchInBlock bool) error { + if err := f.inner.ApplyChangeSets(changesets, firstBatchInBlock); err != nil { + return err + } + collected := make([]error, 0, len(f.afterApply)) + for _, cb := range f.afterApply { + if err := cb(); err != nil { + collected = append(collected, err) + } + } + if err := errors.Join(collected...); err != nil { + return fmt.Errorf("flushingRouter: post-apply flush failed: %w", err) + } + return nil +} + +func (f *flushingRouter) Read(store string, key []byte) ([]byte, bool, error) { + return f.inner.Read(store, key) +} + +func (f *flushingRouter) Iterator(store string, start []byte, end []byte, ascending bool) (db.Iterator, error) { + return f.inner.Iterator(store, start, end, ascending) +} + +func (f *flushingRouter) GetProof(store string, key []byte) (*ics23.CommitmentProof, error) { + return f.inner.GetProof(store, key) +} diff --git a/sei-db/state_db/sc/migration/flushing_router_test.go b/sei-db/state_db/sc/migration/flushing_router_test.go new file mode 100644 index 0000000000..5bc80609a8 --- /dev/null +++ b/sei-db/state_db/sc/migration/flushing_router_test.go @@ -0,0 +1,145 @@ +package migration + +import ( + "errors" + "testing" + + ics23 "github.com/confio/ics23/go" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +// recordingInnerRouter is a fake Router used to test the flushingRouter +// wrapper: it records ApplyChangeSets calls and read-method dispatch and can be +// configured to fail the apply. +type recordingInnerRouter struct { + applyCount int + applyErr error + lastReadKey []byte + readValue []byte + readFound bool + lastIterStore string + lastProofKey []byte +} + +var _ Router = (*recordingInnerRouter)(nil) + +func (r *recordingInnerRouter) ApplyChangeSets(_ []*proto.NamedChangeSet, _ bool) error { + r.applyCount++ + return r.applyErr +} + +func (r *recordingInnerRouter) Read(store string, key []byte) ([]byte, bool, error) { + r.lastReadKey = key + return r.readValue, r.readFound, nil +} + +func (r *recordingInnerRouter) Iterator(store string, _ []byte, _ []byte, _ bool) (dbm.Iterator, error) { + r.lastIterStore = store + return nil, nil +} + +func (r *recordingInnerRouter) GetProof(_ string, key []byte) (*ics23.CommitmentProof, error) { + r.lastProofKey = key + return nil, nil +} + +func TestFlushingRouter_RunsCallbacksAfterApplyInOrder(t *testing.T) { + inner := &recordingInnerRouter{} + var order []string + f := newFlushingRouter(inner, + func() error { order = append(order, "first"); return nil }, + func() error { order = append(order, "second"); return nil }, + ) + + require.NoError(t, f.ApplyChangeSets(nil, true)) + require.Equal(t, 1, inner.applyCount) + require.Equal(t, []string{"first", "second"}, order, "callbacks must run in order after the inner apply") +} + +func TestFlushingRouter_InnerErrorShortCircuitsCallbacks(t *testing.T) { + wantErr := errors.New("inner boom") + inner := &recordingInnerRouter{applyErr: wantErr} + ran := false + f := newFlushingRouter(inner, func() error { ran = true; return nil }) + + require.ErrorIs(t, f.ApplyChangeSets(nil, true), wantErr) + require.False(t, ran, "callbacks must not run when the inner apply fails") +} + +func TestFlushingRouter_CallbackErrorPropagates(t *testing.T) { + inner := &recordingInnerRouter{} + wantErr := errors.New("flush boom") + secondRan := false + f := newFlushingRouter(inner, + func() error { return wantErr }, + func() error { secondRan = true; return nil }, + ) + + err := f.ApplyChangeSets(nil, true) + require.ErrorIs(t, err, wantErr) + require.True(t, secondRan, "all callbacks run; errors are joined, not short-circuited") +} + +func TestFlushingRouter_ReadMethodsDelegate(t *testing.T) { + inner := &recordingInnerRouter{readValue: []byte("v"), readFound: true} + f := newFlushingRouter(inner) + + val, found, err := f.Read("evm", []byte("k")) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, []byte("v"), val) + require.Equal(t, []byte("k"), inner.lastReadKey) + + _, err = f.Iterator("evm", []byte("s"), []byte("e"), true) + require.NoError(t, err) + require.Equal(t, "evm", inner.lastIterStore) + + _, err = f.GetProof("evm", []byte("pk")) + require.NoError(t, err) + require.Equal(t, []byte("pk"), inner.lastProofKey) +} + +// TestFlushingRouter_CoalescesSharedAccumulator is a direct-topology test that +// proves both-backend coalescing without going through BuildRouter: two routes +// share one accumulating writer per backend, so a single dispatch yields +// exactly one downstream call per backend even though each backend is targeted +// by two routes. +func TestFlushingRouter_CoalescesSharedAccumulator(t *testing.T) { + flatKVLeaf := &recordingWriter{} + memIAVLLeaf := &recordingWriter{} + flatKVAcc := newAccumulatingWriter(flatKVLeaf.write) + memIAVLAcc := newAccumulatingWriter(memIAVLLeaf.write) + + // Two routes target flatKV (evm/, other/) and two target memIAVL (bank/, aux/), + // mirroring the MigrateAllButBank fan-out shape. + evmRoute, err := NewRoute(flushTestStubReader, flatKVAcc.Apply, nil, nil, "evm") + require.NoError(t, err) + otherRoute, err := NewRoute(flushTestStubReader, flatKVAcc.Apply, nil, nil, "other") + require.NoError(t, err) + bankRoute, err := NewRoute(flushTestStubReader, memIAVLAcc.Apply, nil, nil, "bank") + require.NoError(t, err) + auxRoute, err := NewRoute(flushTestStubReader, memIAVLAcc.Apply, nil, nil, "aux") + require.NoError(t, err) + + moduleRouter, err := NewModuleRouter(evmRoute, otherRoute, bankRoute, auxRoute) + require.NoError(t, err) + router := newFlushingRouter(moduleRouter, memIAVLAcc.Flush, flatKVAcc.Flush) + + require.NoError(t, router.ApplyChangeSets([]*proto.NamedChangeSet{ + namedCS("evm", kv("e", "1")), + namedCS("other", kv("o", "2")), + namedCS("bank", kv("b", "3")), + namedCS("aux", kv("a", "4")), + }, true)) + + require.Len(t, flatKVLeaf.calls, 1, "flatKV must be written exactly once per dispatch") + require.Len(t, memIAVLLeaf.calls, 1, "memIAVL must be written exactly once per dispatch") + require.Equal(t, []*proto.NamedChangeSet{namedCS("evm", kv("e", "1")), namedCS("other", kv("o", "2"))}, + flatKVLeaf.calls[0].changesets) + require.Equal(t, []*proto.NamedChangeSet{namedCS("bank", kv("b", "3")), namedCS("aux", kv("a", "4"))}, + memIAVLLeaf.calls[0].changesets) +} + +func flushTestStubReader(string, []byte) ([]byte, bool, error) { return nil, false, nil } diff --git a/sei-db/state_db/sc/migration/router_builder.go b/sei-db/state_db/sc/migration/router_builder.go index 557eda4f98..f22a3312c2 100644 --- a/sei-db/state_db/sc/migration/router_builder.go +++ b/sei-db/state_db/sc/migration/router_builder.go @@ -164,15 +164,18 @@ func buildMigrateEVMRouter( return nil, fmt.Errorf("migrationBatchSize must be greater than 0") } + memIAVLAcc := newAccumulatingWriter(buildMemIAVLWriter(memIAVL)) + flatKVAcc := newAccumulatingWriter(buildFlatKVWriter(flatKV)) + // Manages migration and routing for keys in the evm/ module. migrationManager, err := NewMigrationManager( migrationBatchSize, Version0_MemiavlOnly, Version1_MigrateEVM, buildMemIAVLReader(memIAVL), - buildMemIAVLWriter(memIAVL), + memIAVLAcc.Apply, buildFlatKVReader(flatKV), - buildFlatKVWriter(flatKV), + flatKVAcc.Apply, buildMemIAVLIteratorBuilder(memIAVL), NewMemiavlMigrationIterator(memIAVL.GetDB(), []string{keys.EVMStoreKey}), NewMigrationMetrics(ctx, Version1_MigrateEVM, 10*time.Second), @@ -185,9 +188,15 @@ func buildMigrateEVMRouter( if err != nil { return nil, fmt.Errorf("AllModulesExcept: %w", err) } - nonEVMRoute, err := routeToMemIAVL(memIAVL, nonEVMModules...) + nonEVMRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + memIAVLAcc.Apply, + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + nonEVMModules..., + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } evmRoute, err := migrationManager.BuildRoute(keys.EVMStoreKey) @@ -200,7 +209,7 @@ func buildMigrateEVMRouter( return nil, fmt.Errorf("NewModuleRouter: %w", err) } - return moduleRouter, nil + return newFlushingRouter(moduleRouter, memIAVLAcc.Flush, flatKVAcc.Flush), nil } /* Data flow: EVMMigrated (1) @@ -235,14 +244,26 @@ func buildEVMMigratedRouter( if err != nil { return nil, fmt.Errorf("AllModulesExcept: %w", err) } - nonEVMRoute, err := routeToMemIAVL(memIAVL, nonEVMModules...) + nonEVMRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + buildMemIAVLWriter(memIAVL), + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + nonEVMModules..., + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } - evmRoute, err := routeToFlatKV(flatKV, keys.EVMStoreKey) + evmRoute, err := NewRoute( + buildFlatKVReader(flatKV), + buildFlatKVWriter(flatKV), + nil, // iteration not supported by flatkv + nil, // proof building not supported by flatkv + keys.EVMStoreKey, + ) if err != nil { - return nil, fmt.Errorf("routeToFlatKV: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } moduleRouter, err := NewModuleRouter(nonEVMRoute, evmRoute) @@ -294,15 +315,18 @@ func buildMigrateAllButBankRouter( return nil, fmt.Errorf("AllModulesExcept: %w", err) } + memIAVLAcc := newAccumulatingWriter(buildMemIAVLWriter(memIAVL)) + flatKVAcc := newAccumulatingWriter(buildFlatKVWriter(flatKV)) + // Manages migration and routing for all keys except evm/ (already migrated) and bank/ (not migrating yet) migrationManager, err := NewMigrationManager( migrationBatchSize, Version1_MigrateEVM, Version2_MigrateAllButBank, buildMemIAVLReader(memIAVL), - buildMemIAVLWriter(memIAVL), + memIAVLAcc.Apply, buildFlatKVReader(flatKV), - buildFlatKVWriter(flatKV), + flatKVAcc.Apply, buildMemIAVLIteratorBuilder(memIAVL), NewMemiavlMigrationIterator(memIAVL.GetDB(), allModulesButEvmAndBank), NewMigrationMetrics(ctx, Version2_MigrateAllButBank, 10*time.Second), @@ -311,14 +335,26 @@ func buildMigrateAllButBankRouter( return nil, fmt.Errorf("NewMigrationManager: %w", err) } - bankRoute, err := routeToMemIAVL(memIAVL, keys.BankStoreKey) + bankRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + memIAVLAcc.Apply, + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + keys.BankStoreKey, + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } - evmRoute, err := routeToFlatKV(flatKV, keys.EVMStoreKey) + evmRoute, err := NewRoute( + buildFlatKVReader(flatKV), + flatKVAcc.Apply, + nil, // iteration not supported by flatkv + nil, // proof building not supported by flatkv + keys.EVMStoreKey, + ) if err != nil { - return nil, fmt.Errorf("routeToFlatKV: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } allOtherModulesRoute, err := migrationManager.BuildRoute(allModulesButEvmAndBank...) @@ -331,7 +367,7 @@ func buildMigrateAllButBankRouter( return nil, fmt.Errorf("NewModuleRouter: %w", err) } - return moduleRouter, nil + return newFlushingRouter(moduleRouter, memIAVLAcc.Flush, flatKVAcc.Flush), nil } /* Data flow: AllMigratedButBank (2) @@ -366,14 +402,30 @@ func buildAllMigratedButBankRouter( if err != nil { return nil, fmt.Errorf("AllModulesExcept: %w", err) } - nonBankRoute, err := routeToFlatKV(flatKV, allButBankModules...) + + // Steady-state mode: each backend is written by exactly one route + // (flatkv for non-bank, memiavl for bank), so there is no fan-out to + // coalesce and no accumulating writer is needed. + nonBankRoute, err := NewRoute( + buildFlatKVReader(flatKV), + buildFlatKVWriter(flatKV), + nil, // iteration not supported by flatkv + nil, // proof building not supported by flatkv + allButBankModules..., + ) if err != nil { - return nil, fmt.Errorf("routeToFlatKV: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } - bankRoute, err := routeToMemIAVL(memIAVL, keys.BankStoreKey) + bankRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + buildMemIAVLWriter(memIAVL), + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + keys.BankStoreKey, + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } moduleRouter, err := NewModuleRouter(nonBankRoute, bankRoute) @@ -424,6 +476,9 @@ func buildMigrateBankRouter( return nil, fmt.Errorf("AllModulesExcept: %w", err) } + memIAVLAcc := newAccumulatingWriter(buildMemIAVLWriter(memIAVL)) + flatKVAcc := newAccumulatingWriter(buildFlatKVWriter(flatKV)) + // Manages migration and routing for keys in the bank/ module (the // final module remaining in memiavl; every other module already // lives in flatkv from prior migrations). @@ -432,9 +487,9 @@ func buildMigrateBankRouter( Version2_MigrateAllButBank, Version3_FlatKVOnly, buildMemIAVLReader(memIAVL), - buildMemIAVLWriter(memIAVL), + memIAVLAcc.Apply, buildFlatKVReader(flatKV), - buildFlatKVWriter(flatKV), + flatKVAcc.Apply, buildMemIAVLIteratorBuilder(memIAVL), NewMemiavlMigrationIterator(memIAVL.GetDB(), []string{keys.BankStoreKey}), NewMigrationMetrics(ctx, Version3_FlatKVOnly, 10*time.Second), @@ -448,9 +503,15 @@ func buildMigrateBankRouter( return nil, fmt.Errorf("BuildRoute: %w", err) } - allOtherModulesRoute, err := routeToFlatKV(flatKV, allButBankModules...) + allOtherModulesRoute, err := NewRoute( + buildFlatKVReader(flatKV), + flatKVAcc.Apply, + nil, // iteration not supported by flatkv + nil, // proof building not supported by flatkv + allButBankModules..., + ) if err != nil { - return nil, fmt.Errorf("routeToFlatKV: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } moduleRouter, err := NewModuleRouter(bankRoute, allOtherModulesRoute) @@ -458,7 +519,7 @@ func buildMigrateBankRouter( return nil, fmt.Errorf("NewModuleRouter: %w", err) } - return moduleRouter, nil + return newFlushingRouter(moduleRouter, memIAVLAcc.Flush, flatKVAcc.Flush), nil } /* Data flow: FlatKVOnly (3) @@ -476,6 +537,8 @@ func buildFlatKVOnlyRouter( return nil, fmt.Errorf("flatKV is nil") } + // Steady-state mode: a single route writes flatkv, so there is no + // fan-out to coalesce and no accumulating writer is needed. router, err := NewPassthroughRouter( buildFlatKVReader(flatKV), buildFlatKVWriter(flatKV), @@ -518,16 +581,24 @@ func buildTestOnlyDualWriteRouter( return nil, fmt.Errorf("flatKV is nil") } + memIAVLAcc := newAccumulatingWriter(buildMemIAVLWriter(memIAVL)) + flatKVAcc := newAccumulatingWriter(buildFlatKVWriter(flatKV)) + // Sends evm/ traffic to both memIAVL and flatKV. // Note that a TestOnlyDualWriteRouter ignores module names; it's only job is to duplicate traffic. // The routes given to the dual write router do not specify modules for this reason. - memiavlEvmRoute, err := routeToMemIAVL(memIAVL) + memiavlEvmRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + memIAVLAcc.Apply, + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } dualWriteRouter, err := NewTestOnlyDualWriteRouter( memiavlEvmRoute, - buildFlatKVWriter(flatKV), + flatKVAcc.Apply, ) if err != nil { return nil, fmt.Errorf("NewTestOnlyDualWriteRouter: %w", err) @@ -537,9 +608,15 @@ func buildTestOnlyDualWriteRouter( if err != nil { return nil, fmt.Errorf("AllModulesExcept: %w", err) } - nonEVMRoute, err := routeToMemIAVL(memIAVL, nonEVMModules...) + nonEVMRoute, err := NewRoute( + buildMemIAVLReader(memIAVL), + memIAVLAcc.Apply, + buildMemIAVLIteratorBuilder(memIAVL), + buildMemIAVLProofBuilder(memIAVL), + nonEVMModules..., + ) if err != nil { - return nil, fmt.Errorf("routeToMemIAVL: %w", err) + return nil, fmt.Errorf("NewRoute: %w", err) } evmRoute, err := dualWriteRouter.BuildRoute(keys.EVMStoreKey) @@ -552,7 +629,7 @@ func buildTestOnlyDualWriteRouter( return nil, fmt.Errorf("NewModuleRouter: %w", err) } - return moduleRouter, nil + return newFlushingRouter(moduleRouter, memIAVLAcc.Flush, flatKVAcc.Flush), nil } // Build a function capable of reading data from memiavl. @@ -618,25 +695,3 @@ func buildFlatKVWriter(flatKV flatkv.Store) DBWriter { return nil } } - -// Build a route to a memiavl store for the given module names. -func routeToMemIAVL(memIAVL *memiavl.CommitStore, moduleNames ...string) (*Route, error) { - return NewRoute( - buildMemIAVLReader(memIAVL), - buildMemIAVLWriter(memIAVL), - buildMemIAVLIteratorBuilder(memIAVL), - buildMemIAVLProofBuilder(memIAVL), - moduleNames..., - ) -} - -// Build a route to a flatkv store for the given module names. -func routeToFlatKV(flatKV flatkv.Store, moduleNames ...string) (*Route, error) { - return NewRoute( - buildFlatKVReader(flatKV), - buildFlatKVWriter(flatKV), - nil, // iteration not supported - nil, // proof building not supported - moduleNames..., - ) -} diff --git a/sei-db/state_db/sc/migration/router_builder_test.go b/sei-db/state_db/sc/migration/router_builder_test.go new file mode 100644 index 0000000000..a072c6133c --- /dev/null +++ b/sei-db/state_db/sc/migration/router_builder_test.go @@ -0,0 +1,70 @@ +package migration + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/common/keys" + "github.com/sei-protocol/sei-chain/sei-db/common/testutil" + "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv" + "github.com/stretchr/testify/require" +) + +// countingFlatKV decorates a flatkv.Store, counting ApplyChangeSets calls so a +// test can assert how many physical flatKV writes a single dispatch produces. +// All other methods are inherited from the embedded Store. +type countingFlatKV struct { + flatkv.Store + applyCount int +} + +func (c *countingFlatKV) ApplyChangeSets(cs []*proto.NamedChangeSet) error { + c.applyCount++ + return c.Store.ApplyChangeSets(cs) +} + +// TestBuildRouter_FlatKVWrittenOncePerDispatch asserts that a single top-level +// ApplyChangeSets produces exactly one physical flatKV ApplyChangeSets, even in +// the modes where two routes fan out to flatKV (MigrateAllButBank: evm/ direct +// + migration manager; MigrateBank: all-but-bank direct + migration manager). +// EVMMigrated is included as a control (a single flatKV route). +func TestBuildRouter_FlatKVWrittenOncePerDispatch(t *testing.T) { + rng := testutil.NewTestRandom() + + // A module handled by the migration manager / non-evm-non-bank routes. + otherModules, err := keys.AllModulesExcept(keys.EVMStoreKey, keys.BankStoreKey) + require.NoError(t, err) + require.NotEmpty(t, otherModules) + otherModule := otherModules[0] + + cases := []struct { + name string + mode config.WriteMode + }{ + {"MigrateAllButBank", config.MigrateAllButBank}, + {"MigrateBank", config.MigrateBank}, + {"EVMMigrated", config.EVMMigrated}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + memiavlDB := NewTestMemIAVLCommitStore(t, t.TempDir(), keys.MemIAVLStoreKeys) + flatKVDB := NewTestFlatKVCommitStore(t, t.TempDir()) + counting := &countingFlatKV{Store: flatKVDB} + + router, err := BuildRouter(t.Context(), tc.mode, memiavlDB, counting, 100) + require.NoError(t, err) + + cs := []*proto.NamedChangeSet{ + namedCS(keys.EVMStoreKey, randomEVMKVPair(rng)), + namedCS(keys.BankStoreKey, kv("bk", "bv")), + namedCS(otherModule, kv("ok", "ov")), + } + require.NoError(t, router.ApplyChangeSets(cs, true)) + + require.Equal(t, 1, counting.applyCount, + "a single dispatch must produce exactly one flatKV ApplyChangeSets") + }) + } +}