diff --git a/changelog.md b/changelog.md index 105ca4b643..445560fdb4 100644 --- a/changelog.md +++ b/changelog.md @@ -36,6 +36,7 @@ The `zetacored` binary must be upgraded to trigger chain parameters data migrati * [4511](https://github.com/zeta-chain/node/pull/4511) - false mempool congested warning * [4509](https://github.com/zeta-chain/node/pull/4509) - use outbound schedule interval in sui cctx scheduling * [4514](https://github.com/zeta-chain/node/pull/4514) - use Zeta height as a factor to calculate the EVM chain artificial height for TSS keysign +* [4513](https://github.com/zeta-chain/node/pull/4513) - use `outbound_schedule_interval` and `outbound_schedule_lookahead` in ton cctx scheduling ### Tests diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index bc89124810..d39d35fbd2 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -46,4 +46,22 @@ const ( // CmdUpdateERC20CustodyPauseStatus is used for CCTX of type cmd to give the instruction to the TSS to update the pause status of the ERC20 custody contract CmdUpdateERC20CustodyPauseStatus = "cmd_update_erc20_custody_pause_status" + + // MaxNonceOffsetFactor is the factor to determine the maximum nonce offset between first pending CCTX and the CCTX being scheduled. + // By setting a maximum nonce offset, the CCTX scheduler will conditionally hold on and wait for older pending CCTXs to be processed + // before scheduling higher nonce CCTXs, to avoid missing any pending CCTXs. + // + // The missed pending CCTXs situation is like this: + // - Pending nonces: [1000, 1020) + // - Pending CCTXs: [979, 980, 981, 982, 1000, 1001, ..., 1019], where 979 - 982 are missed. + // - OutboundScheduleLookahead == 20 + // - MaxNonceOffsetFactor == 1.0, results in max nonce offset = 20 * 1.0 = 20 + // + // In this case, the scheduler + // - should process CCTXs with nonces [979, 980, 981, 982] because they are missed pending CCTXs. + // - should NOT process CCTXs with nonces [1000, 1001, ..., 1019] because their nonces are much + // higher that of the first pending CCTX (1000 - 979 > 20). + // + // NOTE: 1.0 means use the same value as lookahead for the maximum nonce offset. + MaxNonceOffsetFactor = 1.0 ) diff --git a/zetaclient/chains/evm/evm.go b/zetaclient/chains/evm/evm.go index dccf478dfb..c3da06a06e 100644 --- a/zetaclient/chains/evm/evm.go +++ b/zetaclient/chains/evm/evm.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/constant" "github.com/zeta-chain/node/pkg/scheduler" "github.com/zeta-chain/node/pkg/ticker" "github.com/zeta-chain/node/zetaclient/chains/base" @@ -25,14 +26,6 @@ type EVM struct { signer *signer.Signer } -const ( - // outboundLookBackFactor is the factor to determine how many nonces to look back for pending cctxs - // For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0, - // the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0) - // NOTE: 1.0 means look back the same number of cctxs as we look ahead - outboundLookBackFactor = 1.0 -) - func New(scheduler *scheduler.Scheduler, observer *observer.Observer, signer *signer.Signer) *EVM { return &EVM{ scheduler: scheduler, @@ -135,7 +128,7 @@ func (e *EVM) scheduleCCTX(ctx context.Context) error { chainID = e.observer.Chain().ChainId lookahead = e.observer.ChainParams().OutboundScheduleLookahead // #nosec G115 always in range - outboundScheduleLookBack = uint64(float64(lookahead) * outboundLookBackFactor) + maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor) ) cctxList, err := e.observer.ZetaRepo().GetPendingCCTXs(ctx) @@ -162,7 +155,7 @@ func (e *EVM) scheduleCCTX(ctx context.Context) error { switch { case params.ReceiverChainId != chainID: return fmt.Errorf("chain id mismatch: want %d, got %d", chainID, params.ReceiverChainId) - case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+outboundScheduleLookBack: + case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+maxNonceOffset: return fmt.Errorf( "nonce %d is too high (%s). Earliest nonce %d", params.TssNonce, diff --git a/zetaclient/chains/solana/solana.go b/zetaclient/chains/solana/solana.go index aedccd38d5..675e0695a1 100644 --- a/zetaclient/chains/solana/solana.go +++ b/zetaclient/chains/solana/solana.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/constant" "github.com/zeta-chain/node/pkg/scheduler" "github.com/zeta-chain/node/pkg/ticker" "github.com/zeta-chain/node/zetaclient/chains/base" @@ -17,14 +18,6 @@ import ( "github.com/zeta-chain/node/zetaclient/logs" ) -const ( - // outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs - // For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0, - // the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0) - // NOTE: 1.0 means look back the same number of cctxs as we look ahead - outboundLookbackFactor = 1.0 -) - // Solana represents Solana observer-signer. type Solana struct { scheduler *scheduler.Scheduler @@ -136,7 +129,7 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error { // #nosec G115 positive interval = uint64(s.observer.ChainParams().OutboundScheduleInterval) scheduleLookahead = s.observer.ChainParams().OutboundScheduleLookahead - scheduleLookback = uint64(float64(scheduleLookahead) * outboundLookbackFactor) + maxNonceOffset = uint64(float64(scheduleLookahead) * constant.MaxNonceOffsetFactor) needsProcessingCtr = 0 ) @@ -146,7 +139,7 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error { } // schedule keysign for each pending cctx - for _, cctx := range cctxList { + for i, cctx := range cctxList { var ( params = cctx.GetCurrentOutboundParam() inboundParams = cctx.GetInboundParams() @@ -157,10 +150,13 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error { logger := s.observer.Logger().Outbound.With().Str(logs.FieldOutboundID, outboundID).Logger() switch { + case int64(i) == scheduleLookahead: + // stop if lookahead is reached + return nil case params.ReceiverChainId != chainID: logger.Error().Msg("chain id mismatch") continue - case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+scheduleLookback: + case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+maxNonceOffset: return fmt.Errorf( "nonce %d is too high (%s). Earliest nonce %d", params.TssNonce, diff --git a/zetaclient/chains/sui/sui.go b/zetaclient/chains/sui/sui.go index 91522755ac..91709ef8de 100644 --- a/zetaclient/chains/sui/sui.go +++ b/zetaclient/chains/sui/sui.go @@ -10,6 +10,7 @@ import ( "github.com/zeta-chain/node/pkg/bg" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/constant" "github.com/zeta-chain/node/pkg/scheduler" "github.com/zeta-chain/node/pkg/ticker" "github.com/zeta-chain/node/zetaclient/chains/base" @@ -26,14 +27,6 @@ type Sui struct { signer *signer.Signer } -const ( - // outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs - // For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0, - // the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0) - // NOTE: 1.0 means look back the same number of cctxs as we look ahead - outboundLookbackFactor = 1.0 -) - // New Sui observer-signer constructor. func New(scheduler *scheduler.Scheduler, observer *observer.Observer, signer *signer.Signer) *Sui { return &Sui{scheduler, observer, signer} @@ -140,10 +133,10 @@ func (s *Sui) scheduleCCTX(ctx context.Context) error { interval = uint64(s.observer.ChainParams().OutboundScheduleInterval) lookahead = s.observer.ChainParams().OutboundScheduleLookahead // #nosec G115 always in range - lookback = uint64(float64(lookahead) * outboundLookbackFactor) + maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor) firstNonce = cctxList[0].GetCurrentOutboundParam().TssNonce - maxNonce = firstNonce + lookback + maxNonce = firstNonce + maxNonceOffset ) for i, cctx := range cctxList { @@ -151,19 +144,18 @@ func (s *Sui) scheduleCCTX(ctx context.Context) error { outboundID = base.OutboundIDFromCCTX(cctx) outboundParams = cctx.GetCurrentOutboundParam() nonce = outboundParams.TssNonce + logger = s.outboundLogger(outboundID) ) - logger := s.outboundLogger(outboundID) - switch { case int64(i) == lookahead: - // take only first N cctxs + // stop if lookahead is reached return nil case outboundParams.ReceiverChainId != chainID: // should not happen logger.Error().Msg("chain id mismatch") continue - case nonce >= maxNonce: + case nonce > maxNonce: return fmt.Errorf("nonce %d is too high (%s). Earliest nonce %d", nonce, outboundID, firstNonce) case s.signer.IsOutboundActive(outboundID): // cctx is already being processed & broadcasted by signer diff --git a/zetaclient/chains/ton/ton.go b/zetaclient/chains/ton/ton.go index 320b1b871e..d06f3eccfd 100644 --- a/zetaclient/chains/ton/ton.go +++ b/zetaclient/chains/ton/ton.go @@ -9,9 +9,9 @@ import ( "github.com/rs/zerolog" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/constant" "github.com/zeta-chain/node/pkg/scheduler" "github.com/zeta-chain/node/pkg/ticker" - "github.com/zeta-chain/node/x/crosschain/types" "github.com/zeta-chain/node/zetaclient/chains/base" "github.com/zeta-chain/node/zetaclient/chains/ton/observer" "github.com/zeta-chain/node/zetaclient/chains/ton/signer" @@ -121,48 +121,70 @@ func (t *TON) scheduleCCTX(ctx context.Context) error { time.Sleep(delay) - // #nosec G115 always in range - zetaHeight := uint64(zetaBlock.Block.Height) - cctxs, err := t.observer.ZetaRepo().GetPendingCCTXs(ctx) if err != nil { return err } - for _, cctx := range cctxs { - outboundID := base.OutboundIDFromCCTX(cctx) - err := t.processCCTX(ctx, outboundID, cctx, zetaHeight) - if err != nil { - t.outboundLogger(outboundID).Error().Err(err).Msg("failed to schedule CCTX") - } + // no-op + if len(cctxs) == 0 { + return nil } - return nil -} + var ( + // #nosec G115 always in range + zetaHeight = uint64(zetaBlock.Block.Height) + chainID = t.observer.Chain().ChainId -func (t *TON) processCCTX(ctx context.Context, - outboundID string, - cctx *types.CrossChainTx, - zetaHeight uint64, -) error { - switch { - case t.signer.IsOutboundActive(outboundID): - return nil //no-op - case cctx.GetCurrentOutboundParam().ReceiverChainId != t.observer.Chain().ChainId: - return errors.New("chain id mismatch") - } + // #nosec G115 positive + interval = uint64(t.observer.ChainParams().OutboundScheduleInterval) + lookahead = t.observer.ChainParams().OutboundScheduleLookahead + // #nosec G115 always in range + maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor) - // vote outbound if it's already confirmed - continueKeySign, err := t.observer.VoteOutboundIfConfirmed(ctx, cctx) - if err != nil { - return errors.Wrap(err, "failed to VoteOutboundIfConfirmed") - } - if !continueKeySign { - t.outboundLogger(outboundID).Info().Msg("schedule CCTX: outbound already processed") - return nil - } + firstNonce = cctxs[0].GetCurrentOutboundParam().TssNonce + maxNonce = firstNonce + maxNonceOffset + ) + + for i, cctx := range cctxs { + var ( + outboundID = base.OutboundIDFromCCTX(cctx) + outboundParams = cctx.GetCurrentOutboundParam() + nonce = outboundParams.TssNonce + logger = t.outboundLogger(outboundID) + ) + + switch { + case int64(i) == lookahead: + // stop if lookahead is reached + return nil + case outboundParams.ReceiverChainId != chainID: + // should not happen + logger.Error().Msg("chain id mismatch") + continue + case nonce > maxNonce: + return fmt.Errorf("nonce %d is too high (%s). Earliest nonce %d", nonce, outboundID, firstNonce) + case t.signer.IsOutboundActive(outboundID): + // cctx is already being processed & broadcasted by signer + continue + } + + // vote outbound if it's already confirmed + continueKeysign, err := t.observer.VoteOutboundIfConfirmed(ctx, cctx) + if err != nil { + logger.Error().Err(err).Msg("call to VoteOutboundIfConfirmed failed") + continue + } + if !continueKeysign { + logger.Info().Msg("outbound already processed") + continue + } - go t.signer.TryProcessOutbound(ctx, cctx, t.observer.ZetaRepo(), zetaHeight) + // schedule keysign if the interval has arrived + if nonce%interval == zetaHeight%interval { + go t.signer.TryProcessOutbound(ctx, cctx, t.observer.ZetaRepo(), zetaHeight) + } + } return nil }