Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions universalClient/tss/dkls/keygen.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ func (s *keygenSession) Step() ([]Message, bool, error) {
break
}

// If receiver is self, queue locally for next step
if receiver == s.partyID {
if err := s.InputMessage(msgData); err != nil {
return nil, false, fmt.Errorf("failed to queue local message: %w", err)
}
continue
}

messages = append(messages, Message{
Receiver: receiver,
Data: msgData,
Expand All @@ -109,7 +101,6 @@ func (s *keygenSession) Step() ([]Message, bool, error) {
return messages, false, nil
}

// InputMessage processes an incoming protocol message.
func (s *keygenSession) InputMessage(data []byte) error {
buf := make([]byte, len(data))
copy(buf, data)
Expand Down
8 changes: 0 additions & 8 deletions universalClient/tss/dkls/keyrefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,6 @@ func (s *keyrefreshSession) Step() ([]Message, bool, error) {
break
}

if receiver == s.partyID {
if err := s.InputMessage(msgData); err != nil {
return nil, false, fmt.Errorf("failed to queue local message: %w", err)
}
continue
}

messages = append(messages, Message{
Receiver: receiver,
Data: msgData,
Expand All @@ -117,7 +110,6 @@ func (s *keyrefreshSession) Step() ([]Message, bool, error) {
return messages, false, nil
}

// InputMessage processes an incoming protocol message.
func (s *keyrefreshSession) InputMessage(data []byte) error {
buf := make([]byte, len(data))
copy(buf, data)
Expand Down
7 changes: 0 additions & 7 deletions universalClient/tss/dkls/quorumchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ func (s *quorumchangeSession) Step() ([]Message, bool, error) {
break
}

if receiver == s.partyID {
if err := s.InputMessage(msgData); err != nil {
return nil, false, fmt.Errorf("failed to queue local message: %w", err)
}
continue
}

messages = append(messages, Message{
Receiver: receiver,
Data: msgData,
Expand Down
7 changes: 0 additions & 7 deletions universalClient/tss/dkls/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ func (s *signSession) Step() ([]Message, bool, error) {
break
}

if receiver == s.partyID {
if err := s.InputMessage(msgData); err != nil {
return nil, false, fmt.Errorf("failed to queue local message: %w", err)
}
continue
}

messages = append(messages, Message{
Receiver: receiver,
Data: msgData,
Expand Down
24 changes: 7 additions & 17 deletions universalClient/tss/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,46 +279,37 @@ func (sm *SessionManager) processSessionStep(ctx context.Context, eventID string
return fmt.Errorf("session for event %s does not exist", eventID)
}

session := state.session

// Step the session (serialize to prevent concurrent access - DKLS may not be thread-safe)
state.stepMu.Lock()
messages, finished, err := session.Step()
messages, finished, err := state.session.Step()
state.stepMu.Unlock()

if err != nil {
return fmt.Errorf("failed to step session %s: %w", eventID, err)
}

// Send output messages
for _, dklsMsg := range messages {
// Find peerID for receiver partyID
peerID, err := sm.coordinator.GetPeerIDFromPartyID(ctx, dklsMsg.Receiver)
if err != nil {
sm.logger.Warn().
Err(err).
sm.logger.Warn().Err(err).
Str("receiver_party_id", dklsMsg.Receiver).
Msg("failed to get peerID for receiver")
continue
}

// Create coordinator message
coordMsg := coordinator.Message{
Type: "step",
EventID: eventID,
Payload: dklsMsg.Data,
Participants: nil, // Participants not needed for step messages
Type: "step",
EventID: eventID,
Payload: dklsMsg.Data,
}
msgBytes, err := json.Marshal(coordMsg)
if err != nil {
sm.logger.Warn().Err(err).Msg("failed to marshal step message")
continue
}

// Send message
if err := sm.send(ctx, peerID, msgBytes); err != nil {
sm.logger.Warn().
Err(err).
sm.logger.Warn().Err(err).
Str("event_id", eventID).
Str("receiver", dklsMsg.Receiver).
Str("peer_id", peerID).
Msg("failed to send step message")
Expand All @@ -331,7 +322,6 @@ func (sm *SessionManager) processSessionStep(ctx context.Context, eventID string
Msg("sent step message")
}

// If finished, handle result
if finished {
return sm.handleSessionFinished(ctx, eventID, state)
}
Expand Down
31 changes: 17 additions & 14 deletions universalClient/tss/tss.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,27 +236,30 @@ func NewNode(ctx context.Context, cfg Config) (*Node, error) {
pushSigner: cfg.PushSigner,
stopCh: make(chan struct{}),
registeredPeers: make(map[string]bool),
txResolver: txresolver.NewResolver(txresolver.Config{
EventStore: evtStore,
Chains: cfg.Chains,
PushSigner: cfg.PushSigner,
CheckInterval: sessionExpiryCheckInterval,
Logger: logger,
}),
}

// Create broadcaster after node so the closure can capture `node`.
getTSSAddress := func(ctx context.Context) (string, error) {
if node.coordinator == nil {
return "", fmt.Errorf("coordinator not initialized")
}
return node.coordinator.GetTSSAddress(ctx)
}

node.txResolver = txresolver.NewResolver(txresolver.Config{
EventStore: evtStore,
Chains: cfg.Chains,
PushSigner: cfg.PushSigner,
CheckInterval: sessionExpiryCheckInterval,
Logger: logger,
GetTSSAddress: getTSSAddress,
})

node.txBroadcaster = txbroadcaster.NewBroadcaster(txbroadcaster.Config{
EventStore: evtStore,
Chains: cfg.Chains,
CheckInterval: sessionExpiryCheckInterval,
Logger: logger,
GetTSSAddress: func(ctx context.Context) (string, error) {
if node.coordinator == nil {
return "", fmt.Errorf("coordinator not initialized")
}
return node.coordinator.GetTSSAddress(ctx)
},
GetTSSAddress: getTSSAddress,
})

node.expirySweeper = expirysweeper.NewSweeper(expirysweeper.Config{
Expand Down
73 changes: 10 additions & 63 deletions universalClient/tss/txbroadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,17 @@ package txbroadcaster

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"time"

"github.com/rs/zerolog"

uexecutortypes "github.com/pushchain/push-chain-node/x/uexecutor/types"
utsstypes "github.com/pushchain/push-chain-node/x/utss/types"

"github.com/pushchain/push-chain-node/universalClient/chains"
"github.com/pushchain/push-chain-node/universalClient/chains/common"
"github.com/pushchain/push-chain-node/universalClient/store"
"github.com/pushchain/push-chain-node/universalClient/tss/eventstore"
"github.com/pushchain/push-chain-node/universalClient/tss/txflow"
)

// ---------------------------------------------------------------------------
// Signed event data types
// ---------------------------------------------------------------------------

// SigningData holds the signing parameters persisted by sessionManager when marking SIGNED.
type SigningData struct {
Signature string `json:"signature"` // hex-encoded 64/65 byte signature
SigningHash string `json:"signing_hash"` // hex-encoded signing hash
Nonce uint64 `json:"nonce"`
TSSFundMigrationAmount *big.Int `json:"tss_fund_migration_amount,omitempty"`
}

// SignedOutboundData wraps OutboundCreatedEvent with signing data.
type SignedOutboundData struct {
uexecutortypes.OutboundCreatedEvent
SigningData *SigningData `json:"signing_data,omitempty"`
}

// SignedFundMigrationData wraps FundMigrationInitiatedEventData with signing data.
type SignedFundMigrationData struct {
utsstypes.FundMigrationInitiatedEventData
SigningData *SigningData `json:"signing_data,omitempty"`
}

// ---------------------------------------------------------------------------
// Broadcaster
// ---------------------------------------------------------------------------

// Config holds configuration for the broadcaster.
type Config struct {
EventStore *eventstore.Store
Chains *chains.Chains
Expand All @@ -56,7 +21,6 @@ type Config struct {
GetTSSAddress func(ctx context.Context) (string, error)
}

// Broadcaster polls SIGNED events and broadcasts them to external chains.
type Broadcaster struct {
eventStore *eventstore.Store
chains *chains.Chains
Expand All @@ -65,7 +29,6 @@ type Broadcaster struct {
getTSSAddress func(ctx context.Context) (string, error)
}

// NewBroadcaster creates a new tx broadcaster.
func NewBroadcaster(cfg Config) *Broadcaster {
interval := cfg.CheckInterval
if interval == 0 {
Expand Down Expand Up @@ -143,7 +106,7 @@ func (b *Broadcaster) broadcastEvent(ctx context.Context, event *store.Event) {

// broadcastOutbound parses outbound event data and delegates to chain-specific broadcast.
func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event) {
var data SignedOutboundData
var data txflow.SignedOutboundData
if err := json.Unmarshal(event.EventData, &data); err != nil {
b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to parse signed outbound data")
return
Expand All @@ -161,9 +124,9 @@ func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event)
}

if b.chains.IsEVMChain(chainID) {
b.broadcastEVM(ctx, event, &data, chainID)
b.broadcastOutboundEVM(ctx, event, &data, chainID)
} else {
b.broadcastSVM(ctx, event, &data, chainID)
b.broadcastOutboundSVM(ctx, event, &data, chainID)
}
}

Expand All @@ -173,7 +136,7 @@ func (b *Broadcaster) broadcastOutbound(ctx context.Context, event *store.Event)

// broadcastFundMigration parses fund migration event data and delegates to chain-specific broadcast.
func (b *Broadcaster) broadcastFundMigration(ctx context.Context, event *store.Event) {
var data SignedFundMigrationData
var data txflow.SignedFundMigrationData
if err := json.Unmarshal(event.EventData, &data); err != nil {
b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to parse fund migration signed data")
return
Expand All @@ -197,25 +160,6 @@ func (b *Broadcaster) broadcastFundMigration(ctx context.Context, event *store.E
// Helpers
// ---------------------------------------------------------------------------

// decodeSigningData extracts the UnsignedSigningReq and raw signature bytes from SigningData.
func decodeSigningData(sd *SigningData) (*common.UnsignedSigningReq, []byte, error) {
signingHash, err := hex.DecodeString(sd.SigningHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode signing hash: %w", err)
}

signature, err := hex.DecodeString(sd.Signature)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode signature: %w", err)
}

return &common.UnsignedSigningReq{
SigningHash: signingHash,
Nonce: sd.Nonce,
TSSFundMigrationAmount: sd.TSSFundMigrationAmount,
}, signature, nil
}

// markBroadcasted updates the event status to BROADCASTED with the given tx hash.
func (b *Broadcaster) markBroadcasted(event *store.Event, chainID, txHash string) {
caipTxHash := chainID + ":" + txHash
Expand All @@ -226,6 +170,9 @@ func (b *Broadcaster) markBroadcasted(event *store.Event, chainID, txHash string
b.logger.Warn().Err(err).Str("event_id", event.EventID).Msg("failed to update event to BROADCASTED")
return
}
b.logger.Info().Str("event_id", event.EventID).Str("tx_hash", txHash).Str("chain", chainID).
Msg("marked BROADCASTED")
b.logger.Info().
Str("event_id", event.EventID).
Str("type", event.Type).
Str("chain", chainID).
Msg("event marked as BROADCASTED")
}
Loading
Loading