diff --git a/app/app.go b/app/app.go index 49552d1a..87a56a6e 100644 --- a/app/app.go +++ b/app/app.go @@ -34,6 +34,7 @@ import ( "github.com/sprintertech/sprinter-signing/chains/evm/calls/events" evmListener "github.com/sprintertech/sprinter-signing/chains/evm/listener" evmMessage "github.com/sprintertech/sprinter-signing/chains/evm/message" + "github.com/sprintertech/sprinter-signing/metrics" lifiConfig "github.com/sprintertech/lifi-solver/pkg/config" "github.com/sprintertech/sprinter-signing/chains/lighter" @@ -44,7 +45,6 @@ import ( "github.com/sprintertech/sprinter-signing/config" "github.com/sprintertech/sprinter-signing/jobs" "github.com/sprintertech/sprinter-signing/keyshare" - "github.com/sprintertech/sprinter-signing/metrics" "github.com/sprintertech/sprinter-signing/price" "github.com/sprintertech/sprinter-signing/protocol/across" "github.com/sprintertech/sprinter-signing/protocol/lifi" @@ -109,16 +109,8 @@ func Run() error { panicOnError(err) log.Info().Str("peerID", host.ID().String()).Msg("Successfully created libp2p host") - communication := p2p.NewCommunication(host, "p2p/sprinter") - electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig) - coordinator := tss.NewCoordinator(host, communication, electorFactory) - - db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName)) - if err != nil { - panicOnError(err) - } - blockstore := store.NewBlockStore(db) - keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mp, err := observability.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL) panicOnError(err) @@ -127,14 +119,18 @@ func Run() error { log.Error().Msgf("Error shutting down meter provider: %v", err) } }() + sygmaMetrics, err := metrics.NewSprinterMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version) + panicOnError(err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + communication := p2p.NewCommunication(host, "p2p/sprinter") + electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig) + coordinator := tss.NewCoordinator(host, communication, sygmaMetrics, electorFactory) + + db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName)) + panicOnError(err) + blockstore := store.NewBlockStore(db) + keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath) - sygmaMetrics, err := metrics.NewSygmaMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version) - if err != nil { - panic(err) - } msgChan := make(chan []*message.Message) sigChn := make(chan interface{}) @@ -142,7 +138,7 @@ func Run() error { configuration.RelayerConfig.CoinmarketcapConfig.Url, configuration.RelayerConfig.CoinmarketcapConfig.ApiKey) - signatureCache := cache.NewSignatureCache(communication) + signatureCache := cache.NewSignatureCache(communication, sygmaMetrics) go signatureCache.Watch(ctx, sigChn) supportedChains := make(map[uint64]struct{}) diff --git a/cache/signature.go b/cache/signature.go index 8d4baa32..04b07495 100644 --- a/cache/signature.go +++ b/cache/signature.go @@ -16,12 +16,17 @@ const ( SIGNATURE_TTL = time.Minute * 10 ) +type Metrics interface { + EndProcess(sessionID string) +} + type SignatureCache struct { sigCache *ttlcache.Cache[string, []byte] comm comm.Communication + metrics Metrics } -func NewSignatureCache(c comm.Communication) *SignatureCache { +func NewSignatureCache(c comm.Communication, metrics Metrics) *SignatureCache { cache := ttlcache.New( ttlcache.WithTTL[string, []byte](SIGNATURE_TTL), ) @@ -29,6 +34,7 @@ func NewSignatureCache(c comm.Communication) *SignatureCache { sc := &SignatureCache{ sigCache: cache, comm: c, + metrics: metrics, } go cache.Start() @@ -43,7 +49,7 @@ func (s *SignatureCache) Subscribe(ctx context.Context, id string, sigChannel ch return } - ping := time.Tick(time.Millisecond * 250) + ping := time.Tick(time.Millisecond * 50) for { select { case <-ctx.Done(): @@ -81,6 +87,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) { { sig := sig.(signing.EcdsaSignature) s.sigCache.Set(sig.ID, sig.Signature, ttlcache.DefaultTTL) + s.metrics.EndProcess(sig.ID) } case msg := <-msgChn: { @@ -92,6 +99,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) { log.Debug().Msgf("Received signature for ID: %s", msg.ID) s.sigCache.Set(msg.ID, msg.Signature, ttlcache.DefaultTTL) + s.metrics.EndProcess(msg.ID) } case <-ctx.Done(): { diff --git a/cache/signature_test.go b/cache/signature_test.go index 6016af69..8f43c163 100644 --- a/cache/signature_test.go +++ b/cache/signature_test.go @@ -10,6 +10,7 @@ import ( mock_communication "github.com/sprintertech/sprinter-signing/comm/mock" "github.com/sprintertech/sprinter-signing/tss/ecdsa/signing" "github.com/sprintertech/sprinter-signing/tss/message" + mock_tss "github.com/sprintertech/sprinter-signing/tss/mock" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" ) @@ -21,6 +22,7 @@ type SignatureCacheTestSuite struct { ctx context.Context mockCommunication *mock_communication.MockCommunication + mockMetrics *mock_tss.MockMetrics cancel context.CancelFunc sigChn chan interface{} msgChn chan *comm.WrappedMessage @@ -46,7 +48,8 @@ func (s *SignatureCacheTestSuite) SetupTest() { s.cancel = cancel s.ctx = ctx - s.sc = cache.NewSignatureCache(s.mockCommunication) + s.mockMetrics = mock_tss.NewMockMetrics(gomock.NewController(s.T())) + s.sc = cache.NewSignatureCache(s.mockCommunication, s.mockMetrics) go s.sc.Watch(s.ctx, s.sigChn) time.Sleep(time.Millisecond * 100) } @@ -65,6 +68,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidSignatureResult() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) s.sigChn <- expectedSig time.Sleep(time.Millisecond * 100) @@ -79,6 +83,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidMessage() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, @@ -98,6 +103,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage_EarlyExit() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, @@ -122,6 +128,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage() { Signature: []byte("signature"), ID: "signatureID", } + s.mockMetrics.EXPECT().EndProcess(expectedSig.ID) wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature) wMsg := &comm.WrappedMessage{ Payload: wMsgBytes, diff --git a/metrics/metrics.go b/metrics/metrics.go index 2e5455eb..328e84b0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -11,14 +11,14 @@ import ( api "go.opentelemetry.io/otel/metric" ) -type SygmaMetrics struct { +type SprinterMetrics struct { *observability.RelayerMetrics *MpcMetrics *HostMetrics } // NewSygmaMetrics creates an instance of metrics -func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SygmaMetrics, error) { +func NewSprinterMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SprinterMetrics, error) { attributes := []attribute.KeyValue{attribute.String("relayerid", relayerID), attribute.String("env", env), attribute.String("version", version)} opts := api.WithAttributes(attributes...) relayerMetrics, err := observability.NewRelayerMetrics(ctx, meter, attributes...) @@ -36,7 +36,7 @@ func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, versi return nil, err } - return &SygmaMetrics{ + return &SprinterMetrics{ RelayerMetrics: relayerMetrics, MpcMetrics: mpcMetrics, HostMetrics: hostMetrics, diff --git a/metrics/mpc.go b/metrics/mpc.go index 55200161..5d8c0a3f 100644 --- a/metrics/mpc.go +++ b/metrics/mpc.go @@ -2,16 +2,26 @@ package metrics import ( "context" + "time" + "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/metric" ) +const ( + SESSION_TTL = time.Minute * 10 +) + type MpcMetrics struct { totalRelayersGauge metric.Int64ObservableGauge availableRelayersGauge metric.Int64ObservableGauge totalRelayerCount *int64 availableRelayerCount *int64 + + sessionTimeHistogram metric.Float64Histogram + sessionStartTimeCache *ttlcache.Cache[string, time.Time] } // NewMpcMetrics initializes metrics related to the MPC set @@ -41,11 +51,20 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem return nil, err } + sessionTimeHistogram, err := meter.Float64Histogram("relayer.SessionTime") + if err != nil { + return nil, err + } + return &MpcMetrics{ totalRelayersGauge: totalRelayersGauge, availableRelayersGauge: availableRelayersGauge, totalRelayerCount: totalRelayerCount, availableRelayerCount: availableRelayerCount, + sessionTimeHistogram: sessionTimeHistogram, + sessionStartTimeCache: ttlcache.New( + ttlcache.WithTTL[string, time.Time](SESSION_TTL), + ), }, nil } @@ -53,3 +72,17 @@ func (m *MpcMetrics) TrackRelayerStatus(unavailable peer.IDSlice, all peer.IDSli *m.totalRelayerCount = int64(len(all)) *m.availableRelayerCount = int64(len(all) - len(unavailable)) } + +func (m *MpcMetrics) StartProcess(sessionID string) { + m.sessionStartTimeCache.Set(sessionID, time.Now(), ttlcache.DefaultTTL) +} + +func (m *MpcMetrics) EndProcess(sessionID string) { + startTime := m.sessionStartTimeCache.Get(sessionID) + if startTime == nil { + log.Warn().Msgf("Session start time with ID %s not found", sessionID) + return + } + + m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime.Value()).Seconds()) +} diff --git a/tss/coordinator.go b/tss/coordinator.go index a37b84d3..c93662a3 100644 --- a/tss/coordinator.go +++ b/tss/coordinator.go @@ -37,6 +37,11 @@ type TssProcess interface { Timeout() time.Duration } +type Metrics interface { + StartProcess(sessionID string) + EndProcess(sessionID string) +} + type Coordinator struct { host host.Host communication comm.Communication @@ -44,6 +49,7 @@ type Coordinator struct { pendingProcesses map[string]bool processLock sync.Mutex + metrics Metrics CoordinatorTimeout time.Duration InitiatePeriod time.Duration @@ -52,12 +58,14 @@ type Coordinator struct { func NewCoordinator( host host.Host, communication comm.Communication, + metrics Metrics, electorFactory *elector.CoordinatorElectorFactory, ) *Coordinator { return &Coordinator{ host: host, communication: communication, electorFactory: electorFactory, + metrics: metrics, pendingProcesses: make(map[string]bool), @@ -79,6 +87,7 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, re c.processLock.Lock() c.pendingProcesses[sessionID] = true c.processLock.Unlock() + c.metrics.StartProcess(sessionID) ctx, cancel := context.WithCancel(ctx) p := pool.New().WithContext(ctx).WithCancelOnError() diff --git a/tss/ecdsa/keygen/keygen_test.go b/tss/ecdsa/keygen/keygen_test.go index 810ac286..88587b1a 100644 --- a/tss/ecdsa/keygen/keygen_test.go +++ b/tss/ecdsa/keygen/keygen_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcegraph/conc/pool" "github.com/sprintertech/sprinter-signing/comm" @@ -17,6 +16,7 @@ import ( "github.com/sprintertech/sprinter-signing/tss/ecdsa/keygen" tsstest "github.com/sprintertech/sprinter-signing/tss/test" "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" ) type KeygenTestSuite struct { @@ -40,7 +40,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) coordinators = append(coordinators, coordinator) processes = append(processes, keygen) } @@ -76,7 +76,7 @@ func (s *KeygenTestSuite) Test_KeygenTimeout() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen2", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) keygen.TssTimeout = time.Millisecond coordinators = append(coordinators, coordinator) processes = append(processes, keygen) diff --git a/tss/ecdsa/resharing/resharing_test.go b/tss/ecdsa/resharing/resharing_test.go index b8396cdd..c324a993 100644 --- a/tss/ecdsa/resharing/resharing_test.go +++ b/tss/ecdsa/resharing/resharing_test.go @@ -8,7 +8,6 @@ import ( "fmt" "testing" - "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -20,6 +19,7 @@ import ( "github.com/sprintertech/sprinter-signing/tss/ecdsa/resharing" tsstest "github.com/sprintertech/sprinter-signing/tss/test" "github.com/stretchr/testify/suite" + "go.uber.org/mock/gomock" ) type ResharingTestSuite struct { @@ -60,7 +60,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil) resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -107,7 +107,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil) resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -157,7 +157,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Le s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil) resharing := resharing.NewResharing("resharing3", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) @@ -206,7 +206,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Bi s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil) resharing := resharing.NewResharing("resharing4", 1, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, resharing) } tsstest.SetupCommunication(communicationMap) diff --git a/tss/ecdsa/signing/signing_test.go b/tss/ecdsa/signing/signing_test.go index c0d41dfd..6771ac6c 100644 --- a/tss/ecdsa/signing/signing_test.go +++ b/tss/ecdsa/signing/signing_test.go @@ -51,7 +51,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, signing) } tsstest.SetupCommunication(communicationMap) @@ -101,7 +101,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess_ManualCoordinator() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, signing) } tsstest.SetupCommunication(communicationMap) @@ -155,7 +155,7 @@ func (s *SigningTestSuite) Test_SigningTimeout() { panic(err) } electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory) signing.TssTimeout = time.Nanosecond coordinators = append(coordinators, coordinator) processes = append(processes, signing) @@ -188,7 +188,7 @@ func (s *SigningTestSuite) Test_PendingProcessExists() { communicationMap[host.ID()] = &communication keygen := keygen.NewKeygen("keygen3", s.Threshold, host, &communication, s.MockECDSAStorer) electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) - coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory)) + coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)) processes = append(processes, keygen) } tsstest.SetupCommunication(communicationMap) diff --git a/tss/mock/coordinator.go b/tss/mock/coordinator.go index 091c8bd0..162fc71c 100644 --- a/tss/mock/coordinator.go +++ b/tss/mock/coordinator.go @@ -152,3 +152,51 @@ func (mr *MockTssProcessMockRecorder) ValidCoordinators() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidCoordinators", reflect.TypeOf((*MockTssProcess)(nil).ValidCoordinators)) } + +// MockMetrics is a mock of Metrics interface. +type MockMetrics struct { + ctrl *gomock.Controller + recorder *MockMetricsMockRecorder + isgomock struct{} +} + +// MockMetricsMockRecorder is the mock recorder for MockMetrics. +type MockMetricsMockRecorder struct { + mock *MockMetrics +} + +// NewMockMetrics creates a new mock instance. +func NewMockMetrics(ctrl *gomock.Controller) *MockMetrics { + mock := &MockMetrics{ctrl: ctrl} + mock.recorder = &MockMetricsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetrics) EXPECT() *MockMetricsMockRecorder { + return m.recorder +} + +// EndProcess mocks base method. +func (m *MockMetrics) EndProcess(sessionID string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "EndProcess", sessionID) +} + +// EndProcess indicates an expected call of EndProcess. +func (mr *MockMetricsMockRecorder) EndProcess(sessionID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EndProcess", reflect.TypeOf((*MockMetrics)(nil).EndProcess), sessionID) +} + +// StartProcess mocks base method. +func (m *MockMetrics) StartProcess(sessionID string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StartProcess", sessionID) +} + +// StartProcess indicates an expected call of StartProcess. +func (mr *MockMetricsMockRecorder) StartProcess(sessionID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartProcess", reflect.TypeOf((*MockMetrics)(nil).StartProcess), sessionID) +} diff --git a/tss/test/utils.go b/tss/test/utils.go index 6f39dcef..5662c24e 100644 --- a/tss/test/utils.go +++ b/tss/test/utils.go @@ -27,6 +27,7 @@ type CoordinatorTestSuite struct { MockECDSAStorer *mock_tss.MockECDSAKeyshareStorer MockCommunication *mock_comm.MockCommunication MockTssProcess *mock_tss.MockTssProcess + MockMetrics *mock_tss.MockMetrics Hosts []host.Host Threshold int @@ -39,6 +40,8 @@ func (s *CoordinatorTestSuite) SetupTest() { s.MockECDSAStorer = mock_tss.NewMockECDSAKeyshareStorer(s.GomockController) s.MockCommunication = mock_comm.NewMockCommunication(s.GomockController) s.MockTssProcess = mock_tss.NewMockTssProcess(s.GomockController) + s.MockMetrics = mock_tss.NewMockMetrics(s.GomockController) + s.MockMetrics.EXPECT().StartProcess(gomock.Any()).AnyTimes() s.PartyNumber = 3 s.Threshold = 1