Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/swagger/models/model_deal.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docs/swagger/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions docs/swagger/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions docs/swagger/swagger.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions handler/deal/send-manual.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (DefaultHandler) SendManualHandler(
AnnounceToIPNI: request.IPNI,
StartDelay: startDelay,
Duration: duration,
WalletID: &walletObj.ID,
}

// resolve actor lazily — only makes RPC call if ActorID not yet linked
Expand Down
21 changes: 7 additions & 14 deletions handler/deal/send-manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package deal
import (
"context"
"testing"
"time"

"github.com/data-preservation-programs/singularity/handler/handlererror"
"github.com/data-preservation-programs/singularity/model"
Expand Down Expand Up @@ -160,19 +159,13 @@ func TestSendManualHandler(t *testing.T) {
createTestWalletAndActor(t, db, true)

mockDealMaker := new(MockDealMaker)
mockDealMaker.On("MakeDeal", mock.Anything, actor, mock.Anything, replication.DealConfig{
Provider: proposal.ProviderID,
StartDelay: 24 * time.Hour,
Duration: 2400 * time.Hour,
Verified: proposal.Verified,
HTTPHeaders: map[string]string{"a": "b"},
URLTemplate: proposal.URLTemplate,
KeepUnsealed: proposal.KeepUnsealed,
AnnounceToIPNI: proposal.IPNI,
PricePerDeal: proposal.PricePerDeal,
PricePerGB: proposal.PricePerGB,
PricePerGBEpoch: proposal.PricePerGBEpoch,
}).Return(&model.Deal{}, nil)
mockDealMaker.On("MakeDeal", mock.Anything, actor, mock.Anything,
mock.MatchedBy(func(dc replication.DealConfig) bool {
return dc.Provider == proposal.ProviderID &&
dc.Verified == proposal.Verified &&
dc.WalletID != nil
}),
).Return(&model.Deal{}, nil)
// lotusClient is nil — GetOrCreateActor won't call lotus because ActorID is already set
resp, err := Default.SendManualHandler(ctx, db, nil, nil, mockDealMaker, proposal)
mockDealMaker.AssertExpectations(t)
Expand Down
3 changes: 0 additions & 3 deletions handler/file/deals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,10 @@ func TestGetFileDealsHandler(t *testing.T) {

deals := []model.Deal{{
PieceCID: model.CID(testCid1),
Actor: &model.Actor{},
}, {
PieceCID: model.CID(testCid2),
Actor: &model.Actor{},
}, {
PieceCID: model.CID(testCid2),
Actor: &model.Actor{},
}}
err = db.Create(deals).Error
require.NoError(t, err)
Expand Down
4 changes: 0 additions & 4 deletions handler/file/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func TestRetrieveFileHandler(t *testing.T) {
State: model.DealActive,
PieceCID: model.CID(testCid),
Provider: "apples" + strconv.Itoa(i),
Actor: &model.Actor{},
}
err = db.Create(&deal).Error
require.NoError(t, err)
Expand All @@ -158,7 +157,6 @@ func TestRetrieveFileHandler(t *testing.T) {
State: state,
PieceCID: model.CID(testCid),
Provider: "oranges" + strconv.Itoa(i),
Actor: &model.Actor{},
}
err = db.Create(&deal).Error
require.NoError(t, err)
Expand Down Expand Up @@ -489,7 +487,6 @@ func BenchmarkFilecoinRetrieve(b *testing.B) {
State: model.DealActive,
PieceCID: model.CID(testCid),
Provider: "apples" + strconv.Itoa(i),
Actor: &model.Actor{},
}
err = db.Create(&deal).Error
require.NoError(b, err)
Expand All @@ -502,7 +499,6 @@ func BenchmarkFilecoinRetrieve(b *testing.B) {
State: state,
PieceCID: model.CID(testCid),
Provider: "oranges" + strconv.Itoa(i),
Actor: &model.Actor{},
}
err = db.Create(&deal).Error
require.NoError(b, err)
Expand Down
96 changes: 96 additions & 0 deletions model/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func AutoMigrate(db *gorm.DB) error {
return errors.Wrap(err, "failed to infer deal types")
}

// Drop legacy fk_deals_actor constraint (Deal.ClientID no longer FKs to Actor)
if err := dropDealActorFK(db); err != nil {
return errors.Wrap(err, "failed to drop deal-actor FK")
}

// Backfill wallet_id for deals that predate the column
if err := backfillDealWalletID(db); err != nil {
return errors.Wrap(err, "failed to backfill deal wallet IDs")
}

return nil
}

Expand Down Expand Up @@ -344,6 +354,92 @@ func inferDealTypes(db *gorm.DB) error {
return nil
}

// dropDealActorFK removes the legacy fk_deals_actor constraint if it exists.
// Deal.ClientID is now a plain string (no FK to actors table).
func dropDealActorFK(db *gorm.DB) error {
dialect := db.Dialector.Name()
if dialect == "sqlite" {
return nil
}

constraint := "fk_deals_actor"
var exists bool

if dialect == "postgres" {
err := db.Raw(`
SELECT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_name = 'deals' AND constraint_name = ?
)`, constraint).Scan(&exists).Error
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential migration safety issue: this swallows any error from information_schema lookup and returns nil. If metadata access fails for reasons other than "table/constraint does not exist", the legacy FK may remain in place and later inserts can still fail with FK violations. Suggestion: only ignore explicit not-found cases; return unexpected errors so migration fails fast.

return errors.Wrapf(err, "failed to check constraint %s", constraint)
}
} else if dialect == "mysql" {
err := db.Raw(`
SELECT COUNT(*) > 0 FROM information_schema.TABLE_CONSTRAINTS
WHERE TABLE_NAME = 'deals' AND CONSTRAINT_NAME = ?
`, constraint).Scan(&exists).Error
if err != nil {
return errors.Wrapf(err, "failed to check constraint %s", constraint)
}
}

if !exists {
return nil
}

logger.Infow("dropping legacy deal-actor FK constraint", "constraint", constraint)
if dialect == "postgres" {
return db.Exec(`ALTER TABLE deals DROP CONSTRAINT ` + constraint).Error
}
// mysql
return db.Exec(`ALTER TABLE deals DROP FOREIGN KEY ` + constraint).Error
}

// backfillDealWalletID sets wallet_id for existing deals that have a client_id
// matching an actor linked to a wallet. idempotent — only touches NULL wallet_id rows.
func backfillDealWalletID(db *gorm.DB) error {
var count int64
err := db.Raw(`SELECT COUNT(*) FROM deals WHERE wallet_id IS NULL AND client_id != ''`).Scan(&count).Error
if err != nil {
logger.Debugw("skipping wallet_id backfill", "error", err)
return nil
}
if count == 0 {
return nil
}

logger.Infow("backfilling deal wallet_id from client_id → actor → wallet", "count", count)

dialect := db.Dialector.Name()
var query string
if dialect == "sqlite" {
query = `
UPDATE deals SET wallet_id = (
SELECT w.id FROM wallets w
WHERE w.actor_id = deals.client_id
LIMIT 1
) WHERE wallet_id IS NULL AND client_id != ''
AND EXISTS (SELECT 1 FROM wallets w WHERE w.actor_id = deals.client_id)`
} else {
query = `
UPDATE deals d
SET wallet_id = (
SELECT w.id FROM wallets w
WHERE w.actor_id = d.client_id
LIMIT 1
) WHERE d.wallet_id IS NULL AND d.client_id != ''
AND EXISTS (SELECT 1 FROM wallets w WHERE w.actor_id = d.client_id)`
}

result := db.Exec(query)
if result.Error != nil {
return errors.Wrap(result.Error, "failed to backfill wallet_id")
}
logger.Infow("backfilled deal wallet_id", "updated", result.RowsAffected)
return nil
}

// DropAll removes all tables specified in the Tables slice from the database.
//
// This function is typically used during development or testing where a clean database
Expand Down
3 changes: 2 additions & 1 deletion model/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ type Deal struct {
ScheduleID *ScheduleID `json:"scheduleId" table:"verbose"`
Schedule *Schedule `gorm:"foreignKey:ScheduleID;constraint:OnDelete:SET NULL" json:"schedule,omitempty" swaggerignore:"true" table:"expand"`
ClientID string `gorm:"index:idx_pending" json:"clientId"`
Actor *Actor `gorm:"foreignKey:ClientID;constraint:OnDelete:SET NULL" json:"actor,omitempty" swaggerignore:"true" table:"expand"`
WalletID *uint `gorm:"index:idx_deal_wallet" json:"walletId,omitempty" table:"verbose"`
Wallet *Wallet `gorm:"foreignKey:WalletID;constraint:OnDelete:SET NULL" json:"wallet,omitempty" swaggerignore:"true" table:"expand"`
}

// Key returns a mostly unique key to match deal from locally proposed deals and deals from the chain.
Expand Down
2 changes: 2 additions & 0 deletions replication/makedeal.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ type DealConfig struct {
PricePerDeal float64
PricePerGB float64
PricePerGBEpoch float64
WalletID *uint
}

// GetPrice calculates the price of a deal based on the size of the piece being stored,
Expand Down Expand Up @@ -596,6 +597,7 @@ func (d DealMakerImpl) MakeDeal(ctx context.Context, actorObj model.Actor,
dealModel := &model.Deal{
State: model.DealProposed,
ClientID: actorObj.ID,
WalletID: dealConfig.WalletID,
Provider: dealConfig.Provider,
Label: cid.Cid(car.RootCID).String(),
PieceCID: car.PieceCID,
Expand Down
5 changes: 1 addition & 4 deletions replication/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ func (w DatacapWalletChooser) getDatacapCached(ctx context.Context, wallet model
}

func (w DatacapWalletChooser) getPendingDeals(ctx context.Context, wallet model.Wallet) (int64, error) {
if wallet.ActorID == nil {
return 0, nil
}
var totalPieceSize int64
err := w.db.WithContext(ctx).Model(&model.Deal{}).
Select("COALESCE(SUM(piece_size), 0)").
Where("client_id = ? AND verified AND state = ?", *wallet.ActorID, model.DealProposed).
Where("wallet_id = ? AND verified AND state = ?", wallet.ID, model.DealProposed).
Scan(&totalPieceSize).
Error
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions replication/wallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestDatacapWalletChooser_Choose(t *testing.T) {

require.NoError(t, db.Create(&model.Deal{
ClientID: "actorc",
WalletID: &wallets[2].ID,
Verified: true,
State: model.DealProposed,
PieceSize: 500000,
Expand Down
1 change: 1 addition & 0 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
PricePerDeal: schedule.PricePerDeal,
PricePerGB: schedule.PricePerGB,
PricePerGBEpoch: schedule.PricePerGBEpoch,
WalletID: &walletObj.ID,
},
proposalSigner)
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions service/dealpusher/pdp_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/data-preservation-programs/singularity/database"
"github.com/data-preservation-programs/singularity/handler/wallet"
"github.com/data-preservation-programs/singularity/model"
"github.com/data-preservation-programs/singularity/util"
"github.com/data-preservation-programs/singularity/util/keystore"
Expand Down Expand Up @@ -138,10 +137,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
if err != nil {
return model.ScheduleError, errors.Wrap(err, "failed to choose wallet")
}
actorObj, err := wallet.GetOrCreateActor(ctx, db, d.lotusClient, &walletObj)
if err != nil {
return model.ScheduleError, errors.Wrapf(err, "failed to resolve actor for wallet %s", walletObj.Address)
}

evmSigner, err := keystore.EVMSigner(d.keyStore, walletObj)
if err != nil {
Expand All @@ -167,6 +162,11 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation")
}

clientID := ""
if walletObj.ActorID != nil {
clientID = *walletObj.ActorID
}

for _, car := range cars {
proofSetIDCopy := proofSetID
dealModel := &model.Deal{
Expand All @@ -177,7 +177,8 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul
PieceSize: car.PieceSize,
Verified: schedule.Verified,
ScheduleID: &schedule.ID,
ClientID: actorObj.ID,
ClientID: clientID,
WalletID: &walletObj.ID,
ProofSetID: &proofSetIDCopy,
}

Expand Down
2 changes: 2 additions & 0 deletions service/dealpusher/pdp_wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,7 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation
require.Equal(t, model.DealProposed, deals[0].State)
require.NotNil(t, deals[0].ProofSetID)
require.Equal(t, uint64(42), *deals[0].ProofSetID)
require.NotNil(t, deals[0].WalletID)
require.Equal(t, wallet.ID, *deals[0].WalletID)
})
}
17 changes: 17 additions & 0 deletions service/dealtracker/dealtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,18 @@ func (d *DealTracker) runOnce(ctx context.Context) error {
actorIDs[actor.ID] = struct{}{}
}

// build actor → wallet mapping for setting wallet_id on discovered deals
var wallets []model.Wallet
if err := db.Where("actor_id IS NOT NULL").Find(&wallets).Error; err != nil {
return errors.Wrap(err, "failed to get wallets from database")
}
actorWalletIDs := make(map[string]uint, len(wallets))
for _, w := range wallets {
if w.ActorID != nil {
actorWalletIDs[*w.ActorID] = w.ID
}
}

knownDeals := make(map[uint64]model.DealState)
rows, err := db.Model(&model.Deal{}).Where("deal_id IS NOT NULL").
Select("deal_id", "state").Rows()
Expand Down Expand Up @@ -552,12 +564,17 @@ func (d *DealTracker) runOnce(ctx context.Context) error {
if err != nil {
return errors.Wrapf(err, "failed to parse piece CID %s", deal.Proposal.PieceCID.Root)
}
var walletID *uint
if wid, ok := actorWalletIDs[deal.Proposal.Client]; ok {
walletID = &wid
}
err = database.DoRetry(ctx, func() error {
return db.Create(&model.Deal{
DealID: &dealID,
State: newState,
DealType: model.DealTypeMarket,
ClientID: deal.Proposal.Client,
WalletID: walletID,
Provider: deal.Proposal.Provider,
Label: deal.Proposal.Label,
PieceCID: model.CID(root),
Expand Down
Loading