diff --git a/universalClient/tss/dkls/keygen.go b/universalClient/tss/dkls/keygen.go index 9bf7b2516..476d813f2 100644 --- a/universalClient/tss/dkls/keygen.go +++ b/universalClient/tss/dkls/keygen.go @@ -91,14 +91,6 @@ func (s *keygenSession) Step() ([]Message, bool, error) { break } - // If receiver is self, queue locally for next step - if receiver == s.partyID { - if err := s.InputMessage(msgData); err != nil { - return nil, false, fmt.Errorf("failed to queue local message: %w", err) - } - continue - } - messages = append(messages, Message{ Receiver: receiver, Data: msgData, @@ -109,7 +101,6 @@ func (s *keygenSession) Step() ([]Message, bool, error) { return messages, false, nil } -// InputMessage processes an incoming protocol message. func (s *keygenSession) InputMessage(data []byte) error { buf := make([]byte, len(data)) copy(buf, data) diff --git a/universalClient/tss/dkls/keyrefresh.go b/universalClient/tss/dkls/keyrefresh.go index 615973285..fb87bffcc 100644 --- a/universalClient/tss/dkls/keyrefresh.go +++ b/universalClient/tss/dkls/keyrefresh.go @@ -100,13 +100,6 @@ func (s *keyrefreshSession) Step() ([]Message, bool, error) { break } - if receiver == s.partyID { - if err := s.InputMessage(msgData); err != nil { - return nil, false, fmt.Errorf("failed to queue local message: %w", err) - } - continue - } - messages = append(messages, Message{ Receiver: receiver, Data: msgData, @@ -117,7 +110,6 @@ func (s *keyrefreshSession) Step() ([]Message, bool, error) { return messages, false, nil } -// InputMessage processes an incoming protocol message. 
func (s *keyrefreshSession) InputMessage(data []byte) error { buf := make([]byte, len(data)) copy(buf, data) diff --git a/universalClient/tss/dkls/quorumchange.go b/universalClient/tss/dkls/quorumchange.go index 90f60537f..c1048d1ef 100644 --- a/universalClient/tss/dkls/quorumchange.go +++ b/universalClient/tss/dkls/quorumchange.go @@ -118,13 +118,6 @@ func (s *quorumchangeSession) Step() ([]Message, bool, error) { break } - if receiver == s.partyID { - if err := s.InputMessage(msgData); err != nil { - return nil, false, fmt.Errorf("failed to queue local message: %w", err) - } - continue - } - messages = append(messages, Message{ Receiver: receiver, Data: msgData, diff --git a/universalClient/tss/dkls/sign.go b/universalClient/tss/dkls/sign.go index d706d6b40..4a656f4c9 100644 --- a/universalClient/tss/dkls/sign.go +++ b/universalClient/tss/dkls/sign.go @@ -125,13 +125,6 @@ func (s *signSession) Step() ([]Message, bool, error) { break } - if receiver == s.partyID { - if err := s.InputMessage(msgData); err != nil { - return nil, false, fmt.Errorf("failed to queue local message: %w", err) - } - continue - } - messages = append(messages, Message{ Receiver: receiver, Data: msgData, diff --git a/universalClient/tss/sessionmanager/sessionmanager.go b/universalClient/tss/sessionmanager/sessionmanager.go index 00c9f7d7b..f93aeb895 100644 --- a/universalClient/tss/sessionmanager/sessionmanager.go +++ b/universalClient/tss/sessionmanager/sessionmanager.go @@ -279,35 +279,27 @@ func (sm *SessionManager) processSessionStep(ctx context.Context, eventID string return fmt.Errorf("session for event %s does not exist", eventID) } - session := state.session - - // Step the session (serialize to prevent concurrent access - DKLS may not be thread-safe) state.stepMu.Lock() - messages, finished, err := session.Step() + messages, finished, err := state.session.Step() state.stepMu.Unlock() if err != nil { return fmt.Errorf("failed to step session %s: %w", eventID, err) } - // Send output 
messages for _, dklsMsg := range messages { - // Find peerID for receiver partyID peerID, err := sm.coordinator.GetPeerIDFromPartyID(ctx, dklsMsg.Receiver) if err != nil { - sm.logger.Warn(). - Err(err). + sm.logger.Warn().Err(err). Str("receiver_party_id", dklsMsg.Receiver). Msg("failed to get peerID for receiver") continue } - // Create coordinator message coordMsg := coordinator.Message{ - Type: "step", - EventID: eventID, - Payload: dklsMsg.Data, - Participants: nil, // Participants not needed for step messages + Type: "step", + EventID: eventID, + Payload: dklsMsg.Data, } msgBytes, err := json.Marshal(coordMsg) if err != nil { @@ -315,10 +307,9 @@ func (sm *SessionManager) processSessionStep(ctx context.Context, eventID string continue } - // Send message if err := sm.send(ctx, peerID, msgBytes); err != nil { - sm.logger.Warn(). - Err(err). + sm.logger.Warn().Err(err). + Str("event_id", eventID). Str("receiver", dklsMsg.Receiver). Str("peer_id", peerID). Msg("failed to send step message") @@ -331,7 +322,6 @@ func (sm *SessionManager) processSessionStep(ctx context.Context, eventID string Msg("sent step message") } - // If finished, handle result if finished { return sm.handleSessionFinished(ctx, eventID, state) } diff --git a/universalClient/tss/tss.go b/universalClient/tss/tss.go index a22eadc46..b386133e4 100644 --- a/universalClient/tss/tss.go +++ b/universalClient/tss/tss.go @@ -236,27 +236,30 @@ func NewNode(ctx context.Context, cfg Config) (*Node, error) { pushSigner: cfg.PushSigner, stopCh: make(chan struct{}), registeredPeers: make(map[string]bool), - txResolver: txresolver.NewResolver(txresolver.Config{ - EventStore: evtStore, - Chains: cfg.Chains, - PushSigner: cfg.PushSigner, - CheckInterval: sessionExpiryCheckInterval, - Logger: logger, - }), } - // Create broadcaster after node so the closure can capture `node`. 
+ getTSSAddress := func(ctx context.Context) (string, error) { + if node.coordinator == nil { + return "", fmt.Errorf("coordinator not initialized") + } + return node.coordinator.GetTSSAddress(ctx) + } + + node.txResolver = txresolver.NewResolver(txresolver.Config{ + EventStore: evtStore, + Chains: cfg.Chains, + PushSigner: cfg.PushSigner, + CheckInterval: sessionExpiryCheckInterval, + Logger: logger, + GetTSSAddress: getTSSAddress, + }) + node.txBroadcaster = txbroadcaster.NewBroadcaster(txbroadcaster.Config{ EventStore: evtStore, Chains: cfg.Chains, CheckInterval: sessionExpiryCheckInterval, Logger: logger, - GetTSSAddress: func(ctx context.Context) (string, error) { - if node.coordinator == nil { - return "", fmt.Errorf("coordinator not initialized") - } - return node.coordinator.GetTSSAddress(ctx) - }, + GetTSSAddress: getTSSAddress, }) node.expirySweeper = expirysweeper.NewSweeper(expirysweeper.Config{ diff --git a/universalClient/tss/txbroadcaster/broadcaster.go b/universalClient/tss/txbroadcaster/broadcaster.go index d4f3bd592..b5b99c4c1 100644 --- a/universalClient/tss/txbroadcaster/broadcaster.go +++ b/universalClient/tss/txbroadcaster/broadcaster.go @@ -2,52 +2,17 @@ package txbroadcaster import ( "context" - "encoding/hex" "encoding/json" - "fmt" - "math/big" "time" "github.com/rs/zerolog" - uexecutortypes "github.com/pushchain/push-chain-node/x/uexecutor/types" - utsstypes "github.com/pushchain/push-chain-node/x/utss/types" - "github.com/pushchain/push-chain-node/universalClient/chains" - "github.com/pushchain/push-chain-node/universalClient/chains/common" "github.com/pushchain/push-chain-node/universalClient/store" "github.com/pushchain/push-chain-node/universalClient/tss/eventstore" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) -// --------------------------------------------------------------------------- -// Signed event data types -// --------------------------------------------------------------------------- - -// 
SigningData holds the signing parameters persisted by sessionManager when marking SIGNED. -type SigningData struct { - Signature string `json:"signature"` // hex-encoded 64/65 byte signature - SigningHash string `json:"signing_hash"` // hex-encoded signing hash - Nonce uint64 `json:"nonce"` - TSSFundMigrationAmount *big.Int `json:"tss_fund_migration_amount,omitempty"` -} - -// SignedOutboundData wraps OutboundCreatedEvent with signing data. -type SignedOutboundData struct { - uexecutortypes.OutboundCreatedEvent - SigningData *SigningData `json:"signing_data,omitempty"` -} - -// SignedFundMigrationData wraps FundMigrationInitiatedEventData with signing data. -type SignedFundMigrationData struct { - utsstypes.FundMigrationInitiatedEventData - SigningData *SigningData `json:"signing_data,omitempty"` -} - -// --------------------------------------------------------------------------- -// Broadcaster -// --------------------------------------------------------------------------- - -// Config holds configuration for the broadcaster. type Config struct { EventStore *eventstore.Store Chains *chains.Chains @@ -56,7 +21,6 @@ type Config struct { GetTSSAddress func(ctx context.Context) (string, error) } -// Broadcaster polls SIGNED events and broadcasts them to external chains. type Broadcaster struct { eventStore *eventstore.Store chains *chains.Chains @@ -65,7 +29,6 @@ type Broadcaster struct { getTSSAddress func(ctx context.Context) (string, error) } -// NewBroadcaster creates a new tx broadcaster. func NewBroadcaster(cfg Config) *Broadcaster { interval := cfg.CheckInterval if interval == 0 { @@ -143,7 +106,7 @@ func (b *Broadcaster) broadcastEvent(ctx context.Context, event *store.Event) { // broadcastOutbound parses outbound event data and delegates to chain-specific broadcast. 
func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event) { - var data SignedOutboundData + var data txflow.SignedOutboundData if err := json.Unmarshal(event.EventData, &data); err != nil { b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to parse signed outbound data") return @@ -161,9 +124,9 @@ func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event) } if b.chains.IsEVMChain(chainID) { - b.broadcastEVM(ctx, event, &data, chainID) + b.broadcastOutboundEVM(ctx, event, &data, chainID) } else { - b.broadcastSVM(ctx, event, &data, chainID) + b.broadcastOutboundSVM(ctx, event, &data, chainID) } } @@ -173,7 +136,7 @@ func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event) // broadcastFundMigration parses fund migration event data and delegates to chain-specific broadcast. func (b *Broadcaster) broadcastFundMigration(ctx context.Context, event *store.Event) { - var data SignedFundMigrationData + var data txflow.SignedFundMigrationData if err := json.Unmarshal(event.EventData, &data); err != nil { b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to parse fund migration signed data") return @@ -197,25 +160,6 @@ func (b *Broadcaster) broadcastFundMigration(ctx context.Context, event *store.E // Helpers // --------------------------------------------------------------------------- -// decodeSigningData extracts the UnsignedSigningReq and raw signature bytes from SigningData. 
-func decodeSigningData(sd *SigningData) (*common.UnsignedSigningReq, []byte, error) { - signingHash, err := hex.DecodeString(sd.SigningHash) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode signing hash: %w", err) - } - - signature, err := hex.DecodeString(sd.Signature) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode signature: %w", err) - } - - return &common.UnsignedSigningReq{ - SigningHash: signingHash, - Nonce: sd.Nonce, - TSSFundMigrationAmount: sd.TSSFundMigrationAmount, - }, signature, nil -} - // markBroadcasted updates the event status to BROADCASTED with the given tx hash. func (b *Broadcaster) markBroadcasted(event *store.Event, chainID, txHash string) { caipTxHash := chainID + ":" + txHash @@ -226,6 +170,9 @@ func (b *Broadcaster) markBroadcasted(event *store.Event, chainID, txHash string b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to update event to BROADCASTED") return } - b.logger.Info().Str("event_id", event.EventID).Str("tx_hash", txHash).Str("chain", chainID). - Msg("marked BROADCASTED") + b.logger.Info(). + Str("event_id", event.EventID). + Str("type", event.Type). + Str("chain", chainID). 
+ Msg("event marked as BROADCASTED") } diff --git a/universalClient/tss/txbroadcaster/broadcaster_test.go b/universalClient/tss/txbroadcaster/broadcaster_test.go index 7a601df74..ec7c343fe 100644 --- a/universalClient/tss/txbroadcaster/broadcaster_test.go +++ b/universalClient/tss/txbroadcaster/broadcaster_test.go @@ -26,6 +26,7 @@ import ( "github.com/pushchain/push-chain-node/universalClient/config" "github.com/pushchain/push-chain-node/universalClient/store" "github.com/pushchain/push-chain-node/universalClient/tss/eventstore" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) type mockTxBuilder struct{ mock.Mock } @@ -78,9 +79,9 @@ func (m *mockTxBuilder) BroadcastFundMigrationTx(ctx context.Context, req *commo type mockChainClient struct{ builder *mockTxBuilder } -func (m *mockChainClient) Start(context.Context) error { return nil } -func (m *mockChainClient) Stop() error { return nil } -func (m *mockChainClient) IsHealthy() bool { return true } +func (m *mockChainClient) Start(context.Context) error { return nil } +func (m *mockChainClient) Stop() error { return nil } +func (m *mockChainClient) IsHealthy() bool { return true } func (m *mockChainClient) GetTxBuilder() (common.TxBuilder, error) { return m.builder, nil } func setupTestDB(t *testing.T) (*eventstore.Store, *gorm.DB) { @@ -120,7 +121,7 @@ func makeSignedOutboundData(t *testing.T, destChain string, nonce uint64) []byte t.Helper() sig := hex.EncodeToString(make([]byte, 64)) hash := hex.EncodeToString(make([]byte, 32)) - data := SignedOutboundData{ + data := txflow.SignedOutboundData{ OutboundCreatedEvent: uexecutortypes.OutboundCreatedEvent{ TxID: "tx-123", UniversalTxId: "utx-456", @@ -128,7 +129,7 @@ func makeSignedOutboundData(t *testing.T, destChain string, nonce uint64) []byte Recipient: "0xRecipient", Amount: "1000000", }, - SigningData: &SigningData{ + SigningData: &txflow.SigningData{ Signature: sig, SigningHash: hash, Nonce: nonce, @@ -160,7 +161,7 @@ func 
insertSignedSVMEventWithDeadline(t *testing.T, db *gorm.DB, eventID, destCh t.Helper() sig := hex.EncodeToString(make([]byte, 64)) hash := hex.EncodeToString(make([]byte, 32)) - data := SignedOutboundData{ + data := txflow.SignedOutboundData{ OutboundCreatedEvent: uexecutortypes.OutboundCreatedEvent{ TxID: "tx-123", UniversalTxId: "utx-456", @@ -169,7 +170,7 @@ func insertSignedSVMEventWithDeadline(t *testing.T, db *gorm.DB, eventID, destCh Amount: "1000000", SigningDeadline: deadlineUnix, }, - SigningData: &SigningData{ + SigningData: &txflow.SigningData{ Signature: sig, SigningHash: hash, Nonce: nonce, @@ -248,28 +249,9 @@ func TestEVM_BroadcastSuccess_MarksBroadcasted(t *testing.T) { builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) } -func TestEVM_BroadcastFails_NonceConsumedOnRecheck_MarksBroadcasted(t *testing.T) { - // Broadcast fails, but nonce check shows it was consumed (race with another node). - evtStore, db := setupTestDB(t) - builder := &mockTxBuilder{} - client := &mockChainClient{builder: builder} - ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) - - insertSignedEvent(t, db, "ev-1", "eip155:1", 5) - - builder.On("BroadcastOutboundSigningRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return("0xfailed", fmt.Errorf("some RPC error")) - builder.On("GetNextNonce", mock.Anything, "0xTSS", true).Return(uint64(6), nil) - - b := newBroadcaster(evtStore, ch, "0xTSS") - b.processSigned(context.Background()) - - ev := getEvent(t, db, "ev-1") - require.Equal(t, store.StatusBroadcasted, ev.Status) -} - -func TestEVM_BroadcastFails_NonceNotConsumed_StaysSigned(t *testing.T) { - // Broadcast fails with no txHash (assembly failure) → stay SIGNED for retry. +func TestEVM_BroadcastAssemblyFails_StaysSigned(t *testing.T) { + // Broadcast returns empty txHash (assembly/encode failure before sending) → + // nonce check is never reached; stay SIGNED for retry. 
evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} @@ -335,8 +317,31 @@ func TestEVM_GetTSSAddressNil_UsesEmptyAddress(t *testing.T) { builder.AssertCalled(t, "GetNextNonce", mock.Anything, "", true) } +func TestSVM_DeadlineZero_ClusterConfirmsExpiry_MarksBroadcasted(t *testing.T) { + // Legacy event without a signing deadline. `now > 0` enters the deadline + // branch and any fresh cluster time (>> 0) trips the expiry case → + // BROADCASTED("") for the resolver to REVERT. + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) + + insertSignedEvent(t, db, "ev-1", "solana:mainnet", 0) + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, time.Now().Unix(), nil) + + b := newBroadcaster(evtStore, ch, "") + b.processSigned(context.Background()) + + ev := getEvent(t, db, "ev-1") + require.Equal(t, store.StatusBroadcasted, ev.Status) + require.Equal(t, "solana:mainnet:", ev.BroadcastedTxHash) + builder.AssertNotCalled(t, "BroadcastOutboundSigningRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything) +} + func TestSVM_BroadcastSuccess_MarksBroadcasted(t *testing.T) { - // Broadcast succeeds → BROADCASTED with tx hash. + // Broadcast succeeds → BROADCASTED with tx hash. Future deadline keeps the + // broadcaster out of the cluster-time branch (deadline=0 events take the + // legacy hand-off-to-resolver path; tested separately). evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} @@ -357,12 +362,13 @@ func TestSVM_BroadcastSuccess_MarksBroadcasted(t *testing.T) { func TestSVM_BroadcastFails_PDAExists_MarksBroadcasted(t *testing.T) { // Broadcast fails, but ExecutedTx PDA exists → another relayer processed it → BROADCASTED. 
+ // Future deadline so the broadcaster goes to broadcast attempt (not cluster check). evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) - insertSignedEvent(t, db, "ev-1", "solana:mainnet", 0) + insertSignedSVMEventWithDeadline(t, db, "ev-1", "solana:mainnet", 0, time.Now().Unix()+600) builder.On("BroadcastOutboundSigningRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return("", fmt.Errorf("tx simulation failed: account already exists")) @@ -484,12 +490,13 @@ func TestSVM_PastLocalDeadline_RPCError_StaysSigned(t *testing.T) { func TestSVM_BroadcastFails_PDACheckFails_StaysSigned(t *testing.T) { // Broadcast fails, PDA check also fails (RPC truly down) → stays SIGNED for retry. + // Future deadline so the broadcaster goes to broadcast attempt (not cluster check). evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) - insertSignedEvent(t, db, "ev-1", "solana:mainnet", 0) + insertSignedSVMEventWithDeadline(t, db, "ev-1", "solana:mainnet", 0, time.Now().Unix()+600) builder.On("BroadcastOutboundSigningRequest", mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
Return("", fmt.Errorf("RPC timeout")) @@ -581,7 +588,7 @@ func makeSignedFundMigrationDataWithTransfer(t *testing.T, chainID string, nonce t.Helper() sig := hex.EncodeToString(make([]byte, 65)) hash := hex.EncodeToString(make([]byte, 32)) - data := SignedFundMigrationData{ + data := txflow.SignedFundMigrationData{ FundMigrationInitiatedEventData: utsstypes.FundMigrationInitiatedEventData{ MigrationID: 1, OldKeyID: "old-key", @@ -593,7 +600,7 @@ func makeSignedFundMigrationDataWithTransfer(t *testing.T, chainID string, nonce GasLimit: 21100, L1GasFee: "150", }, - SigningData: &SigningData{ + SigningData: &txflow.SigningData{ Signature: sig, SigningHash: hash, Nonce: nonce, diff --git a/universalClient/tss/txbroadcaster/evm.go b/universalClient/tss/txbroadcaster/evm.go index 9279608f8..15da0cdb7 100644 --- a/universalClient/tss/txbroadcaster/evm.go +++ b/universalClient/tss/txbroadcaster/evm.go @@ -7,9 +7,10 @@ import ( "github.com/pushchain/push-chain-node/universalClient/chains/common" "github.com/pushchain/push-chain-node/universalClient/store" "github.com/pushchain/push-chain-node/universalClient/tss/coordinator" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) -// broadcastEVM broadcasts a signed EVM outbound transaction. +// broadcastOutboundEVM broadcasts a signed EVM outbound transaction. // // All validators produce the same signed tx (deterministic TSS output), so the // tx hash is known before broadcasting (computed from the assembled signed tx). @@ -20,21 +21,23 @@ import ( // 3. 
Error → check finalized nonce on chain: // - nonce consumed (tx landed) → BROADCASTED with tx hash // - nonce NOT consumed → keep SIGNED, retry next tick -func (b *Broadcaster) broadcastEVM(ctx context.Context, event *store.Event, data *SignedOutboundData, chainID string) { +func (b *Broadcaster) broadcastOutboundEVM(ctx context.Context, event *store.Event, data *txflow.SignedOutboundData, chainID string) { + log := b.logger.With().Str("event_id", event.EventID).Str("chain", chainID).Logger() + client, err := b.chains.GetClient(chainID) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get chain client") + log.Warn().Err(err).Msg("failed to get chain client") return } builder, err := client.GetTxBuilder() if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get tx builder") + log.Warn().Err(err).Msg("failed to get tx builder") return } - signingReq, signature, err := decodeSigningData(data.SigningData) + signingReq, signature, err := txflow.DecodeSigningData(data.SigningData) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to decode signing data") + log.Warn().Err(err).Msg("failed to decode signing data") return } @@ -49,8 +52,7 @@ func (b *Broadcaster) broadcastEVM(ctx context.Context, event *store.Event, data // Broadcast failed — check if the tx landed on chain anyway (another node, or "already known") if txHash == "" { - b.logger.Warn().Err(broadcastErr).Str("event_id", event.EventID).Str("chain", chainID). - Msg("failed to assemble tx, will retry next tick") + log.Warn().Err(broadcastErr).Msg("failed to assemble tx, will retry next tick") return } @@ -59,8 +61,7 @@ func (b *Broadcaster) broadcastEVM(ctx context.Context, event *store.Event, data var addrErr error tssAddress, addrErr = b.getTSSAddress(ctx) if addrErr != nil { - b.logger.Warn().Err(addrErr).Str("event_id", event.EventID). 
- Msg("failed to get TSS address for nonce check, will retry next tick") + log.Warn().Err(addrErr).Msg("failed to get TSS address for nonce check, will retry next tick") return } } @@ -70,32 +71,34 @@ func (b *Broadcaster) broadcastEVM(ctx context.Context, event *store.Event, data // broadcastFundMigrationEVM broadcasts a signed EVM fund migration transaction. // Same nonce-consumed recovery pattern as outbound, but uses old TSS address for nonce check. -func (b *Broadcaster) broadcastFundMigrationEVM(ctx context.Context, event *store.Event, data *SignedFundMigrationData, chainID string) { +func (b *Broadcaster) broadcastFundMigrationEVM(ctx context.Context, event *store.Event, data *txflow.SignedFundMigrationData, chainID string) { + log := b.logger.With().Str("event_id", event.EventID).Str("chain", chainID).Logger() + oldTSSAddr, err := coordinator.DeriveEVMAddressFromPubkey(data.OldTssPubkey) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to derive old TSS address") + log.Warn().Err(err).Msg("failed to derive old TSS address") return } currentTSSAddr, err := coordinator.DeriveEVMAddressFromPubkey(data.CurrentTssPubkey) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to derive new TSS address") + log.Warn().Err(err).Msg("failed to derive new TSS address") return } client, err := b.chains.GetClient(chainID) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get chain client") + log.Warn().Err(err).Msg("failed to get chain client") return } builder, err := client.GetTxBuilder() if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get tx builder") + log.Warn().Err(err).Msg("failed to get tx builder") return } - signingReq, signature, err := decodeSigningData(data.SigningData) + signingReq, signature, err := txflow.DecodeSigningData(data.SigningData) if err != nil { - b.logger.Warn().Err(err).Str("event_id", 
event.EventID).Msg("failed to decode signing data") + log.Warn().Err(err).Msg("failed to decode signing data") return } @@ -121,8 +124,7 @@ func (b *Broadcaster) broadcastFundMigrationEVM(ctx context.Context, event *stor } if txHash == "" { - b.logger.Warn().Err(broadcastErr).Str("event_id", event.EventID).Str("chain", chainID). - Msg("failed to assemble fund migration tx, will retry next tick") + log.Warn().Err(broadcastErr).Msg("failed to assemble fund migration tx, will retry next tick") return } @@ -130,8 +132,9 @@ func (b *Broadcaster) broadcastFundMigrationEVM(ctx context.Context, event *stor b.checkNonceAndMarkBroadcasted(ctx, event, builder, chainID, txHash, oldTSSAddr, data.SigningData.Nonce, broadcastErr) } -// checkNonceAndMarkBroadcasted checks if a nonce has been consumed on-chain despite broadcast error. -// If consumed, the tx landed and we mark BROADCASTED. Otherwise keep SIGNED for retry. +// checkNonceAndMarkBroadcasted checks if a nonce has been consumed on-chain +// despite a broadcast error. If consumed, the tx landed (possibly via another +// node) and we mark BROADCASTED. Otherwise keep SIGNED for retry. func (b *Broadcaster) checkNonceAndMarkBroadcasted( ctx context.Context, event *store.Event, @@ -140,19 +143,16 @@ func (b *Broadcaster) checkNonceAndMarkBroadcasted( eventNonce uint64, broadcastErr error, ) { - finalizedNonce, err := builder.GetNextNonce(ctx, signerAddr, true) - if err == nil && eventNonce < finalizedNonce { - // Nonce consumed — tx is on chain. Mark BROADCASTED so the resolver can verify it. - b.logger.Info().Err(broadcastErr).Str("event_id", event.EventID).Str("chain", chainID). - Str("tx_hash", txHash). + log := b.logger.With().Str("event_id", event.EventID).Str("chain", chainID).Logger() + + verdict, finalizedNonce := txflow.CheckNonce(ctx, builder, signerAddr, eventNonce) + if verdict == txflow.NonceConsumed { + log.Debug().Err(broadcastErr).Str("tx_hash", txHash). 
Uint64("event_nonce", eventNonce).Uint64("finalized_nonce", finalizedNonce). Msg("broadcast failed but tx already on chain, marking BROADCASTED") b.markBroadcasted(event, chainID, txHash) return } - // Nonce not consumed — transient error (RPC down, gas issues, etc.). - // Keep as SIGNED and retry next tick. - b.logger.Debug().Err(broadcastErr).Str("event_id", event.EventID).Str("chain", chainID). - Msg("broadcast failed, will retry next tick") + log.Debug().Err(broadcastErr).Msg("broadcast failed, will retry next tick") } diff --git a/universalClient/tss/txbroadcaster/svm.go b/universalClient/tss/txbroadcaster/svm.go index fd0f9436b..5fa7dce5d 100644 --- a/universalClient/tss/txbroadcaster/svm.go +++ b/universalClient/tss/txbroadcaster/svm.go @@ -5,9 +5,10 @@ import ( "time" "github.com/pushchain/push-chain-node/universalClient/store" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) -// broadcastSVM broadcasts a signed Solana transaction and moves the event to +// broadcastOutboundSVM broadcasts a signed Solana transaction and moves the event to // its next state. 
// // Three phases, top to bottom: @@ -30,20 +31,22 @@ import ( // - BROADCASTED(real-hash) → broadcast succeeded // - BROADCASTED("") → peer landed it, or cluster confirmed expiry // - stay SIGNED → retry next tick -func (b *Broadcaster) broadcastSVM(ctx context.Context, event *store.Event, data *SignedOutboundData, chainID string) { +func (b *Broadcaster) broadcastOutboundSVM(ctx context.Context, event *store.Event, data *txflow.SignedOutboundData, chainID string) { + log := b.logger.With().Str("event_id", event.EventID).Str("chain", chainID).Logger() + client, err := b.chains.GetClient(chainID) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get chain client") + log.Warn().Err(err).Msg("failed to get chain client") return } builder, err := client.GetTxBuilder() if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to get tx builder") + log.Warn().Err(err).Msg("failed to get tx builder") return } - signingReq, signature, err := decodeSigningData(data.SigningData) + signingReq, signature, err := txflow.DecodeSigningData(data.SigningData) if err != nil { - b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to decode signing data") + log.Warn().Err(err).Msg("failed to decode signing data") return } @@ -55,20 +58,18 @@ func (b *Broadcaster) broadcastSVM(ctx context.Context, event *store.Event, data // Past local deadline — confirm with the cluster before giving up. if now > deadline { executed, clusterTime, checkErr := builder.IsAlreadyExecuted(ctx, txID) - log := b.logger.With(). - Str("event_id", event.EventID).Str("chain", chainID). 
- Int64("signing_deadline", deadline).Int64("cluster_block_time", clusterTime).Logger() + dlog := log.With().Int64("signing_deadline", deadline).Int64("cluster_block_time", clusterTime).Logger() switch { case checkErr != nil: - log.Debug().Err(checkErr).Msg("SVM cluster check failed at deadline, retry next tick") + dlog.Debug().Err(checkErr).Msg("SVM cluster check failed at deadline, retry next tick") return case executed: - log.Info().Msg("SVM tx executed by peer past local deadline, marking BROADCASTED") + dlog.Debug().Msg("SVM tx executed by peer past local deadline, marking BROADCASTED") b.markBroadcasted(event, chainID, "") return case clusterTime > deadline: - log.Warn().Msg("SVM deadline cluster-confirmed expired, marking BROADCASTED for resolver REVERT") + dlog.Debug().Msg("SVM deadline cluster-confirmed expired, marking BROADCASTED for resolver REVERT") b.markBroadcasted(event, chainID, "") return } @@ -84,14 +85,11 @@ func (b *Broadcaster) broadcastSVM(ctx context.Context, event *store.Event, data // Race: a peer may have landed the same signed tx in the meantime. if executed, _, _ := builder.IsAlreadyExecuted(ctx, txID); executed { - b.logger.Info().Err(broadcastErr).Str("event_id", event.EventID).Str("chain", chainID). - Msg("SVM broadcast failed but tx executed on chain (race), marking BROADCASTED") + log.Debug().Err(broadcastErr).Msg("SVM broadcast failed but tx executed on chain (race), marking BROADCASTED") b.markBroadcasted(event, chainID, "") return } - b.logger.Info().Err(broadcastErr). - Str("event_id", event.EventID).Str("chain", chainID). - Int64("signing_deadline", deadline). + log.Debug().Err(broadcastErr).Int64("signing_deadline", deadline). 
Msg("SVM broadcast failed, staying SIGNED for next tick") } diff --git a/universalClient/tss/txflow/nonce.go b/universalClient/tss/txflow/nonce.go new file mode 100644 index 000000000..6a99c3320 --- /dev/null +++ b/universalClient/tss/txflow/nonce.go @@ -0,0 +1,41 @@ +package txflow + +import ( + "context" + + "github.com/pushchain/push-chain-node/universalClient/chains/common" +) + +// NonceVerdict captures the outcome of comparing the signed nonce against +// the chain's finalized nonce. EVM-only — SVM does not use nonces this way. +type NonceVerdict int + +const ( + // NonceUnknown means the RPC failed and the caller should defer the decision. + NonceUnknown NonceVerdict = iota + // NonceConsumed means the chain advanced past the signed nonce. Some other + // tx took that slot; our signed tx can never land. + NonceConsumed + // NonceAvailable means the chain hasn't consumed the signed nonce yet. The + // tx may still be in mempool, or was dropped — a re-broadcast may land it. + NonceAvailable +) + +// CheckNonce compares signedNonce against the chain's finalized nonce for +// `signer`. Used by: +// - broadcaster (post-broadcast-error path) to detect "the tx already +// landed via another node despite our RPC error" +// - resolver (tx-not-found path) to distinguish dead tx (REVERT) from +// mempool-drop (rewind to SIGNED). +// +// The returned finalizedNonce is for logging / observability. 
+func CheckNonce(ctx context.Context, builder common.TxBuilder, signer string, signedNonce uint64) (NonceVerdict, uint64) { + finalizedNonce, err := builder.GetNextNonce(ctx, signer, true) + if err != nil { + return NonceUnknown, 0 + } + if signedNonce < finalizedNonce { + return NonceConsumed, finalizedNonce + } + return NonceAvailable, finalizedNonce +} diff --git a/universalClient/tss/txflow/parse.go b/universalClient/tss/txflow/parse.go new file mode 100644 index 000000000..6bb71857e --- /dev/null +++ b/universalClient/tss/txflow/parse.go @@ -0,0 +1,66 @@ +package txflow + +import ( + "encoding/hex" + "encoding/json" + "fmt" + + "github.com/pushchain/push-chain-node/universalClient/chains/common" + "github.com/pushchain/push-chain-node/universalClient/store" + "github.com/pushchain/push-chain-node/universalClient/tss/coordinator" +) + +// DecodeSigningData converts the persisted hex-encoded signature + signing +// hash into the byte forms the chain-specific tx builders consume. +func DecodeSigningData(sd *SigningData) (*common.UnsignedSigningReq, []byte, error) { + signingHash, err := hex.DecodeString(sd.SigningHash) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode signing hash: %w", err) + } + signature, err := hex.DecodeString(sd.Signature) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode signature: %w", err) + } + return &common.UnsignedSigningReq{ + SigningHash: signingHash, + Nonce: sd.Nonce, + TSSFundMigrationAmount: sd.TSSFundMigrationAmount, + }, signature, nil +} + +// ReadSignedNonce extracts the signed nonce from any signed outbound event +// payload. Returns ok=false when the payload is unparseable or signing data +// is missing — caller defers in that case. 
+func ReadSignedNonce(event *store.Event) (uint64, bool) { + var data SignedOutboundData + if err := json.Unmarshal(event.EventData, &data); err != nil || data.SigningData == nil { + return 0, false + } + return data.SigningData.Nonce, true +} + +// ReadSigningDeadline extracts the chain-emitted signing deadline from a +// signed outbound event payload. Returns 0 if the event is unparseable or +// the deadline was never set (legacy events). +func ReadSigningDeadline(event *store.Event) int64 { + var data SignedOutboundData + if err := json.Unmarshal(event.EventData, &data); err != nil { + return 0 + } + return data.SigningDeadline +} + +// ReadFundMigrationSigner derives the sender EVM address (old TSS) and reads +// the signed nonce from a fund migration event payload. Returns ok=false on +// missing/invalid fields — caller defers in that case. +func ReadFundMigrationSigner(event *store.Event) (signer string, nonce uint64, ok bool) { + var data SignedFundMigrationData + if err := json.Unmarshal(event.EventData, &data); err != nil || data.SigningData == nil || data.OldTssPubkey == "" { + return "", 0, false + } + addr, err := coordinator.DeriveEVMAddressFromPubkey(data.OldTssPubkey) + if err != nil { + return "", 0, false + } + return addr, data.SigningData.Nonce, true +} diff --git a/universalClient/tss/txflow/types.go b/universalClient/tss/txflow/types.go new file mode 100644 index 000000000..c51a31f81 --- /dev/null +++ b/universalClient/tss/txflow/types.go @@ -0,0 +1,40 @@ +// Package txflow holds the shared types and helpers used by both the +// transaction broadcaster and the resolver. Each module owns its own +// lifecycle (broadcaster pushes SIGNED→BROADCASTED, resolver pulls +// BROADCASTED→terminal), but they read the same persisted event payloads +// and apply the same rules (signed-vs-finalized nonce comparison, signing +// data decoding). 
Lifting those shared concerns here gives one source of +// truth without conflating the two modules' responsibilities. +package txflow + +import ( + "math/big" + + uexecutortypes "github.com/pushchain/push-chain-node/x/uexecutor/types" + utsstypes "github.com/pushchain/push-chain-node/x/utss/types" +) + +// SigningData holds the signing parameters persisted by sessionManager when +// marking an event SIGNED. Both broadcaster and resolver read these fields +// — broadcaster to assemble + send the tx, resolver to compare the signed +// nonce against the chain's finalized nonce. +type SigningData struct { + Signature string `json:"signature"` // hex-encoded 64/65 byte signature + SigningHash string `json:"signing_hash"` // hex-encoded signing hash + Nonce uint64 `json:"nonce"` + TSSFundMigrationAmount *big.Int `json:"tss_fund_migration_amount,omitempty"` +} + +// SignedOutboundData wraps OutboundCreatedEvent with the signing data the +// broadcaster needs to assemble the destination-chain tx. +type SignedOutboundData struct { + uexecutortypes.OutboundCreatedEvent + SigningData *SigningData `json:"signing_data,omitempty"` +} + +// SignedFundMigrationData wraps FundMigrationInitiatedEventData with the +// signing data needed for the migration sweep tx. +type SignedFundMigrationData struct { + utsstypes.FundMigrationInitiatedEventData + SigningData *SigningData `json:"signing_data,omitempty"` +} diff --git a/universalClient/tss/txresolver/evm.go b/universalClient/tss/txresolver/evm.go index 192192d28..ea9778974 100644 --- a/universalClient/tss/txresolver/evm.go +++ b/universalClient/tss/txresolver/evm.go @@ -4,109 +4,183 @@ import ( "context" "github.com/pushchain/push-chain-node/universalClient/store" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) -// txCheckResult represents the outcome of verifying a tx on chain with not-found retry handling. 
-type txCheckResult int +// Decision flow for EVM-broadcasted events (outbound and fund migration both +// follow this shape): +// +// - VerifyBroadcastedTx error → stay BROADCASTED (retry) +// - Tx found, insufficient confirmations → stay BROADCASTED (retry) +// - Tx found, status=1 (success) → COMPLETED / vote success +// - Tx found, status=0 (reverted on chain) → REVERT / vote failure with tx hash +// - Tx not found, signed nonce < finalized nonce → REVERT / vote failure (another tx +// consumed our nonce slot) +// - Tx not found, signed nonce >= finalized nonce → rewind to SIGNED so the broadcaster +// re-broadcasts (covers mempool drop) +// - Tx not found, nonce check unavailable → stay BROADCASTED (retry) +// +// The nonce IS the give-up signal; there is no max-retry counter. The two +// flows differ only in (a) which vote function records success/failure and +// (b) where the signer address comes from — current TSS for outbound, OLD TSS +// (derived from the event's old pubkey) for fund migration. +// +// Shared types (SignedOutboundData / SigningData) and helpers (DecodeSigningData, +// ReadSignedNonce, ReadFundMigrationSigner, CheckNonce, NonceVerdict) live in +// tss/txflow so the broadcaster applies the exact same rules. + +// resolveOutboundEVM resolves a BROADCASTED outbound on an EVM chain. +func (r *Resolver) resolveOutboundEVM(ctx context.Context, event *store.Event, chainID, rawTxHash string) { + log := r.logger.With(). + Str("event_id", event.EventID). + Str("type", event.Type). + Str("chain", chainID). 
+ Str("tx_hash", rawTxHash).Logger() -const ( - txCheckRetry txCheckResult = iota // tx not found or not enough confirmations, retry later - txCheckMaxRetries // tx not found after max retries - txCheckReverted // tx found, confirmed, status=0 - txCheckSuccess // tx found, confirmed, status=1 -) + txID, utxID, err := extractOutboundIDs(event) + if err != nil { + log.Warn().Err(err).Msg("failed to extract outbound IDs") + return + } -// checkEVMTx verifies a tx on chain and handles the not-found retry counter. -// Returns the check result, block height, and raw tx hash for further processing. -func (r *Resolver) checkEVMTx(ctx context.Context, event *store.Event, chainID, rawTxHash string) (txCheckResult, uint64) { - found, blockHeight, confirmations, status, err := r.verifyTxOnChain(ctx, chainID, rawTxHash) + builder, err := r.getBuilder(chainID) if err != nil { - r.logger.Debug().Err(err).Str("event_id", event.EventID).Msg("tx verification error") - return txCheckRetry, 0 + log.Debug().Err(err).Msg("failed to get tx builder, will retry next tick") + return } - if !found { - r.notFoundCounts[event.EventID]++ - count := r.notFoundCounts[event.EventID] - r.logger.Debug(). - Str("event_id", event.EventID).Str("tx_hash", rawTxHash). 
- Int("not_found_count", count).Msg("tx not found on chain, will retry") + found, blockHeight, confirmations, status, vErr := builder.VerifyBroadcastedTx(ctx, rawTxHash) + if vErr != nil { + log.Debug().Err(vErr).Msg("tx verification error, will retry next tick") + return + } - if count >= maxNotFoundRetries { - delete(r.notFoundCounts, event.EventID) - return txCheckMaxRetries, 0 + if found { + if confirmations < r.chains.GetStandardConfirmations(chainID) { + return } - return txCheckRetry, 0 + if status == 0 { + gasFeeUsed, fErr := builder.GetGasFeeUsed(ctx, rawTxHash) + if fErr != nil { + log.Debug().Err(fErr).Msg("failed to fetch gas fee for reverted tx, will retry next tick") + return + } + _ = r.voteOutboundFailureAndMarkReverted(ctx, event, txID, utxID, rawTxHash, blockHeight, gasFeeUsed, + "tx execution reverted on destination chain") + return + } + if uerr := r.eventStore.Update(event.EventID, map[string]any{"status": store.StatusCompleted}); uerr != nil { + log.Warn().Err(uerr).Msg("failed to mark event COMPLETED") + return + } + log.Info().Msg("event marked as COMPLETED") + return } - delete(r.notFoundCounts, event.EventID) - - requiredConfs := r.chains.GetStandardConfirmations(chainID) - if confirmations < requiredConfs { - return txCheckRetry, 0 + signer, signedNonce, ok := r.outboundSigner(ctx, event) + if !ok { + return } - - if status == 0 { - return txCheckReverted, blockHeight + verdict, finalizedNonce := txflow.CheckNonce(ctx, builder, signer, signedNonce) + nlog := log.With().Uint64("signed_nonce", signedNonce).Uint64("finalized_nonce", finalizedNonce).Logger() + switch verdict { + case txflow.NonceUnknown: + nlog.Debug().Msg("could not fetch finalized nonce, will retry next tick") + case txflow.NonceConsumed: + nlog.Debug().Msg("EVM outbound tx not found and nonce already finalized → REVERT") + _ = r.voteOutboundFailureAndMarkReverted(ctx, event, txID, utxID, "", 0, "0", + "tx not executed on destination chain") + case txflow.NonceAvailable: + 
r.rewindToSigned(event, chainID, signedNonce, finalizedNonce) } - - return txCheckSuccess, blockHeight } -// resolveOutboundEVM checks the on-chain receipt for an outbound EVM tx. -// Success vote is done by destination chain event listener, not here. -func (r *Resolver) resolveOutboundEVM(ctx context.Context, event *store.Event, chainID, rawTxHash string) { - txID, utxID, err := extractOutboundIDs(event) +// resolveFundMigrationEVM mirrors resolveOutboundEVM. The signer comes from +// the event payload (OldTssPubkey) instead of the current TSS, and the +// success/failure path uses the fund-migration voting helper which both votes +// and marks the event in a single step. +func (r *Resolver) resolveFundMigrationEVM(ctx context.Context, event *store.Event, chainID, rawTxHash string, migrationID uint64) { + log := r.logger.With(). + Str("event_id", event.EventID). + Str("type", event.Type). + Str("chain", chainID). + Str("tx_hash", rawTxHash).Logger() + + builder, err := r.getBuilder(chainID) if err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to extract outbound IDs") + log.Debug().Err(err).Msg("failed to get tx builder, will retry next tick") return } - result, blockHeight := r.checkEVMTx(ctx, event, chainID, rawTxHash) - - switch result { - case txCheckRetry: + found, _, confirmations, status, vErr := builder.VerifyBroadcastedTx(ctx, rawTxHash) + if vErr != nil { + log.Debug().Err(vErr).Msg("fund migration tx verification error, will retry next tick") return + } - case txCheckMaxRetries: - _ = r.voteOutboundFailureAndMarkReverted(ctx, event, txID, utxID, "", 0, "0", - "tx not found on destination chain after max retries") - - case txCheckReverted: - gasFeeUsed := "0" - if builder, err := r.getBuilder(chainID); err == nil { - if fee, err := builder.GetGasFeeUsed(ctx, rawTxHash); err == nil { - gasFeeUsed = fee - } - } - _ = r.voteOutboundFailureAndMarkReverted(ctx, event, txID, utxID, rawTxHash, blockHeight, gasFeeUsed, - "tx 
execution reverted on destination chain") - - case txCheckSuccess: - // Success vote done by destination chain event listener - if err := r.eventStore.Update(event.EventID, map[string]any{"status": store.StatusCompleted}); err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to mark event COMPLETED") + if found { + if confirmations < r.chains.GetStandardConfirmations(chainID) { return } - r.logger.Info(). - Str("event_id", event.EventID).Str("tx_hash", rawTxHash). - Msg("outbound EVM tx marked COMPLETED") + r.voteFundMigrationAndMark(ctx, event, migrationID, rawTxHash, status != 0) + return + } + + signer, signedNonce, ok := txflow.ReadFundMigrationSigner(event) + if !ok { + log.Warn().Msg("fund migration tx not found and signer info unavailable, staying BROADCASTED") + return + } + verdict, finalizedNonce := txflow.CheckNonce(ctx, builder, signer, signedNonce) + nlog := log.With().Uint64("signed_nonce", signedNonce).Uint64("finalized_nonce", finalizedNonce).Logger() + switch verdict { + case txflow.NonceUnknown: + nlog.Debug().Msg("could not fetch finalized nonce, will retry next tick") + case txflow.NonceConsumed: + nlog.Debug().Msg("EVM fund migration tx not found and nonce already finalized → REVERT") + r.voteFundMigrationAndMark(ctx, event, migrationID, "", false) + case txflow.NonceAvailable: + r.rewindToSigned(event, chainID, signedNonce, finalizedNonce) } } -// resolveFundMigrationEVM checks the on-chain receipt for a fund migration EVM tx. -// Votes success/failure explicitly since there is no gateway event listener for native transfers. 
-func (r *Resolver) resolveFundMigrationEVM(ctx context.Context, event *store.Event, chainID, rawTxHash string, migrationID uint64) { - result, _ := r.checkEVMTx(ctx, event, chainID, rawTxHash) +// outboundSigner resolves the outbound signer + signed nonce, logging and +// returning ok=false when either is unavailable so the caller can defer +// without dragging the resolver-level guards into the main flow. +func (r *Resolver) outboundSigner(ctx context.Context, event *store.Event) (string, uint64, bool) { + log := r.logger.With().Str("event_id", event.EventID).Logger() - switch result { - case txCheckRetry: + signedNonce, ok := txflow.ReadSignedNonce(event) + if !ok { + log.Warn().Msg("EVM tx not found and signed nonce unavailable, staying BROADCASTED") + return "", 0, false + } + if r.getTSSAddress == nil { + log.Warn().Msg("EVM tx not found and no TSS-address resolver configured, staying BROADCASTED") + return "", 0, false + } + addr, err := r.getTSSAddress(ctx) + if err != nil { + log.Debug().Err(err).Msg("could not fetch TSS address, will retry next tick") + return "", 0, false + } + return addr, signedNonce, true +} + +// rewindToSigned moves a BROADCASTED event back to SIGNED so the broadcaster +// will re-broadcast on the next tick. Used when the EVM tx hash isn't visible +// on chain but the signed nonce is still available — covers mempool drops. +func (r *Resolver) rewindToSigned(event *store.Event, chainID string, signedNonce, finalizedNonce uint64) { + log := r.logger.With(). + Str("event_id", event.EventID). + Str("type", event.Type). + Str("chain", chainID). + Uint64("signed_nonce", signedNonce). 
+ Uint64("finalized_nonce", finalizedNonce).Logger() + + if err := r.eventStore.Update(event.EventID, map[string]any{"status": store.StatusSigned}); err != nil { + log.Warn().Err(err).Msg("failed to rewind event to SIGNED for re-broadcast") return - case txCheckMaxRetries: - r.voteFundMigrationAndMark(ctx, event, migrationID, "", false) - case txCheckReverted: - r.voteFundMigrationAndMark(ctx, event, migrationID, rawTxHash, false) - case txCheckSuccess: - r.voteFundMigrationAndMark(ctx, event, migrationID, rawTxHash, true) } + log.Debug().Msg("event marked as SIGNED") } diff --git a/universalClient/tss/txresolver/resolver.go b/universalClient/tss/txresolver/resolver.go index 83c102fd9..cbf2e628d 100644 --- a/universalClient/tss/txresolver/resolver.go +++ b/universalClient/tss/txresolver/resolver.go @@ -19,46 +19,37 @@ import ( "github.com/pushchain/push-chain-node/universalClient/tss/eventstore" ) -// --------------------------------------------------------------------------- -// Resolver -// --------------------------------------------------------------------------- - -// Config holds configuration for the tx resolver. type Config struct { EventStore *eventstore.Store Chains *chains.Chains PushSigner *pushsigner.Signer CheckInterval time.Duration Logger zerolog.Logger + GetTSSAddress func(ctx context.Context) (string, error) } -// maxNotFoundRetries is the number of consecutive "not found" checks before reverting. -// At a 30s check interval this gives ~5 minutes for a tx to appear on chain. -const maxNotFoundRetries = 10 - // Resolver takes BROADCASTED txs and moves them to terminal status (COMPLETED or REVERTED). 
type Resolver struct { - eventStore *eventstore.Store - chains *chains.Chains - pushSigner *pushsigner.Signer - checkInterval time.Duration - logger zerolog.Logger - notFoundCounts map[string]int // eventID -> consecutive not-found count + eventStore *eventstore.Store + chains *chains.Chains + pushSigner *pushsigner.Signer + checkInterval time.Duration + logger zerolog.Logger + getTSSAddress func(ctx context.Context) (string, error) } -// NewResolver creates a new tx resolver. func NewResolver(cfg Config) *Resolver { interval := cfg.CheckInterval if interval == 0 { interval = 15 * time.Second } return &Resolver{ - eventStore: cfg.EventStore, - chains: cfg.Chains, - pushSigner: cfg.PushSigner, - checkInterval: interval, - logger: cfg.Logger.With().Str("component", "txresolver").Logger(), - notFoundCounts: make(map[string]int), + eventStore: cfg.EventStore, + chains: cfg.Chains, + pushSigner: cfg.PushSigner, + checkInterval: interval, + logger: cfg.Logger.With().Str("component", "txresolver").Logger(), + getTSSAddress: cfg.GetTSSAddress, } } @@ -209,14 +200,6 @@ func (r *Resolver) getBuilder(chainID string) (common.TxBuilder, error) { return client.GetTxBuilder() } -func (r *Resolver) verifyTxOnChain(ctx context.Context, chainID, txHash string) (bool, uint64, uint64, uint8, error) { - builder, err := r.getBuilder(chainID) - if err != nil { - return false, 0, 0, 0, err - } - return builder.VerifyBroadcastedTx(ctx, txHash) -} - // voteOutboundFailureAndMarkReverted votes failure for an outbound event and marks it REVERTED. func (r *Resolver) voteOutboundFailureAndMarkReverted(ctx context.Context, event *store.Event, txID, utxID, txHash string, blockHeight uint64, gasFeeUsed string, errorMsg string) error { if r.pushSigner == nil { @@ -242,8 +225,11 @@ func (r *Resolver) voteOutboundFailureAndMarkReverted(ctx context.Context, event return fmt.Errorf("failed to mark event %s as reverted: %w", event.EventID, err) } r.logger.Info(). 
- Str("event_id", event.EventID).Str("tx_id", txID). - Str("error_msg", errorMsg).Msg("voted outbound failure and marked REVERTED") + Str("event_id", event.EventID). + Str("type", event.Type). + Str("vote_tx_hash", voteTxHash). + Str("error_msg", errorMsg). + Msg("event marked as REVERTED") return nil } @@ -272,7 +258,9 @@ func (r *Resolver) voteFundMigrationAndMark(ctx context.Context, event *store.Ev } r.logger.Info(). - Str("event_id", event.EventID).Uint64("migration_id", migrationID). - Str("tx_hash", txHash).Bool("success", success).Str("status", newStatus). - Msg("voted fund migration and updated status") + Str("event_id", event.EventID). + Str("type", event.Type). + Uint64("migration_id", migrationID). + Str("vote_tx_hash", voteTxHash). + Msg("event marked as " + newStatus) } diff --git a/universalClient/tss/txresolver/resolver_test.go b/universalClient/tss/txresolver/resolver_test.go index 987c950f1..64f50350c 100644 --- a/universalClient/tss/txresolver/resolver_test.go +++ b/universalClient/tss/txresolver/resolver_test.go @@ -76,9 +76,9 @@ func (m *mockTxBuilder) BroadcastFundMigrationTx(ctx context.Context, req *commo type mockChainClient struct{ builder *mockTxBuilder } -func (m *mockChainClient) Start(context.Context) error { return nil } -func (m *mockChainClient) Stop() error { return nil } -func (m *mockChainClient) IsHealthy() bool { return true } +func (m *mockChainClient) Start(context.Context) error { return nil } +func (m *mockChainClient) Stop() error { return nil } +func (m *mockChainClient) IsHealthy() bool { return true } func (m *mockChainClient) GetTxBuilder() (common.TxBuilder, error) { return m.builder, nil } func setupTestDB(t *testing.T) (*eventstore.Store, *gorm.DB) { @@ -178,6 +178,19 @@ func newResolver(evtStore *eventstore.Store, ch *chains.Chains) *Resolver { }) } +// newResolverWithTSSAddress builds a Resolver that returns a fixed TSS address +// from GetTSSAddress — needed by tests that exercise the EVM nonce-based +// 
retry/revert path. +func newResolverWithTSSAddress(evtStore *eventstore.Store, ch *chains.Chains, addr string) *Resolver { + return NewResolver(Config{ + EventStore: evtStore, + Chains: ch, + CheckInterval: 0, + Logger: zerolog.Nop(), + GetTSSAddress: func(ctx context.Context) (string, error) { return addr, nil }, + }) +} + func TestParseCAIPTxHash(t *testing.T) { t.Run("valid CAIP tx hash", func(t *testing.T) { chainID, txHash, err := parseCAIPTxHash("eip155:1:0xabc123") @@ -269,10 +282,11 @@ func TestSVM_PDAExists_MarksCompleted(t *testing.T) { require.Equal(t, store.StatusCompleted, updated.Status) } -func TestSVM_PDANotFound_VotesFailureAndReverts(t *testing.T) { - // PDA not found → vote failure → REVERTED. - // Note: orphaned StoredIxData PDAs are reclaimed by the periodic - // RentReclaimer in svm/rent_reclaimer.go, not from this hot path. +func TestSVM_PDAAbsent_DeadlineZero_ClusterFresh_Reverts(t *testing.T) { + // Legacy event with no deadline (=0). PDA absent + fresh cluster time + // (>> 0) satisfies `clusterTime > deadline + slack` → reaches REVERT. + // No PushSigner → vote returns nil, status stays BROADCASTED. The point + // is that the resolver reaches the vote path (not defers). evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} @@ -281,16 +295,15 @@ func TestSVM_PDANotFound_VotesFailureAndReverts(t *testing.T) { eventData := makeOutboundEventData("tx-123", "utx-456", "solana:mainnet") insertBroadcastedEvent(t, db, "ev-1", "solana:mainnet", "solana:mainnet:", eventData) - builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, int64(0), nil) + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, time.Now().Unix(), nil) - // No PushSigner — voteFailure logs a warning and returns nil without marking REVERTED. 
resolver := newResolver(evtStore, ch) ev := getEvent(t, db, "ev-1") resolver.resolveSVM(context.Background(), &ev, "solana:mainnet") updated := getEvent(t, db, "ev-1") - require.Equal(t, store.StatusBroadcasted, updated.Status) - builder.AssertExpectations(t) + require.Equal(t, store.StatusBroadcasted, updated.Status) // no PushSigner → vote skipped + builder.AssertCalled(t, "IsAlreadyExecuted", mock.Anything, "tx-123") } func TestSVM_PDACheckFails_StaysBroadcasted(t *testing.T) { @@ -313,6 +326,111 @@ func TestSVM_PDACheckFails_StaysBroadcasted(t *testing.T) { require.Equal(t, store.StatusBroadcasted, updated.Status) // stays BROADCASTED } +// makeOutboundEventDataWithDeadline mirrors makeOutboundEventData but sets the +// chain-emitted signing deadline used by the resolver's cluster-time gate. +func makeOutboundEventDataWithDeadline(txID, utxID, destChain string, deadline int64) []byte { + data := uexecutortypes.OutboundCreatedEvent{ + TxID: txID, + UniversalTxId: utxID, + DestinationChain: destChain, + SigningDeadline: deadline, + } + b, _ := json.Marshal(data) + return b +} + +func TestSVM_PDAAbsent_ClusterTimeUnknown_DefersRevert(t *testing.T) { + // PDA absent + deadline set + cluster time = 0 (RPC didn't supply it) → + // stay BROADCASTED, defer REVERT until we can verify cluster health. 
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) + + eventData := makeOutboundEventDataWithDeadline("tx-123", "utx-456", "solana:mainnet", time.Now().Unix()-3600) + insertBroadcastedEvent(t, db, "ev-1", "solana:mainnet", "solana:mainnet:solTxSig", eventData) + + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, int64(0), nil) + + resolver := newResolver(evtStore, ch) + ev := getEvent(t, db, "ev-1") + resolver.resolveSVM(context.Background(), &ev, "solana:mainnet") + + updated := getEvent(t, db, "ev-1") + require.Equal(t, store.StatusBroadcasted, updated.Status) +} + +func TestSVM_PDAAbsent_ClusterStale_DefersRevert(t *testing.T) { + // PDA absent + cluster time >120s old → cluster halted/lagging → defer REVERT. + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) + + eventData := makeOutboundEventDataWithDeadline("tx-123", "utx-456", "solana:mainnet", time.Now().Unix()-3600) + insertBroadcastedEvent(t, db, "ev-1", "solana:mainnet", "solana:mainnet:solTxSig", eventData) + + // Cluster block time is 10 minutes old. + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, time.Now().Unix()-600, nil) + + resolver := newResolver(evtStore, ch) + ev := getEvent(t, db, "ev-1") + resolver.resolveSVM(context.Background(), &ev, "solana:mainnet") + + updated := getEvent(t, db, "ev-1") + require.Equal(t, store.StatusBroadcasted, updated.Status) +} + +func TestSVM_PDAAbsent_ClusterStillInWindow_DefersRevert(t *testing.T) { + // PDA absent + cluster fresh but cluster's clock <= deadline+slack → + // the program still accepts late retries; defer REVERT. 
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) + + now := time.Now().Unix() + deadline := now - 30 // local says past, but well under slack + eventData := makeOutboundEventDataWithDeadline("tx-123", "utx-456", "solana:mainnet", deadline) + insertBroadcastedEvent(t, db, "ev-1", "solana:mainnet", "solana:mainnet:solTxSig", eventData) + + // Cluster time = now (fresh) but <= deadline+slack (deadline+60 = now+30). + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, now, nil) + + resolver := newResolver(evtStore, ch) + ev := getEvent(t, db, "ev-1") + resolver.resolveSVM(context.Background(), &ev, "solana:mainnet") + + updated := getEvent(t, db, "ev-1") + require.Equal(t, store.StatusBroadcasted, updated.Status) +} + +func TestSVM_PDAAbsent_ClusterConfirmsExpiry_Reverts(t *testing.T) { + // PDA absent + cluster fresh + cluster past deadline+slack → REVERT path. + // (With no PushSigner the vote is logged but status stays BROADCASTED; + // what we assert is that the resolver reached the vote call.) + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "solana:mainnet", uregistrytypes.VmType_SVM, client) + + now := time.Now().Unix() + eventData := makeOutboundEventDataWithDeadline("tx-123", "utx-456", "solana:mainnet", now-3600) + insertBroadcastedEvent(t, db, "ev-1", "solana:mainnet", "solana:mainnet:solTxSig", eventData) + + // Cluster time = now (fresh) and well past deadline+slack. + builder.On("IsAlreadyExecuted", mock.Anything, "tx-123").Return(false, now, nil) + + resolver := newResolver(evtStore, ch) + ev := getEvent(t, db, "ev-1") + resolver.resolveSVM(context.Background(), &ev, "solana:mainnet") + + // No PushSigner → vote returns nil early; status unchanged. 
The point is + // the resolver REACHED the vote (i.e., didn't defer); covered by absence + // of any defer log path and the mock having been called. + builder.AssertCalled(t, "IsAlreadyExecuted", mock.Anything, "tx-123") +} + func TestSVM_InvalidEventData_Skips(t *testing.T) { // Bad event data → logged and skipped (stays BROADCASTED). evtStore, db := setupTestDB(t) @@ -369,56 +487,6 @@ func TestResolveEventRouting(t *testing.T) { }) } -func TestNotFoundCountTracking(t *testing.T) { - t.Run("increments on not found", func(t *testing.T) { - evtStore, _ := setupTestDB(t) - resolver := NewResolver(Config{ - EventStore: evtStore, - Logger: zerolog.Nop(), - }) - - eventID := "test-event-1" - assert.Equal(t, 0, resolver.notFoundCounts[eventID]) - - resolver.notFoundCounts[eventID]++ - assert.Equal(t, 1, resolver.notFoundCounts[eventID]) - - resolver.notFoundCounts[eventID]++ - assert.Equal(t, 2, resolver.notFoundCounts[eventID]) - }) - - t.Run("cleared after max retries", func(t *testing.T) { - evtStore, _ := setupTestDB(t) - resolver := NewResolver(Config{ - EventStore: evtStore, - Logger: zerolog.Nop(), - }) - - eventID := "test-event-2" - resolver.notFoundCounts[eventID] = maxNotFoundRetries - - // Simulate cleanup - delete(resolver.notFoundCounts, eventID) - assert.Equal(t, 0, resolver.notFoundCounts[eventID]) - }) - - t.Run("cleared when tx found", func(t *testing.T) { - evtStore, _ := setupTestDB(t) - resolver := NewResolver(Config{ - EventStore: evtStore, - Logger: zerolog.Nop(), - }) - - eventID := "test-event-3" - resolver.notFoundCounts[eventID] = 5 - - // Simulate tx found — clear tracking - delete(resolver.notFoundCounts, eventID) - _, exists := resolver.notFoundCounts[eventID] - assert.False(t, exists) - }) -} - func TestVoteFailureAndMarkReverted(t *testing.T) { t.Run("no push signer logs warning and returns nil", func(t *testing.T) { evtStore, _ := setupTestDB(t) @@ -499,31 +567,155 @@ func TestFundMigrationEVM_TxReverted_VotesFailure(t *testing.T) { 
require.Equal(t, store.StatusBroadcasted, ev.Status) } -func TestFundMigrationEVM_TxNotFound_RetriesAndReverts(t *testing.T) { +// testOldTSSPubkey is a valid compressed secp256k1 pubkey that DeriveEVMAddressFromPubkey +// can parse into testOldTSSAddr. Used by fund migration nonce-based tests. +const testOldTSSPubkey = "03d5d5d290a0ecec420e843fc2a57f1696781ec657e204406fc67bb5fe0c751317" +const testOldTSSAddr = "0x9fed6f778a956244c06a3b905ba45bdb2ec3afea" + +// makeFundMigrationEventDataWithNonce mirrors makeFundMigrationEventData but +// adds OldTssPubkey + signing_data.nonce — the fields the resolver consults +// on tx-not-found. +func makeFundMigrationEventDataWithNonce(migrationID uint64, chain, oldPubkey string, nonce uint64) []byte { + b, _ := json.Marshal(map[string]any{ + "migration_id": migrationID, + "chain": chain, + "old_tss_pubkey": oldPubkey, + "signing_data": map[string]any{ + "nonce": nonce, + }, + }) + return b +} + +func insertBroadcastedFundMigrationEventWithNonce( + t *testing.T, db *gorm.DB, + eventID, chain, broadcastedTxHash string, + migrationID uint64, oldPubkey string, nonce uint64, +) { + t.Helper() + event := store.Event{ + EventID: eventID, + BlockHeight: 100, + ExpiryBlockHeight: 99999, + Type: store.EventTypeSignFundMigrate, + ConfirmationType: "INSTANT", + Status: store.StatusBroadcasted, + EventData: makeFundMigrationEventDataWithNonce(migrationID, chain, oldPubkey, nonce), + BroadcastedTxHash: broadcastedTxHash, + } + require.NoError(t, db.Create(&event).Error) +} + +func TestFundMigrationEVM_NotFound_NonceConsumed_VotesFailure(t *testing.T) { + // Fund migration tx not found AND old-TSS signed nonce already finalized → + // another tx consumed the slot. REVERT path. 
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + insertBroadcastedFundMigrationEventWithNonce( + t, db, "fm-consumed", "eip155:1", "eip155:1:0xmigmissing", 42, testOldTSSPubkey, 3, + ) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmigmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + builder.On("GetNextNonce", mock.Anything, testOldTSSAddr, true).Return(uint64(5), nil) + + resolver := newResolver(evtStore, ch) // GetTSSAddress not needed; signer derived from event + resolver.processBroadcasted(context.Background()) + + // No PushSigner → vote skipped, status stays BROADCASTED. + ev := getEvent(t, db, "fm-consumed") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertCalled(t, "GetNextNonce", mock.Anything, testOldTSSAddr, true) +} + +func TestFundMigrationEVM_NotFound_NonceUnconsumed_RewindsToSigned(t *testing.T) { + // Fund migration tx not found AND old-TSS signed nonce not yet finalized → + // rewind to SIGNED for re-broadcast. evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} client := &mockChainClient{builder: builder} ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) - insertBroadcastedFundMigrationEvent(t, db, "fm-1", "eip155:1", "eip155:1:0xnotfound", 42) + insertBroadcastedFundMigrationEventWithNonce( + t, db, "fm-unconsumed", "eip155:1", "eip155:1:0xmigpending", 42, testOldTSSPubkey, 5, + ) - // Tx not found - builder.On("VerifyBroadcastedTx", mock.Anything, "0xnotfound"). + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmigpending"). 
Return(false, uint64(0), uint64(0), uint8(0), nil) + builder.On("GetNextNonce", mock.Anything, testOldTSSAddr, true).Return(uint64(5), nil) resolver := newResolver(evtStore, ch) + resolver.processBroadcasted(context.Background()) - // Should increment not found count each time, stay BROADCASTED - for i := 0; i < maxNotFoundRetries-1; i++ { - resolver.processBroadcasted(context.Background()) - ev := getEvent(t, db, "fm-1") - require.Equal(t, store.StatusBroadcasted, ev.Status) - } + ev := getEvent(t, db, "fm-unconsumed") + require.Equal(t, store.StatusSigned, ev.Status) +} + +func TestFundMigrationEVM_NotFound_NonceRPCError_StaysBroadcasted(t *testing.T) { + // Fund migration tx not found and nonce RPC errors → defer (retry next tick). + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) - // On max retries, without pushSigner vote is skipped + insertBroadcastedFundMigrationEventWithNonce( + t, db, "fm-rpc-err", "eip155:1", "eip155:1:0xmigmissing", 42, testOldTSSPubkey, 3, + ) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmigmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + builder.On("GetNextNonce", mock.Anything, testOldTSSAddr, true).Return(uint64(0), assert.AnError) + + resolver := newResolver(evtStore, ch) resolver.processBroadcasted(context.Background()) - ev := getEvent(t, db, "fm-1") - require.Equal(t, store.StatusBroadcasted, ev.Status) // no signer = no revert + + ev := getEvent(t, db, "fm-rpc-err") + require.Equal(t, store.StatusBroadcasted, ev.Status) +} + +func TestFundMigrationEVM_NotFound_SignerInfoMissing_StaysBroadcasted(t *testing.T) { + // Fund migration tx not found but OldTssPubkey is missing from the event + // payload → can't derive signer → defer. Uses the standard fund-migration + // helper which doesn't populate OldTssPubkey. 
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + insertBroadcastedFundMigrationEvent(t, db, "fm-no-pubkey", "eip155:1", "eip155:1:0xmigmissing", 42) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmigmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + + resolver := newResolver(evtStore, ch) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "fm-no-pubkey") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) +} + +func TestFundMigrationEVM_VerifyError_StaysBroadcasted(t *testing.T) { + // VerifyBroadcastedTx errors → defer (retry next tick); no nonce check, no vote. + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + insertBroadcastedFundMigrationEvent(t, db, "fm-verify-err", "eip155:1", "eip155:1:0xmigerr", 42) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmigerr"). 
+ Return(false, uint64(0), uint64(0), uint8(0), assert.AnError) + + resolver := newResolver(evtStore, ch) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "fm-verify-err") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) } func TestFundMigrationEVM_InsufficientConfirmations_Retries(t *testing.T) { @@ -547,11 +739,6 @@ func TestFundMigrationEVM_InsufficientConfirmations_Retries(t *testing.T) { } func TestConstants(t *testing.T) { - t.Run("maxNotFoundRetries is reasonable", func(t *testing.T) { - // At 30s interval, 10 retries = ~5 minutes - assert.Equal(t, 10, maxNotFoundRetries) - }) - t.Run("processBroadcastedBatchSize", func(t *testing.T) { assert.Equal(t, 100, processBroadcastedBatchSize) }) @@ -577,16 +764,6 @@ func TestNewResolverDefaults(t *testing.T) { }) assert.Equal(t, 45*time.Second, r.checkInterval) }) - - t.Run("notFoundCounts map is initialized", func(t *testing.T) { - evtStore, _ := setupTestDB(t) - r := NewResolver(Config{ - EventStore: evtStore, - Logger: zerolog.Nop(), - }) - assert.NotNil(t, r.notFoundCounts) - assert.Len(t, r.notFoundCounts, 0) - }) } func TestResolveOutboundEVM_Success_MarksCompleted(t *testing.T) { @@ -670,6 +847,164 @@ func TestResolveOutboundEVM_Reverted_NoPushSigner_StaysBroadcasted(t *testing.T) builder.AssertCalled(t, "GetGasFeeUsed", mock.Anything, "0xreverted") } +// makeOutboundEventDataWithNonce mirrors makeOutboundEventData but adds the +// `signing_data.nonce` field the EVM resolver consults on tx-not-found. 
+func makeOutboundEventDataWithNonce(txID, utxID, destChain string, nonce uint64) []byte { + b, _ := json.Marshal(map[string]any{ + "tx_id": txID, + "utx_id": utxID, + "destination_chain": destChain, + "signing_data": map[string]any{ + "nonce": nonce, + }, + }) + return b +} + +const testEVMTSSAddr = "0x4D353565442Eb33b66ef88E14336F3F4Bf3a02FB" + +func TestResolveOutboundEVM_NotFound_NonceConsumed_Reverts(t *testing.T) { + // Tx not found AND signed nonce < finalized nonce → another tx consumed + // the slot. Our tx is dead, REVERT path is taken. (No PushSigner means + // the vote is logged but status stays BROADCASTED; we verify the resolver + // took the REVERT branch by checking the nonce RPC was called.) + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventDataWithNonce("tx-100", "utx-200", "eip155:1", 5) + insertBroadcastedEvent(t, db, "ev-consumed-1", "eip155:1", "eip155:1:0xmissing", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + // Finalized nonce = 7 → our nonce 5 is below the finalized nonce → already consumed. + builder.On("GetNextNonce", mock.Anything, testEVMTSSAddr, true).Return(uint64(7), nil) + + resolver := newResolverWithTSSAddress(evtStore, ch, testEVMTSSAddr) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-consumed-1") + require.Equal(t, store.StatusBroadcasted, ev.Status, "no PushSigner → vote skipped, status unchanged") + builder.AssertCalled(t, "GetNextNonce", mock.Anything, testEVMTSSAddr, true) +} + +func TestResolveOutboundEVM_NotFound_NonceUnconsumed_RewindsToSigned(t *testing.T) { + // Tx not found AND signed nonce >= finalized nonce → tx may still land + // (or was dropped from mempool). Rewind to SIGNED so the broadcaster + // re-broadcasts.
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventDataWithNonce("tx-100", "utx-200", "eip155:1", 5) + insertBroadcastedEvent(t, db, "ev-unconsumed-1", "eip155:1", "eip155:1:0xstillpending", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xstillpending"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + // Finalized nonce = 5 → our nonce 5 not yet finalized. + builder.On("GetNextNonce", mock.Anything, testEVMTSSAddr, true).Return(uint64(5), nil) + + resolver := newResolverWithTSSAddress(evtStore, ch, testEVMTSSAddr) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-unconsumed-1") + require.Equal(t, store.StatusSigned, ev.Status) +} + +func TestResolveOutboundEVM_NotFound_NonceRPCError_StaysBroadcasted(t *testing.T) { + // Tx not found and nonce RPC errors → defer (retry next tick). + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventDataWithNonce("tx-100", "utx-200", "eip155:1", 5) + insertBroadcastedEvent(t, db, "ev-rpc-err-1", "eip155:1", "eip155:1:0xmissing", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmissing"). 
+ Return(false, uint64(0), uint64(0), uint8(0), nil) + builder.On("GetNextNonce", mock.Anything, testEVMTSSAddr, true).Return(uint64(0), assert.AnError) + + resolver := newResolverWithTSSAddress(evtStore, ch, testEVMTSSAddr) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-rpc-err-1") + require.Equal(t, store.StatusBroadcasted, ev.Status) +} + +func TestResolveOutboundEVM_NotFound_SignedNonceMissing_StaysBroadcasted(t *testing.T) { + // Tx not found and event payload has no signing_data.nonce → can't run + // nonce check → defer. Uses the standard outbound helper which doesn't + // populate signing_data. + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventData("tx-100", "utx-200", "eip155:1") + insertBroadcastedEvent(t, db, "ev-no-nonce", "eip155:1", "eip155:1:0xmissing", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + + resolver := newResolverWithTSSAddress(evtStore, ch, testEVMTSSAddr) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-no-nonce") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) +} + +func TestResolveOutboundEVM_NotFound_TSSAddressFetchError_StaysBroadcasted(t *testing.T) { + // Tx not found and GetTSSAddress callback errors → defer (retry next tick). 
+ evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventDataWithNonce("tx-100", "utx-200", "eip155:1", 5) + insertBroadcastedEvent(t, db, "ev-tss-err", "eip155:1", "eip155:1:0xmissing", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmissing"). + Return(false, uint64(0), uint64(0), uint8(0), nil) + + resolver := NewResolver(Config{ + EventStore: evtStore, + Chains: ch, + CheckInterval: 0, + Logger: zerolog.Nop(), + GetTSSAddress: func(ctx context.Context) (string, error) { return "", assert.AnError }, + }) + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-tss-err") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) +} + +func TestResolveOutboundEVM_NotFound_NoTSSAddressResolver_StaysBroadcasted(t *testing.T) { + // Tx not found and GetTSSAddress is nil → can't run nonce check → defer. + evtStore, db := setupTestDB(t) + builder := &mockTxBuilder{} + client := &mockChainClient{builder: builder} + ch := newTestChains(t, "eip155:1", uregistrytypes.VmType_EVM, client) + + eventData := makeOutboundEventDataWithNonce("tx-100", "utx-200", "eip155:1", 5) + insertBroadcastedEvent(t, db, "ev-no-tss-1", "eip155:1", "eip155:1:0xmissing", eventData) + + builder.On("VerifyBroadcastedTx", mock.Anything, "0xmissing"). 
+ Return(false, uint64(0), uint64(0), uint8(0), nil) + + resolver := newResolver(evtStore, ch) // no GetTSSAddress configured + resolver.processBroadcasted(context.Background()) + + ev := getEvent(t, db, "ev-no-tss-1") + require.Equal(t, store.StatusBroadcasted, ev.Status) + builder.AssertNotCalled(t, "GetNextNonce", mock.Anything, mock.Anything, mock.Anything) +} + func TestResolveOutboundEVM_VerifyError_StaysBroadcasted(t *testing.T) { evtStore, db := setupTestDB(t) builder := &mockTxBuilder{} diff --git a/universalClient/tss/txresolver/svm.go b/universalClient/tss/txresolver/svm.go index 610fb07e5..7ad38dba0 100644 --- a/universalClient/tss/txresolver/svm.go +++ b/universalClient/tss/txresolver/svm.go @@ -2,16 +2,16 @@ package txresolver import ( "context" - "encoding/json" "time" "github.com/pushchain/push-chain-node/universalClient/store" + "github.com/pushchain/push-chain-node/universalClient/tss/txflow" ) // svmRevertSlackSeconds is the buffer past the signed deadline before the // resolver finalizes REVERT. Gives an in-flight tx that's already confirmed // time to reach `finalized` before we vote against it. -const svmRevertSlackSeconds int64 = 60 +const svmRevertSlackSeconds int64 = 30 // svmClusterStaleSeconds is how far the latest finalized block's timestamp // can lag wall-clock before the cluster is treated as halted or stalled — @@ -19,24 +19,6 @@ const svmRevertSlackSeconds int64 = 60 // txs, so we defer REVERT. const svmClusterStaleSeconds int64 = 120 -// svmEventEnvelope is the slice of the persisted outbound event the resolver -// needs to make a REVERT decision: just the chain-emitted signing deadline. -type svmEventEnvelope struct { - SigningDeadline int64 `json:"signing_deadline,omitempty"` -} - -// extractSVMDeadline returns the unix-second deadline emitted by Push chain on -// the OutboundCreatedEvent. Zero means the destination chain didn't configure -// a deadline window — caller falls back to the pre-deadline eager-revert -// behavior. 
-func extractSVMDeadline(event *store.Event) int64 { - var env svmEventEnvelope - if err := json.Unmarshal(event.EventData, &env); err != nil { - return 0 - } - return env.SigningDeadline -} - // resolveSVM checks the on-chain ExecutedTx PDA and moves the event to COMPLETED or REVERTED. // // The REVERT decision is gated on the cluster's own clock (latest finalized @@ -51,73 +33,59 @@ func extractSVMDeadline(event *store.Event) int64 { // - PDA absent + cluster stale (>120s old) → stay BROADCASTED, retry. // - PDA absent + cluster says still in window → stay BROADCASTED, retry. // - PDA absent + cluster confirms past deadline → REVERT. -// -// Legacy events (deadline = 0) preserve the pre-deadline eager-revert path — -// REVERT as soon as PDA is absent, no cluster check needed. func (r *Resolver) resolveSVM(ctx context.Context, event *store.Event, chainID string) { + log := r.logger.With(). + Str("event_id", event.EventID). + Str("type", event.Type). + Str("chain_id", chainID).Logger() + txID, utxID, err := extractOutboundIDs(event) if err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to extract outbound IDs for SVM resolve") + log.Warn().Err(err).Msg("failed to extract outbound IDs for SVM resolve") return } + log = log.With().Str("tx_id", txID).Logger() client, err := r.chains.GetClient(chainID) if err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Str("chain_id", chainID). - Msg("failed to get chain client for SVM resolve") + log.Warn().Err(err).Msg("failed to get chain client for SVM resolve") return } builder, err := client.GetTxBuilder() if err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Str("chain_id", chainID). 
- Msg("failed to get tx builder for SVM resolve") + log.Warn().Err(err).Msg("failed to get tx builder for SVM resolve") return } executed, clusterTime, err := builder.IsAlreadyExecuted(ctx, txID) if err != nil { - r.logger.Debug().Err(err).Str("event_id", event.EventID).Str("tx_id", txID). - Msg("SVM PDA check failed, will retry next tick") + log.Debug().Err(err).Msg("SVM PDA check failed, will retry next tick") return } if executed { if err := r.eventStore.Update(event.EventID, map[string]any{"status": store.StatusCompleted}); err != nil { - r.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to mark SVM event COMPLETED") + log.Warn().Err(err).Msg("failed to mark SVM event COMPLETED") return } - r.logger.Info().Str("event_id", event.EventID).Str("tx_id", txID).Str("chain_id", chainID). - Msg("SVM ExecutedTx PDA found, marked COMPLETED") + log.Info().Msg("event marked as COMPLETED") return } // PDA absent. Decide REVERT using the cluster's own clock so we don't // false-revert during halt/stall or host clock skew. - deadline := extractSVMDeadline(event) - if deadline == 0 { - // Legacy event: no deadline, fall back to eager revert. - _ = r.voteOutboundFailureAndMarkReverted(ctx, event, txID, utxID, "", 0, "0", "tx not executed on destination chain") - return - } + deadline := txflow.ReadSigningDeadline(event) + dlog := log.With().Int64("signing_deadline", deadline).Int64("cluster_block_time", clusterTime).Logger() switch { case clusterTime == 0: - r.logger.Debug(). - Str("event_id", event.EventID).Str("tx_id", txID).Str("chain_id", chainID). - Msg("SVM cluster time unavailable, deferring REVERT decision") + dlog.Debug().Msg("SVM cluster time unavailable, deferring REVERT decision") return case time.Now().Unix()-clusterTime > svmClusterStaleSeconds: - r.logger.Warn(). - Str("event_id", event.EventID).Str("tx_id", txID).Str("chain_id", chainID). - Int64("cluster_block_time", clusterTime). 
- Msg("SVM cluster appears stale, deferring REVERT") + dlog.Warn().Msg("SVM cluster appears stale, deferring REVERT") return case clusterTime <= deadline+svmRevertSlackSeconds: - r.logger.Debug(). - Str("event_id", event.EventID).Str("tx_id", txID).Str("chain_id", chainID). - Int64("signing_deadline", deadline). - Int64("cluster_block_time", clusterTime). - Msg("SVM PDA absent but cluster clock still inside deadline window, will retry next tick") + dlog.Debug().Msg("SVM PDA absent but cluster clock still inside deadline window, will retry next tick") return }