From f14bd680a091dfb6b87f11890a6f6b09da9c7543 Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Thu, 12 Feb 2026 16:40:48 +0100 Subject: [PATCH 1/5] feat: validate lighter order value --- app/app.go | 1 + chains/lighter/config.go | 8 +++++ chains/lighter/config_test.go | 21 ++++++++++-- chains/lighter/message/lighter.go | 27 +++++++++++++-- chains/lighter/message/lighter_test.go | 47 +++++++++++++++++++++++++- 5 files changed, 98 insertions(+), 6 deletions(-) diff --git a/app/app.go b/app/app.go index 5bff102d..49552d1a 100644 --- a/app/app.go +++ b/app/app.go @@ -391,6 +391,7 @@ func Run() error { lighterConfig.WithdrawalAddress, lighterConfig.UsdcAddress, lighterConfig.RepaymentAddress, + lighterConfig.ConfirmationsByValue, lighterAPI, coordinator, host, diff --git a/chains/lighter/config.go b/chains/lighter/config.go index a7404eb0..e7e94cda 100644 --- a/chains/lighter/config.go +++ b/chains/lighter/config.go @@ -19,6 +19,8 @@ type LighterConfig struct { WithdrawalAddress common.Address UsdcAddress common.Address RepaymentAddress string + // usd bucket -> confirmations + ConfirmationsByValue map[uint64]uint64 } func NewLighterConfig(solverConfig solverConfig.SolverConfig) (*LighterConfig, error) { @@ -37,6 +39,12 @@ func NewLighterConfig(solverConfig solverConfig.SolverConfig) (*LighterConfig, e return nil, fmt.Errorf("withdrawal address not configured") } + confirmations := make(map[uint64]uint64) + for _, confirmation := range solverConfig.Chains[LIGHTER_CAIP].Confirmations { + // nolint:gosec + confirmations[uint64(confirmation.MaxAmountUSD)] = uint64(confirmation.Confirmations) + } + return &LighterConfig{ WithdrawalAddress: common.HexToAddress(withdrawalAddress), RepaymentAddress: solverConfig.ProtocolsMetadata.Lighter.RepaymentAddress, diff --git a/chains/lighter/config_test.go b/chains/lighter/config_test.go index f843e17b..5dbc80dc 100644 --- a/chains/lighter/config_test.go +++ b/chains/lighter/config_test.go @@ -71,9 +71,23 @@ func (s *NewLighterConfigTestSuite) Test_ValidConfig() { Decimals: 6, } + expectedBlockConfirmations := make(map[uint64]uint64) + expectedBlockConfirmations[1000] = 5 + expectedBlockConfirmations[2000] = 10 + solverChains := make(map[string]solverConfig.Chain) solverChains["eip155:42161"] = solverConfig.Chain{ Tokens: tokens, + Confirmations: []solverConfig.Confirmations{ + { + Confirmations: 5, + MaxAmountUSD: 1000, + }, + { + Confirmations: 10, + MaxAmountUSD: 2000, + }, + }, } config, err := lighter.NewLighterConfig(solverConfig.SolverConfig{ @@ -90,8 +104,9 @@ func (s *NewLighterConfigTestSuite) Test_ValidConfig() { s.Nil(err) s.Equal(config, &lighter.LighterConfig{ - WithdrawalAddress: common.HexToAddress("withdrawal"), - UsdcAddress: common.HexToAddress("usdc"), - RepaymentAddress: "3", + WithdrawalAddress: common.HexToAddress("withdrawal"), + UsdcAddress: common.HexToAddress("usdc"), + RepaymentAddress: "3", + ConfirmationsByValue: expectedBlockConfirmations, }) } diff --git a/chains/lighter/message/lighter.go b/chains/lighter/message/lighter.go index d8d0fd1d..f82f3324 100644 --- a/chains/lighter/message/lighter.go +++ b/chains/lighter/message/lighter.go @@ -5,7 +5,10 @@ import ( "encoding/json" "errors" "fmt" + "maps" + "math" "math/big" + "slices" "strconv" "github.com/ethereum/go-ethereum/common" @@ -24,8 +27,9 @@ import ( ) var ( - ARBITRUM_CHAIN_ID = big.NewInt(42161) - USDC_ACCOUNT_INDEX uint64 = 3 + ARBITRUM_CHAIN_ID = big.NewInt(42161) + USDC_ACCOUNT_INDEX uint64 = 3 + USDC_DECIMALS float64 = 6 ) type Coordinator interface { @@ -47,12 +51,14 @@ type LighterMessageHandler struct { usdcAddress common.Address repaymentAccount string txFetcher TxFetcher + confirmations map[uint64]uint64 } func NewLighterMessageHandler( lighterAddress common.Address, usdcAddress common.Address, repaymentAccount string, + confirmations map[uint64]uint64, txFetcher TxFetcher, coordinator Coordinator, host host.Host, @@ -70,6 +76,7 @@ func NewLighterMessageHandler( comm: comm, fetcher: fetcher, sigChn: sigChn, + confirmations: confirmations, } } @@ -149,9 +156,25 @@ func (h *LighterMessageHandler) verifyWithdrawal(tx *lighter.LighterTx) error { return errors.New("only usdc asset supported on lighter") } + if err := h.verifyOrderSize(tx.Transfer.Amount / uint64(math.Pow(10, USDC_DECIMALS))); err != nil { + return err + } + return nil } +func (h *LighterMessageHandler) verifyOrderSize(orderValue uint64) error { + buckets := slices.Collect(maps.Keys(h.confirmations)) + slices.Sort(buckets) + for _, bucket := range buckets { + if orderValue < bucket { + return nil + } + } + + return fmt.Errorf("order value %d exceeds confirmation buckets", orderValue) +} + func (h *LighterMessageHandler) calldata(tx *lighter.LighterTx) ([]byte, error) { return consts.LighterABI.Pack( "fulfillWithdraw", diff --git a/chains/lighter/message/lighter_test.go b/chains/lighter/message/lighter_test.go index 5f4cb885..43ad9f06 100644 --- a/chains/lighter/message/lighter_test.go +++ b/chains/lighter/message/lighter_test.go @@ -54,11 +54,14 @@ func (s *LighterMessageHandlerTestSuite) SetupTest() { s.mockTxFetcher = mock_message.NewMockTxFetcher(ctrl) s.sigChn = make(chan interface{}, 1) + confirmations := make(map[uint64]uint64) + confirmations[200] = 0 s.handler = message.NewLighterMessageHandler( common.Address{}, common.Address{}, "3", + confirmations, s.mockTxFetcher, s.mockCoordinator, s.mockHost, @@ -173,7 +176,7 @@ func (s *LighterMessageHandlerTestSuite) Test_HandleMessage_InvalidAsset() { DepositTxHash: "orderHash", } s.mockTxFetcher.EXPECT().GetTx(ad.OrderHash).Return(&lighter.LighterTx{ - Type: lighter.TxTypeL2Withdraw, + Type: lighter.TxTypeL2Transfer, Transfer: &lighter.Transfer{ Amount: 2000001, AssetIndex: 2, @@ -238,6 +241,48 @@ func (s *LighterMessageHandlerTestSuite) Test_HandleMessage_InvalidAccount() { s.NotNil(err) } +func (s *LighterMessageHandlerTestSuite) Test_HandleMessage_InvalidOrderValue() { + s.mockCommunication.EXPECT().Broadcast( + gomock.Any(), + gomock.Any(), + comm.LighterMsg, + "lighter", + ).Return(nil) + p, _ := pstoremem.NewPeerstore() + s.mockHost.EXPECT().Peerstore().Return(p) + + errChn := make(chan error, 1) + ad := &message.LighterData{ + ErrChn: errChn, + Nonce: big.NewInt(101), + LiquidityPool: common.HexToAddress("0xbe526bA5d1ad94cC59D7A79d99A59F607d31A657"), + OrderHash: "orderHash", + DepositTxHash: "orderHash", + } + s.mockTxFetcher.EXPECT().GetTx(ad.OrderHash).Return(&lighter.LighterTx{ + Type: lighter.TxTypeL2Transfer, + Transfer: &lighter.Transfer{ + Amount: 200000001, + AssetIndex: 3, + ToAccountIndex: 3, + Memo: []byte{238, 123, 250, 212, 202, 237, 62, 98, 106, 248, 169, 199, 213, 3, 76, 213, 137, 238, 73, 144, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, nil) + + m := &coreMessage.Message{ + Data: ad, + Source: 0, + Destination: 10, + } + prop, err := s.handler.HandleMessage(m) + + s.Nil(prop) + s.NotNil(err) + + err = <-errChn + s.NotNil(err) +} + func (s *LighterMessageHandlerTestSuite) Test_HandleMessage_MissingTx() { s.mockCommunication.EXPECT().Broadcast( gomock.Any(), From e8657415779cb1506f44c443cfdb753cbd188271 Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Thu, 12 Feb 2026 17:26:21 +0100 Subject: [PATCH 2/5] Update tests --- chains/lighter/config.go | 7 ++++--- chains/lighter/config_test.go | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/chains/lighter/config.go b/chains/lighter/config.go index e7e94cda..60dd75c7 100644 --- a/chains/lighter/config.go +++ b/chains/lighter/config.go @@ -46,8 +46,9 @@ func NewLighterConfig(solverConfig solverConfig.SolverConfig) (*LighterConfig, e } return &LighterConfig{ - WithdrawalAddress: common.HexToAddress(withdrawalAddress), - RepaymentAddress: solverConfig.ProtocolsMetadata.Lighter.RepaymentAddress, - UsdcAddress: common.HexToAddress(usdcConfig.Address), + WithdrawalAddress: common.HexToAddress(withdrawalAddress), + RepaymentAddress: solverConfig.ProtocolsMetadata.Lighter.RepaymentAddress, + UsdcAddress: common.HexToAddress(usdcConfig.Address), + ConfirmationsByValue: confirmations, }, nil } diff --git a/chains/lighter/config_test.go b/chains/lighter/config_test.go index 5dbc80dc..4f6f5792 100644 --- a/chains/lighter/config_test.go +++ b/chains/lighter/config_test.go @@ -78,6 +78,8 @@ func (s *NewLighterConfigTestSuite) Test_ValidConfig() { solverChains := make(map[string]solverConfig.Chain) solverChains["eip155:42161"] = solverConfig.Chain{ Tokens: tokens, + } + solverChains["lighter:1"] = solverConfig.Chain{ Confirmations: []solverConfig.Confirmations{ { Confirmations: 5, From fc2dfd8b27c7962b58aa64825e2cc0e041d8a7c0 Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Mon, 16 Feb 2026 16:10:50 +0100 Subject: [PATCH 3/5] feat: add process time histogram metrics --- app/app.go | 32 +++++++++--------- cache/signature.go | 12 +++++-- cache/signature_test.go | 9 ++++- metrics/metrics.go | 6 ++-- metrics/mpc.go | 32 ++++++++++++++++++ tss/coordinator.go | 9 +++++ tss/ecdsa/keygen/keygen_test.go | 6 ++-- tss/ecdsa/resharing/resharing_test.go | 10 +++--- tss/ecdsa/signing/signing_test.go | 8 ++--- tss/mock/coordinator.go | 48 +++++++++++++++++++++++++++ tss/test/utils.go | 3 ++ 11 files changed, 141 insertions(+), 34 deletions(-) diff --git a/app/app.go b/app/app.go index 49552d1a..3d88aa95 100644 --- a/app/app.go +++ b/app/app.go @@ -34,6 +34,7 @@ import ( "github.com/sprintertech/sprinter-signing/chains/evm/calls/events" evmListener "github.com/sprintertech/sprinter-signing/chains/evm/listener" evmMessage "github.com/sprintertech/sprinter-signing/chains/evm/message" + "github.com/sprintertech/sprinter-signing/metrics" lifiConfig "github.com/sprintertech/lifi-solver/pkg/config" "github.com/sprintertech/sprinter-signing/chains/lighter" @@ -44,7 +45,6 @@ import ( "github.com/sprintertech/sprinter-signing/config" "github.com/sprintertech/sprinter-signing/jobs" "github.com/sprintertech/sprinter-signing/keyshare" - "github.com/sprintertech/sprinter-signing/metrics" "github.com/sprintertech/sprinter-signing/price" "github.com/sprintertech/sprinter-signing/protocol/across" "github.com/sprintertech/sprinter-signing/protocol/lifi" @@ -109,16 +109,8 @@ func Run() error { panicOnError(err) log.Info().Str("peerID", host.ID().String()).Msg("Successfully created libp2p host") - communication := p2p.NewCommunication(host, "p2p/sprinter") - electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig) - coordinator := tss.NewCoordinator(host, communication, electorFactory) - - db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName)) - if err != nil { - panicOnError(err) - } - blockstore := store.NewBlockStore(db) - keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mp, err := observability.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL) panicOnError(err) @@ -127,14 +119,22 @@ func Run() error { log.Error().Msgf("Error shutting down meter provider: %v", err) } }() + sygmaMetrics, err := metrics.NewSprinterMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version) + if err != nil { + panic(err) + } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + communication := p2p.NewCommunication(host, "p2p/sprinter") + electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig) + coordinator := tss.NewCoordinator(host, communication, sygmaMetrics, electorFactory) - sygmaMetrics, err := metrics.NewSygmaMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version) + db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName)) if err != nil { - panic(err) + panicOnError(err) } + blockstore := store.NewBlockStore(db) + keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath) + msgChan := make(chan []*message.Message) sigChn := make(chan interface{}) @@ -142,7 +142,7 @@ func Run() error { configuration.RelayerConfig.CoinmarketcapConfig.Url, configuration.RelayerConfig.CoinmarketcapConfig.ApiKey) - signatureCache := cache.NewSignatureCache(communication) + signatureCache := cache.NewSignatureCache(communication, sygmaMetrics) go signatureCache.Watch(ctx, sigChn) supportedChains := make(map[uint64]struct{}) diff --git a/cache/signature.go b/cache/signature.go index 8d4baa32..04b07495 100644 --- a/cache/signature.go +++ b/cache/signature.go @@ -16,12 +16,17 @@ const ( SIGNATURE_TTL = time.Minute * 10 ) +type Metrics interface { + EndProcess(sessionID string) +} + type SignatureCache struct { sigCache *ttlcache.Cache[string, []byte] comm comm.Communication + metrics Metrics } -func NewSignatureCache(c comm.Communication) *SignatureCache { +func NewSignatureCache(c comm.Communication, metrics Metrics) *SignatureCache { cache := ttlcache.New( ttlcache.WithTTL[string, []byte](SIGNATURE_TTL), ) @@ -29,6 +34,7 @@ func NewSignatureCache(c comm.Communication) *SignatureCache { sc := &SignatureCache{ sigCache: cache, comm: c, + metrics: metrics, } go cache.Start() @@ -43,7 +49,7 @@ func (s *SignatureCache) Subscribe(ctx context.Context, id string, sigChannel ch return } - ping := time.Tick(time.Millisecond * 250) + ping := time.Tick(time.Millisecond * 50) for { select { case <-ctx.Done(): @@ -81,6 +87,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) { { sig := sig.(signing.EcdsaSignature) s.sigCache.Set(sig.ID, sig.Signature, ttlcache.DefaultTTL) + s.metrics.EndProcess(sig.ID) } case msg := <-msgChn: { @@ -92,6 +99,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) { log.Debug().Msgf("Received signature for ID: %s", msg.ID) s.sigCache.Set(msg.ID, msg.Signature, ttlcache.DefaultTTL) + s.metrics.EndProcess(msg.ID) } case <-ctx.Done(): { diff --git a/cache/signature_test.go b/cache/signature_test.go index 6016af69..8f43c163 100644 --- a/cache/signature_test.go +++ b/cache/signature_test.go @@ -10,6 +10,7 @@ import ( mock_communication "github.com/sprintertech/sprinter-signing/comm/mock" "github.com/sprintertech/sprinter-signing/tss/ecdsa/signing" "github.com/sprintertech/sprinter-signing/tss/message" + mock_tss "github.com/sprintertech/sprinter-signing/tss/mock" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" ) @@ -21,6 +22,7 @@ type SignatureCacheTestSuite struct { ctx context.Context mockCommunication *mock_communication.MockCommunication + mockMetrics *mock_tss.MockMetrics cancel context.CancelFunc sigChn chan interface{} msgChn chan *comm.WrappedMessage @@ -46,7 +48,8 @@ func (s *SignatureCacheTestSuite) SetupTest() { s.cancel = cancel s.ctx = ctx - s.sc = cache.NewSignatureCache(s.mockCommunication) + s.mockMetrics = mock_tss.NewMockMetrics(gomock.NewController(s.T())) + s.sc = cache.NewSignatureCache(s.mockCommunication, s.mockMetrics) go s.sc.Watch(s.ctx, s.sigChn) time.Sleep(time.Millisecond * 100) } @@ -65,6 +68,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidSignatureResult() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) s.sigChn <- expectedSig time.Sleep(time.Millisecond * 100) @@ -79,6 +83,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidMessage() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, @@ -98,6 +103,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage_EarlyExit() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, @@ -122,6 +128,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, diff --git a/metrics/metrics.go b/metrics/metrics.go index 2e5455eb..328e84b0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -11,14 +11,14 @@ import ( api "go.opentelemetry.io/otel/metric" ) -type SygmaMetrics struct { +type SprinterMetrics struct { *observability.RelayerMetrics *MpcMetrics *HostMetrics } // NewSygmaMetrics creates an instance of metrics -func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SygmaMetrics, error) { +func NewSprinterMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SprinterMetrics, error) { attributes := []attribute.KeyValue{attribute.String("relayerid", relayerID), attribute.String("env", env), attribute.String("version", version)} opts := api.WithAttributes(attributes...) relayerMetrics, err := observability.NewRelayerMetrics(ctx, meter, attributes...) @@ -36,7 +36,7 @@ func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, versi return nil, err } - return &SygmaMetrics{ + return &SprinterMetrics{ RelayerMetrics: relayerMetrics, MpcMetrics: mpcMetrics, HostMetrics: hostMetrics, diff --git a/metrics/mpc.go b/metrics/mpc.go index 55200161..f281ff01 100644 --- a/metrics/mpc.go +++ b/metrics/mpc.go @@ -2,8 +2,11 @@ package metrics import ( "context" + "sync" + "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/metric" ) @@ -12,6 +15,10 @@ type MpcMetrics struct { availableRelayersGauge metric.Int64ObservableGauge totalRelayerCount *int64 availableRelayerCount *int64 + + sessionTimeHistogram metric.Float64Histogram + sessionStartTime map[string]time.Time + histogramMutex sync.Mutex } // NewMpcMetrics initializes metrics related to the MPC set @@ -41,11 +48,16 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem return nil, err } + sessionTimeHistogram, err := meter.Float64Histogram("relayer.SessionTime") + return &MpcMetrics{ totalRelayersGauge: totalRelayersGauge, availableRelayersGauge: availableRelayersGauge, totalRelayerCount: totalRelayerCount, availableRelayerCount: availableRelayerCount, + sessionTimeHistogram: sessionTimeHistogram, + sessionStartTime: make(map[string]time.Time), + histogramMutex: sync.Mutex{}, }, nil } @@ -53,3 +65,23 @@ func (m *MpcMetrics) TrackRelayerStatus(unavailable peer.IDSlice, all peer.IDSli *m.totalRelayerCount = int64(len(all)) *m.availableRelayerCount = int64(len(all) - len(unavailable)) } + +func (m *MpcMetrics) StartProcess(sessionID string) { + m.histogramMutex.Lock() + defer m.histogramMutex.Unlock() + + m.sessionStartTime[sessionID] = time.Now() +} + +func (m *MpcMetrics) EndProcess(sessionID string) { + m.histogramMutex.Lock() + defer m.histogramMutex.Unlock() + + startTime, ok := m.sessionStartTime[sessionID] + if !ok { + log.Warn().Msgf("Session start time with ID %s not found", sessionID) + return + } + + m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime).Seconds()) +} diff --git a/tss/coordinator.go b/tss/coordinator.go index a37b84d3..c93662a3 100644 --- a/tss/coordinator.go +++ b/tss/coordinator.go @@ -37,6 +37,11 @@ type TssProcess interface { Timeout() time.Duration } +type Metrics interface { + StartProcess(sessionID string) + EndProcess(sessionID string) +} + type Coordinator struct { host host.Host communication comm.Communication @@ -44,6 +49,7 @@ type Coordinator struct { pendingProcesses map[string]bool processLock sync.Mutex + metrics Metrics CoordinatorTimeout time.Duration InitiatePeriod time.Duration @@ -52,12 +58,14 @@ type Coordinator struct { func NewCoordinator( host host.Host, communication comm.Communication, + metrics Metrics, electorFactory *elector.CoordinatorElectorFactory, ) *Coordinator { return &Coordinator{ host: host, communication: communication, electorFactory: electorFactory, + metrics: metrics, pendingProcesses: make(map[string]bool), @@ -79,6 +87,7 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, re c.processLock.Lock() c.pendingProcesses[sessionID] = true c.processLock.Unlock() + c.metrics.StartProcess(sessionID) ctx, cancel := context.WithCancel(ctx) p := pool.New().WithContext(ctx).WithCancelOnError() diff --git a/tss/ecdsa/keygen/keygen_test.go b/tss/ecdsa/keygen/keygen_test.go index 810ac286..88587b1a 100644 --- a/tss/ecdsa/keygen/keygen_test.go +++ b/tss/ecdsa/keygen/keygen_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcegraph/conc/pool" "github.com/sprintertech/sprinter-signing/comm" @@ -17,6 +16,7 @@ import ( "github.com/sprintertech/sprinter-signing/tss/ecdsa/keygen" tsstest "github.com/sprintertech/sprinter-signing/tss/test" "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" ) type KeygenTestSuite struct { @@ -40,7 +40,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) coordinators = append(coordinators, coordinator) processes = append(processes, keygen) } @@ -76,7 +76,7 @@ func (s *KeygenTestSuite) Test_KeygenTimeout() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen2", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) keygen.TssTimeout = time.Millisecond coordinators = append(coordinators, coordinator) processes = append(processes, keygen) diff --git a/tss/ecdsa/resharing/resharing_test.go b/tss/ecdsa/resharing/resharing_test.go index b8396cdd..c324a993 100644 --- a/tss/ecdsa/resharing/resharing_test.go +++ b/tss/ecdsa/resharing/resharing_test.go @@ -8,7 +8,6 @@ import ( "fmt" "testing" - "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -20,6 +19,7 @@ import ( "github.com/sprintertech/sprinter-signing/tss/ecdsa/resharing" tsstest "github.com/sprintertech/sprinter-signing/tss/test" "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" ) type ResharingTestSuite struct { @@ -60,7 +60,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil) resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -107,7 +107,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil) resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -157,7 +157,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Le s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil) resharing := resharing.NewResharing("resharing3", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -206,7 +206,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Bi s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil) resharing := resharing.NewResharing("resharing4", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) diff --git a/tss/ecdsa/signing/signing_test.go b/tss/ecdsa/signing/signing_test.go index c0d41dfd..6771ac6c 100644 --- a/tss/ecdsa/signing/signing_test.go +++ b/tss/ecdsa/signing/signing_test.go @@ -51,7 +51,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, signing) } tsstest.SetupCommunication(communicationMap) @@ -101,7 +101,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess_ManualCoordinator() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, signing) } tsstest.SetupCommunication(communicationMap) @@ -155,7 +155,7 @@ func (s *SigningTestSuite) Test_SigningTimeout() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) signing.TssTimeout = time.Nanosecond coordinators = append(coordinators, coordinator) processes = append(processes, signing) @@ -188,7 +188,7 @@ func (s *SigningTestSuite) Test_PendingProcessExists() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen3", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, keygen) } tsstest.SetupCommunication(communicationMap) diff --git a/tss/mock/coordinator.go b/tss/mock/coordinator.go index 091c8bd0..162fc71c 100644 --- a/tss/mock/coordinator.go +++ b/tss/mock/coordinator.go @@ -152,3 +152,51 @@ func (mr *MockTssProcessMockRecorder) ValidCoordinators() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidCoordinators", reflect.TypeOf((*MockTssProcess)(nil).ValidCoordinators)) } + +// MockMetrics is a mock of Metrics interface. +type MockMetrics struct { + ctrl *gomock.Controller + recorder *MockMetricsMockRecorder + isgomock struct{} +} + +// MockMetricsMockRecorder is the mock recorder for MockMetrics. +type MockMetricsMockRecorder struct { + mock *MockMetrics +} + +// NewMockMetrics creates a new mock instance. +func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics { + mock := &MockMetrics{ctrl: ctrl} + mock.recorder = &MockMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder { + return m.recorder +} + +// EndProcess mocks base method. +func (m *MockMetrics) EndProcess(sessionID string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "EndProcess", sessionID) +} + +// EndProcess indicates an expected call of EndProcess. +func (mr *MockMetricsMockRecorder) EndProcess(sessionID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EndProcess", reflect.TypeOf((*MockMetrics)(nil).EndProcess), sessionID) +} + +// StartProcess mocks base method. +func (m *MockMetrics) StartProcess(sessionID string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StartProcess", sessionID) +} + +// StartProcess indicates an expected call of StartProcess. +func (mr *MockMetricsMockRecorder) StartProcess(sessionID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartProcess", reflect.TypeOf((*MockMetrics)(nil).StartProcess), sessionID) +} diff --git a/tss/test/utils.go b/tss/test/utils.go index 6f39dcef..5662c24e 100644 --- a/tss/test/utils.go +++ b/tss/test/utils.go @@ -27,6 +27,7 @@ type CoordinatorTestSuite struct { MockECDSAStorer *mock_tss.MockECDSAKeyshareStorer MockCommunication *mock_comm.MockCommunication MockTssProcess *mock_tss.MockTssProcess + MockMetrics *mock_tss.MockMetrics Hosts []host.Host Threshold int @@ -39,6 +40,8 @@ func (s *CoordinatorTestSuite) SetupTest() { s.MockECDSAStorer = mock_tss.NewMockECDSAKeyshareStorer(s.GomockController) s.MockCommunication = mock_comm.NewMockCommunication(s.GomockController) s.MockTssProcess = mock_tss.NewMockTssProcess(s.GomockController) + s.MockMetrics = mock_tss.NewMockMetrics(s.GomockController) + s.MockMetrics.EXPECT().StartProcess(gomock.Any()).AnyTimes() s.PartyNumber = 3 s.Threshold = 1 From f8f095707a897bb6ed4e7f94fcc66b27aaafe547 Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Mon, 16 Feb 2026 16:15:21 +0100 Subject: [PATCH 4/5] Cleanup session start times periodically --- metrics/mpc.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/metrics/mpc.go b/metrics/mpc.go index f281ff01..6272537b 100644 --- a/metrics/mpc.go +++ b/metrics/mpc.go @@ -2,23 +2,26 @@ package metrics import ( "context" - "sync" "time" + "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/metric" ) +const ( + SESSION_TTL = time.Minute * 10 +) + type MpcMetrics struct { totalRelayersGauge metric.Int64ObservableGauge availableRelayersGauge metric.Int64ObservableGauge totalRelayerCount *int64 availableRelayerCount *int64 - sessionTimeHistogram metric.Float64Histogram - sessionStartTime map[string]time.Time - histogramMutex sync.Mutex + sessionTimeHistogram metric.Float64Histogram + sessionStartTimeCache *ttlcache.Cache[string, time.Time] } // NewMpcMetrics initializes metrics related to the MPC set @@ -56,8 +59,9 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem totalRelayerCount: totalRelayerCount, availableRelayerCount: availableRelayerCount, sessionTimeHistogram: sessionTimeHistogram, - sessionStartTime: make(map[string]time.Time), - histogramMutex: sync.Mutex{}, + sessionStartTimeCache: ttlcache.New( + ttlcache.WithTTL[string, time.Time](SESSION_TTL), + ), }, nil } @@ -67,21 +71,15 @@ func (m *MpcMetrics) TrackRelayerStatus(unavailable peer.IDSlice, all peer.IDSli } func (m *MpcMetrics) StartProcess(sessionID string) { - m.histogramMutex.Lock() - defer m.histogramMutex.Unlock() - - m.sessionStartTime[sessionID] = time.Now() + m.sessionStartTimeCache.Set(sessionID, time.Now(), ttlcache.DefaultTTL) } func (m *MpcMetrics) EndProcess(sessionID string) { - m.histogramMutex.Lock() - defer m.histogramMutex.Unlock() - - startTime, ok := m.sessionStartTime[sessionID] - if !ok { + startTime := m.sessionStartTimeCache.Get(sessionID) + if startTime == nil { log.Warn().Msgf("Session start time with ID %s not found", sessionID) return } - m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime).Seconds()) + m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime.Value()).Seconds()) } From e531ec3f9fe9ca4aa3ff425cf9100df942f0815f Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Tue, 17 Feb 2026 10:27:53 +0100 Subject: [PATCH 5/5] Lint --- app/app.go | 8 ++------ metrics/mpc.go | 3 +++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/app/app.go b/app/app.go index 3d88aa95..87a56a6e 100644 --- a/app/app.go +++ b/app/app.go @@ -120,18 +120,14 @@ func Run() error { } }() sygmaMetrics, err := metrics.NewSprinterMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version) - if err != nil { - panic(err) - } + panicOnError(err) communication := p2p.NewCommunication(host, "p2p/sprinter") electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig) coordinator := tss.NewCoordinator(host, communication, sygmaMetrics, electorFactory) db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName)) - if err != nil { - panicOnError(err) - } + panicOnError(err) blockstore := store.NewBlockStore(db) keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath) diff --git a/metrics/mpc.go b/metrics/mpc.go index 6272537b..5d8c0a3f 100644 --- a/metrics/mpc.go +++ b/metrics/mpc.go @@ -52,6 +52,9 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem } sessionTimeHistogram, err := meter.Float64Histogram("relayer.SessionTime") + if err != nil { + return nil, err + } return &MpcMetrics{ totalRelayersGauge: totalRelayersGauge,