Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/sprintertech/sprinter-signing/chains/evm/calls/events"
evmListener "github.com/sprintertech/sprinter-signing/chains/evm/listener"
evmMessage "github.com/sprintertech/sprinter-signing/chains/evm/message"
"github.com/sprintertech/sprinter-signing/metrics"

lifiConfig "github.com/sprintertech/lifi-solver/pkg/config"
"github.com/sprintertech/sprinter-signing/chains/lighter"
Expand All @@ -44,7 +45,6 @@ import (
"github.com/sprintertech/sprinter-signing/config"
"github.com/sprintertech/sprinter-signing/jobs"
"github.com/sprintertech/sprinter-signing/keyshare"
"github.com/sprintertech/sprinter-signing/metrics"
"github.com/sprintertech/sprinter-signing/price"
"github.com/sprintertech/sprinter-signing/protocol/across"
"github.com/sprintertech/sprinter-signing/protocol/lifi"
Expand Down Expand Up @@ -109,16 +109,8 @@ func Run() error {
panicOnError(err)
log.Info().Str("peerID", host.ID().String()).Msg("Successfully created libp2p host")

communication := p2p.NewCommunication(host, "p2p/sprinter")
electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig)
coordinator := tss.NewCoordinator(host, communication, electorFactory)

db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName))
if err != nil {
panicOnError(err)
}
blockstore := store.NewBlockStore(db)
keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mp, err := observability.InitMetricProvider(context.Background(), configuration.RelayerConfig.OpenTelemetryCollectorURL)
panicOnError(err)
Expand All @@ -127,22 +119,26 @@ func Run() error {
log.Error().Msgf("Error shutting down meter provider: %v", err)
}
}()
sygmaMetrics, err := metrics.NewSprinterMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version)
panicOnError(err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
communication := p2p.NewCommunication(host, "p2p/sprinter")
electorFactory := elector.NewCoordinatorElectorFactory(host, configuration.RelayerConfig.BullyConfig)
coordinator := tss.NewCoordinator(host, communication, sygmaMetrics, electorFactory)

db, err := lvldb.NewLvlDB(viper.GetString(config.BlockstoreFlagName))
panicOnError(err)
blockstore := store.NewBlockStore(db)
keyshareStore := keyshare.NewECDSAKeyshareStore(configuration.RelayerConfig.MpcConfig.KeysharePath)

sygmaMetrics, err := metrics.NewSygmaMetrics(ctx, mp.Meter("relayer-metric-provider"), configuration.RelayerConfig.Env, configuration.RelayerConfig.Id, Version)
if err != nil {
panic(err)
}
msgChan := make(chan []*message.Message)
sigChn := make(chan interface{})

priceAPI := price.NewCoinmarketcapAPI(
configuration.RelayerConfig.CoinmarketcapConfig.Url,
configuration.RelayerConfig.CoinmarketcapConfig.ApiKey)

signatureCache := cache.NewSignatureCache(communication)
signatureCache := cache.NewSignatureCache(communication, sygmaMetrics)
go signatureCache.Watch(ctx, sigChn)

supportedChains := make(map[uint64]struct{})
Expand Down
12 changes: 10 additions & 2 deletions cache/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ const (
SIGNATURE_TTL = time.Minute * 10
)

type Metrics interface {
EndProcess(sessionID string)
}

type SignatureCache struct {
sigCache *ttlcache.Cache[string, []byte]
comm comm.Communication
metrics Metrics
}

func NewSignatureCache(c comm.Communication) *SignatureCache {
func NewSignatureCache(c comm.Communication, metrics Metrics) *SignatureCache {
cache := ttlcache.New(
ttlcache.WithTTL[string, []byte](SIGNATURE_TTL),
)

sc := &SignatureCache{
sigCache: cache,
comm: c,
metrics: metrics,
}

go cache.Start()
Expand All @@ -43,7 +49,7 @@ func (s *SignatureCache) Subscribe(ctx context.Context, id string, sigChannel ch
return
}

ping := time.Tick(time.Millisecond * 250)
ping := time.Tick(time.Millisecond * 50)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -81,6 +87,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) {
{
sig := sig.(signing.EcdsaSignature)
s.sigCache.Set(sig.ID, sig.Signature, ttlcache.DefaultTTL)
s.metrics.EndProcess(sig.ID)
}
case msg := <-msgChn:
{
Expand All @@ -92,6 +99,7 @@ func (s *SignatureCache) Watch(ctx context.Context, sigChn chan interface{}) {

log.Debug().Msgf("Received signature for ID: %s", msg.ID)
s.sigCache.Set(msg.ID, msg.Signature, ttlcache.DefaultTTL)
s.metrics.EndProcess(msg.ID)
}
case <-ctx.Done():
{
Expand Down
9 changes: 8 additions & 1 deletion cache/signature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
mock_communication "github.com/sprintertech/sprinter-signing/comm/mock"
"github.com/sprintertech/sprinter-signing/tss/ecdsa/signing"
"github.com/sprintertech/sprinter-signing/tss/message"
mock_tss "github.com/sprintertech/sprinter-signing/tss/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)
Expand All @@ -21,6 +22,7 @@ type SignatureCacheTestSuite struct {
ctx context.Context

mockCommunication *mock_communication.MockCommunication
mockMetrics *mock_tss.MockMetrics
cancel context.CancelFunc
sigChn chan interface{}
msgChn chan *comm.WrappedMessage
Expand All @@ -46,7 +48,8 @@ func (s *SignatureCacheTestSuite) SetupTest() {
s.cancel = cancel
s.ctx = ctx

s.sc = cache.NewSignatureCache(s.mockCommunication)
s.mockMetrics = mock_tss.NewMockMetrics(gomock.NewController(s.T()))
s.sc = cache.NewSignatureCache(s.mockCommunication, s.mockMetrics)
go s.sc.Watch(s.ctx, s.sigChn)
time.Sleep(time.Millisecond * 100)
}
Expand All @@ -65,6 +68,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidSignatureResult() {
Signature: []byte("signature"),
ID: "signatureID",
}
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
s.sigChn <- expectedSig
time.Sleep(time.Millisecond * 100)

Expand All @@ -79,6 +83,7 @@ func (s *SignatureCacheTestSuite) Test_Signature_ValidMessage() {
Signature: []byte("signature"),
ID: "signatureID",
}
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
wMsg := &comm.WrappedMessage{
Payload: wMsgBytes,
Expand All @@ -98,6 +103,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage_EarlyExit() {
Signature: []byte("signature"),
ID: "signatureID",
}
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
wMsg := &comm.WrappedMessage{
Payload: wMsgBytes,
Expand All @@ -122,6 +128,7 @@ func (s *SignatureCacheTestSuite) Test_Subscribe_ValidMessage() {
Signature: []byte("signature"),
ID: "signatureID",
}
s.mockMetrics.EXPECT().EndProcess(expectedSig.ID)
wMsgBytes, _ := message.MarshalSignatureMessage(expectedSig.ID, expectedSig.Signature)
wMsg := &comm.WrappedMessage{
Payload: wMsgBytes,
Expand Down
6 changes: 3 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
api "go.opentelemetry.io/otel/metric"
)

type SygmaMetrics struct {
type SprinterMetrics struct {
*observability.RelayerMetrics
*MpcMetrics
*HostMetrics
}

// NewSygmaMetrics creates an instance of metrics
func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SygmaMetrics, error) {
func NewSprinterMetrics(ctx context.Context, meter api.Meter, env, relayerID, version string) (*SprinterMetrics, error) {
attributes := []attribute.KeyValue{attribute.String("relayerid", relayerID), attribute.String("env", env), attribute.String("version", version)}
opts := api.WithAttributes(attributes...)
relayerMetrics, err := observability.NewRelayerMetrics(ctx, meter, attributes...)
Expand All @@ -36,7 +36,7 @@ func NewSygmaMetrics(ctx context.Context, meter api.Meter, env, relayerID, versi
return nil, err
}

return &SygmaMetrics{
return &SprinterMetrics{
RelayerMetrics: relayerMetrics,
MpcMetrics: mpcMetrics,
HostMetrics: hostMetrics,
Expand Down
33 changes: 33 additions & 0 deletions metrics/mpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@ package metrics

import (
"context"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/metric"
)

const (
SESSION_TTL = time.Minute * 10
)

type MpcMetrics struct {
totalRelayersGauge metric.Int64ObservableGauge
availableRelayersGauge metric.Int64ObservableGauge
totalRelayerCount *int64
availableRelayerCount *int64

sessionTimeHistogram metric.Float64Histogram
sessionStartTimeCache *ttlcache.Cache[string, time.Time]
}

// NewMpcMetrics initializes metrics related to the MPC set
Expand Down Expand Up @@ -41,15 +51,38 @@ func NewMpcMetrics(ctx context.Context, meter metric.Meter, opts metric.Measurem
return nil, err
}

sessionTimeHistogram, err := meter.Float64Histogram("relayer.SessionTime")
if err != nil {
return nil, err
}

return &MpcMetrics{
totalRelayersGauge: totalRelayersGauge,
availableRelayersGauge: availableRelayersGauge,
totalRelayerCount: totalRelayerCount,
availableRelayerCount: availableRelayerCount,
sessionTimeHistogram: sessionTimeHistogram,
sessionStartTimeCache: ttlcache.New(
ttlcache.WithTTL[string, time.Time](SESSION_TTL),
),
}, nil
}

func (m *MpcMetrics) TrackRelayerStatus(unavailable peer.IDSlice, all peer.IDSlice) {
*m.totalRelayerCount = int64(len(all))
*m.availableRelayerCount = int64(len(all) - len(unavailable))
}

func (m *MpcMetrics) StartProcess(sessionID string) {
m.sessionStartTimeCache.Set(sessionID, time.Now(), ttlcache.DefaultTTL)
}

func (m *MpcMetrics) EndProcess(sessionID string) {
startTime := m.sessionStartTimeCache.Get(sessionID)
if startTime == nil {
log.Warn().Msgf("Session start time with ID %s not found", sessionID)
return
}

m.sessionTimeHistogram.Record(context.Background(), time.Since(startTime.Value()).Seconds())
}
9 changes: 9 additions & 0 deletions tss/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ type TssProcess interface {
Timeout() time.Duration
}

type Metrics interface {
StartProcess(sessionID string)
EndProcess(sessionID string)
}

type Coordinator struct {
host host.Host
communication comm.Communication
electorFactory *elector.CoordinatorElectorFactory

pendingProcesses map[string]bool
processLock sync.Mutex
metrics Metrics

CoordinatorTimeout time.Duration
InitiatePeriod time.Duration
Expand All @@ -52,12 +58,14 @@ type Coordinator struct {
func NewCoordinator(
host host.Host,
communication comm.Communication,
metrics Metrics,
electorFactory *elector.CoordinatorElectorFactory,
) *Coordinator {
return &Coordinator{
host: host,
communication: communication,
electorFactory: electorFactory,
metrics: metrics,

pendingProcesses: make(map[string]bool),

Expand All @@ -79,6 +87,7 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, re
c.processLock.Lock()
c.pendingProcesses[sessionID] = true
c.processLock.Unlock()
c.metrics.StartProcess(sessionID)

ctx, cancel := context.WithCancel(ctx)
p := pool.New().WithContext(ctx).WithCancelOnError()
Expand Down
6 changes: 3 additions & 3 deletions tss/ecdsa/keygen/keygen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcegraph/conc/pool"
"github.com/sprintertech/sprinter-signing/comm"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/sprintertech/sprinter-signing/tss/ecdsa/keygen"
tsstest "github.com/sprintertech/sprinter-signing/tss/test"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)

type KeygenTestSuite struct {
Expand All @@ -40,7 +40,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() {
communicationMap[host.ID()] = &communication
keygen := keygen.NewKeygen("keygen", s.Threshold, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinator := tss.NewCoordinator(host, &communication, electorFactory)
coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)
coordinators = append(coordinators, coordinator)
processes = append(processes, keygen)
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *KeygenTestSuite) Test_KeygenTimeout() {
communicationMap[host.ID()] = &communication
keygen := keygen.NewKeygen("keygen2", s.Threshold, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinator := tss.NewCoordinator(host, &communication, electorFactory)
coordinator := tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory)
keygen.TssTimeout = time.Millisecond
coordinators = append(coordinators, coordinator)
processes = append(processes, keygen)
Expand Down
10 changes: 5 additions & 5 deletions tss/ecdsa/resharing/resharing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/sprintertech/sprinter-signing/tss/ecdsa/resharing"
tsstest "github.com/sprintertech/sprinter-signing/tss/test"
"github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock"
)

type ResharingTestSuite struct {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() {
s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil)
resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
processes = append(processes, resharing)
}
tsstest.SetupCommunication(communicationMap)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() {
s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Return(nil)
resharing := resharing.NewResharing("resharing2", 1, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
processes = append(processes, resharing)
}
tsstest.SetupCommunication(communicationMap)
Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Le
s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil)
resharing := resharing.NewResharing("resharing3", 1, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
processes = append(processes, resharing)
}
tsstest.SetupCommunication(communicationMap)
Expand Down Expand Up @@ -206,7 +206,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Bi
s.MockECDSAStorer.EXPECT().GetKeyshare().Return(share, nil)
resharing := resharing.NewResharing("resharing4", 1, host, &communication, s.MockECDSAStorer)
electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig)
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, electorFactory))
coordinators = append(coordinators, tss.NewCoordinator(host, &communication, s.MockMetrics, electorFactory))
processes = append(processes, resharing)
}
tsstest.SetupCommunication(communicationMap)
Expand Down
Loading
Loading