From 60a418938bf63278cec74eead5e0f574eeef8b26 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 25 Feb 2026 14:15:23 +0100 Subject: [PATCH] add Deal.WalletID, decouple from Actor FK MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deal.ClientID remains a plain string for on-chain deal matching (Key()) but no longer has a FK constraint to the actors table. New WalletID *uint FK points to wallets.id, letting PDP deals reference the originating wallet even when no f0 actor exists yet. - model: add WalletID + Wallet FK on Deal, remove Actor FK - migrate: backfill wallet_id from client_id→actor→wallet, drop fk_deals_actor constraint - all deal creation sites set WalletID (dealpusher, pdp_schedule, send-manual, dealtracker, eventprocessor) - datacap query uses wallet_id instead of client_id - pdp_schedule no longer requires actor to exist for PDP deals --- client/swagger/models/model_deal.go | 3 + docs/swagger/docs.go | 3 + docs/swagger/swagger.json | 3 + docs/swagger/swagger.yaml | 2 + handler/deal/send-manual.go | 1 + handler/deal/send-manual_test.go | 21 ++---- handler/file/deals_test.go | 3 - handler/file/retrieve_test.go | 4 -- model/migrate.go | 96 +++++++++++++++++++++++++++ model/replication.go | 3 +- replication/makedeal.go | 2 + replication/wallet.go | 5 +- replication/wallet_test.go | 1 + service/dealpusher/dealpusher.go | 1 + service/dealpusher/pdp_schedule.go | 13 ++-- service/dealpusher/pdp_wiring_test.go | 2 + service/dealtracker/dealtracker.go | 17 +++++ service/pdptracker/eventprocessor.go | 11 +++ 18 files changed, 159 insertions(+), 32 deletions(-) diff --git a/client/swagger/models/model_deal.go b/client/swagger/models/model_deal.go index 74f3bcb2..7e84ecd2 100644 --- a/client/swagger/models/model_deal.go +++ b/client/swagger/models/model_deal.go @@ -87,6 +87,9 @@ type ModelDeal struct { // verified Verified bool `json:"verified,omitempty"` + + // wallet Id + WalletID int64 `json:"walletId,omitempty"` } // Validate validates this model deal diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 52e9c221..1e029e97 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -6560,6 +6560,9 @@ const docTemplate = `{ }, "verified": { "type": "boolean" + }, + "walletId": { + "type": "integer" } } }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index d8689f27..5dbc79de 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -6553,6 +6553,9 @@ }, "verified": { "type": "boolean" + }, + "walletId": { + "type": "integer" } } }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index d9d366c9..104ad957 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -445,6 +445,8 @@ definitions: type: string verified: type: boolean + walletId: + type: integer type: object model.DealState: enum: diff --git a/handler/deal/send-manual.go b/handler/deal/send-manual.go index 246e839c..fc0f34f8 100644 --- a/handler/deal/send-manual.go +++ b/handler/deal/send-manual.go @@ -140,6 +140,7 @@ func (DefaultHandler) SendManualHandler( AnnounceToIPNI: request.IPNI, StartDelay: startDelay, Duration: duration, + WalletID: &walletObj.ID, } // resolve actor lazily — only makes RPC call if ActorID not yet linked diff --git a/handler/deal/send-manual_test.go b/handler/deal/send-manual_test.go index 08318b91..a77e7785 100644 --- a/handler/deal/send-manual_test.go +++ b/handler/deal/send-manual_test.go @@ -3,7 +3,6 @@ package deal import ( "context" "testing" - "time" "github.com/data-preservation-programs/singularity/handler/handlererror" "github.com/data-preservation-programs/singularity/model" @@ -160,19 +159,13 @@ func TestSendManualHandler(t *testing.T) { createTestWalletAndActor(t, db, true) mockDealMaker := new(MockDealMaker) - mockDealMaker.On("MakeDeal", mock.Anything, actor, mock.Anything, replication.DealConfig{ - Provider: proposal.ProviderID, - StartDelay: 24 * time.Hour, - Duration: 2400 * time.Hour, - Verified: proposal.Verified, - HTTPHeaders: map[string]string{"a": "b"}, - URLTemplate: proposal.URLTemplate, - KeepUnsealed: proposal.KeepUnsealed, - AnnounceToIPNI: proposal.IPNI, - PricePerDeal: proposal.PricePerDeal, - PricePerGB: proposal.PricePerGB, - PricePerGBEpoch: proposal.PricePerGBEpoch, - }).Return(&model.Deal{}, nil) + mockDealMaker.On("MakeDeal", mock.Anything, actor, mock.Anything, + mock.MatchedBy(func(dc replication.DealConfig) bool { + return dc.Provider == proposal.ProviderID && + dc.Verified == proposal.Verified && + dc.WalletID != nil + }), + ).Return(&model.Deal{}, nil) // lotusClient is nil — GetOrCreateActor won't call lotus because ActorID is already set resp, err := Default.SendManualHandler(ctx, db, nil, nil, mockDealMaker, proposal) mockDealMaker.AssertExpectations(t) diff --git a/handler/file/deals_test.go b/handler/file/deals_test.go index 84ea2c11..91516ff4 100644 --- a/handler/file/deals_test.go +++ b/handler/file/deals_test.go @@ -77,13 +77,10 @@ func TestGetFileDealsHandler(t *testing.T) { deals := []model.Deal{{ PieceCID: model.CID(testCid1), - Actor: &model.Actor{}, }, { PieceCID: model.CID(testCid2), - Actor: &model.Actor{}, }, { PieceCID: model.CID(testCid2), - Actor: &model.Actor{}, }} err = db.Create(deals).Error require.NoError(t, err) diff --git a/handler/file/retrieve_test.go b/handler/file/retrieve_test.go index 8114b50c..1261d2ec 100644 --- a/handler/file/retrieve_test.go +++ b/handler/file/retrieve_test.go @@ -144,7 +144,6 @@ func TestRetrieveFileHandler(t *testing.T) { State: model.DealActive, PieceCID: model.CID(testCid), Provider: "apples" + strconv.Itoa(i), - Actor: &model.Actor{}, } err = db.Create(&deal).Error require.NoError(t, err) @@ -158,7 +157,6 @@ func TestRetrieveFileHandler(t *testing.T) { State: state, PieceCID: model.CID(testCid), Provider: "oranges" + strconv.Itoa(i), - Actor: &model.Actor{}, } err = db.Create(&deal).Error require.NoError(t, err) @@ -489,7 +487,6 @@ func BenchmarkFilecoinRetrieve(b *testing.B) { State: model.DealActive, PieceCID: model.CID(testCid), Provider: "apples" + strconv.Itoa(i), - Actor: &model.Actor{}, } err = db.Create(&deal).Error require.NoError(b, err) @@ -502,7 +499,6 @@ func BenchmarkFilecoinRetrieve(b *testing.B) { State: state, PieceCID: model.CID(testCid), Provider: "oranges" + strconv.Itoa(i), - Actor: &model.Actor{}, } err = db.Create(&deal).Error require.NoError(b, err) diff --git a/model/migrate.go b/model/migrate.go index f1ca258c..6883b500 100644 --- a/model/migrate.go +++ b/model/migrate.go @@ -124,6 +124,16 @@ func AutoMigrate(db *gorm.DB) error { return errors.Wrap(err, "failed to infer deal types") } + // Drop legacy fk_deals_actor constraint (Deal.ClientID no longer FKs to Actor) + if err := dropDealActorFK(db); err != nil { + return errors.Wrap(err, "failed to drop deal-actor FK") + } + + // Backfill wallet_id for deals that predate the column + if err := backfillDealWalletID(db); err != nil { + return errors.Wrap(err, "failed to backfill deal wallet IDs") + } + return nil } @@ -344,6 +354,92 @@ func inferDealTypes(db *gorm.DB) error { return nil } +// dropDealActorFK removes the legacy fk_deals_actor constraint if it exists. +// Deal.ClientID is now a plain string (no FK to actors table). +func dropDealActorFK(db *gorm.DB) error { + dialect := db.Dialector.Name() + if dialect == "sqlite" { + return nil + } + + constraint := "fk_deals_actor" + var exists bool + + if dialect == "postgres" { + err := db.Raw(` + SELECT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE table_name = 'deals' AND constraint_name = ? + )`, constraint).Scan(&exists).Error + if err != nil { + return errors.Wrapf(err, "failed to check constraint %s", constraint) + } + } else if dialect == "mysql" { + err := db.Raw(` + SELECT COUNT(*) > 0 FROM information_schema.TABLE_CONSTRAINTS + WHERE TABLE_NAME = 'deals' AND CONSTRAINT_NAME = ? + `, constraint).Scan(&exists).Error + if err != nil { + return errors.Wrapf(err, "failed to check constraint %s", constraint) + } + } + + if !exists { + return nil + } + + logger.Infow("dropping legacy deal-actor FK constraint", "constraint", constraint) + if dialect == "postgres" { + return db.Exec(`ALTER TABLE deals DROP CONSTRAINT ` + constraint).Error + } + // mysql + return db.Exec(`ALTER TABLE deals DROP FOREIGN KEY ` + constraint).Error +} + +// backfillDealWalletID sets wallet_id for existing deals that have a client_id +// matching an actor linked to a wallet. idempotent — only touches NULL wallet_id rows. +func backfillDealWalletID(db *gorm.DB) error { + var count int64 + err := db.Raw(`SELECT COUNT(*) FROM deals WHERE wallet_id IS NULL AND client_id != ''`).Scan(&count).Error + if err != nil { + logger.Debugw("skipping wallet_id backfill", "error", err) + return nil + } + if count == 0 { + return nil + } + + logger.Infow("backfilling deal wallet_id from client_id → actor → wallet", "count", count) + + dialect := db.Dialector.Name() + var query string + if dialect == "sqlite" { + query = ` + UPDATE deals SET wallet_id = ( + SELECT w.id FROM wallets w + WHERE w.actor_id = deals.client_id + LIMIT 1 + ) WHERE wallet_id IS NULL AND client_id != '' + AND EXISTS (SELECT 1 FROM wallets w WHERE w.actor_id = deals.client_id)` + } else { + query = ` + UPDATE deals d + SET wallet_id = ( + SELECT w.id FROM wallets w + WHERE w.actor_id = d.client_id + LIMIT 1 + ) WHERE d.wallet_id IS NULL AND d.client_id != '' + AND EXISTS (SELECT 1 FROM wallets w WHERE w.actor_id = d.client_id)` + } + + result := db.Exec(query) + if result.Error != nil { + return errors.Wrap(result.Error, "failed to backfill wallet_id") + } + logger.Infow("backfilled deal wallet_id", "updated", result.RowsAffected) + return nil +} + // DropAll removes all tables specified in the Tables slice from the database. // // This function is typically used during development or testing where a clean database diff --git a/model/replication.go b/model/replication.go index 386fddb6..337811ec 100644 --- a/model/replication.go +++ b/model/replication.go @@ -123,7 +123,8 @@ type Deal struct { ScheduleID *ScheduleID `json:"scheduleId" table:"verbose"` Schedule *Schedule `gorm:"foreignKey:ScheduleID;constraint:OnDelete:SET NULL" json:"schedule,omitempty" swaggerignore:"true" table:"expand"` ClientID string `gorm:"index:idx_pending" json:"clientId"` - Actor *Actor `gorm:"foreignKey:ClientID;constraint:OnDelete:SET NULL" json:"actor,omitempty" swaggerignore:"true" table:"expand"` + WalletID *uint `gorm:"index:idx_deal_wallet" json:"walletId,omitempty" table:"verbose"` + Wallet *Wallet `gorm:"foreignKey:WalletID;constraint:OnDelete:SET NULL" json:"wallet,omitempty" swaggerignore:"true" table:"expand"` } // Key returns a mostly unique key to match deal from locally proposed deals and deals from the chain. diff --git a/replication/makedeal.go b/replication/makedeal.go index a8877e92..6c7f0ec1 100644 --- a/replication/makedeal.go +++ b/replication/makedeal.go @@ -459,6 +459,7 @@ type DealConfig struct { PricePerDeal float64 PricePerGB float64 PricePerGBEpoch float64 + WalletID *uint } // GetPrice calculates the price of a deal based on the size of the piece being stored, @@ -596,6 +597,7 @@ func (d DealMakerImpl) MakeDeal(ctx context.Context, actorObj model.Actor, dealModel := &model.Deal{ State: model.DealProposed, ClientID: actorObj.ID, + WalletID: dealConfig.WalletID, Provider: dealConfig.Provider, Label: cid.Cid(car.RootCID).String(), PieceCID: car.PieceCID, diff --git a/replication/wallet.go b/replication/wallet.go index 3a0aed0f..ffa7b73d 100644 --- a/replication/wallet.go +++ b/replication/wallet.go @@ -95,13 +95,10 @@ func (w DatacapWalletChooser) getDatacapCached(ctx context.Context, wallet model } func (w DatacapWalletChooser) getPendingDeals(ctx context.Context, wallet model.Wallet) (int64, error) { - if wallet.ActorID == nil { - return 0, nil - } var totalPieceSize int64 err := w.db.WithContext(ctx).Model(&model.Deal{}). Select("COALESCE(SUM(piece_size), 0)"). - Where("client_id = ? AND verified AND state = ?", *wallet.ActorID, model.DealProposed). + Where("wallet_id = ? AND verified AND state = ?", wallet.ID, model.DealProposed). Scan(&totalPieceSize). Error if err != nil { diff --git a/replication/wallet_test.go b/replication/wallet_test.go index 6fd5d820..cbec4ed9 100644 --- a/replication/wallet_test.go +++ b/replication/wallet_test.go @@ -89,6 +89,7 @@ func TestDatacapWalletChooser_Choose(t *testing.T) { require.NoError(t, db.Create(&model.Deal{ ClientID: "actorc", + WalletID: &wallets[2].ID, Verified: true, State: model.DealProposed, PieceSize: 500000, diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index ff9120b0..fa58d6cc 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -400,6 +400,7 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) PricePerDeal: schedule.PricePerDeal, PricePerGB: schedule.PricePerGB, PricePerGBEpoch: schedule.PricePerGBEpoch, + WalletID: &walletObj.ID, }, proposalSigner) if err != nil { diff --git a/service/dealpusher/pdp_schedule.go b/service/dealpusher/pdp_schedule.go index 5bd25e46..77372dc0 100644 --- a/service/dealpusher/pdp_schedule.go +++ b/service/dealpusher/pdp_schedule.go @@ -6,7 +6,6 @@ import ( "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/database" - "github.com/data-preservation-programs/singularity/handler/wallet" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/util" "github.com/data-preservation-programs/singularity/util/keystore" @@ -138,10 +137,6 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul if err != nil { return model.ScheduleError, errors.Wrap(err, "failed to choose wallet") } - actorObj, err := wallet.GetOrCreateActor(ctx, db, d.lotusClient, &walletObj) - if err != nil { - return model.ScheduleError, errors.Wrapf(err, "failed to resolve actor for wallet %s", walletObj.Address) - } evmSigner, err := keystore.EVMSigner(d.keyStore, walletObj) if err != nil { @@ -167,6 +162,11 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation") } + clientID := "" + if walletObj.ActorID != nil { + clientID = *walletObj.ActorID + } + for _, car := range cars { proofSetIDCopy := proofSetID dealModel := &model.Deal{ @@ -177,7 +177,8 @@ func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedul PieceSize: car.PieceSize, Verified: schedule.Verified, ScheduleID: &schedule.ID, - ClientID: actorObj.ID, + ClientID: clientID, + WalletID: &walletObj.ID, ProofSetID: &proofSetIDCopy, } diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go index e22f8343..ab84dd85 100644 --- a/service/dealpusher/pdp_wiring_test.go +++ b/service/dealpusher/pdp_wiring_test.go @@ -167,5 +167,7 @@ func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation require.Equal(t, model.DealProposed, deals[0].State) require.NotNil(t, deals[0].ProofSetID) require.Equal(t, uint64(42), *deals[0].ProofSetID) + require.NotNil(t, deals[0].WalletID) + require.Equal(t, wallet.ID, *deals[0].WalletID) }) } diff --git a/service/dealtracker/dealtracker.go b/service/dealtracker/dealtracker.go index 26b6f2ce..b209458b 100644 --- a/service/dealtracker/dealtracker.go +++ b/service/dealtracker/dealtracker.go @@ -436,6 +436,18 @@ func (d *DealTracker) runOnce(ctx context.Context) error { actorIDs[actor.ID] = struct{}{} } + // build actor → wallet mapping for setting wallet_id on discovered deals + var wallets []model.Wallet + if err := db.Where("actor_id IS NOT NULL").Find(&wallets).Error; err != nil { + return errors.Wrap(err, "failed to get wallets from database") + } + actorWalletIDs := make(map[string]uint, len(wallets)) + for _, w := range wallets { + if w.ActorID != nil { + actorWalletIDs[*w.ActorID] = w.ID + } + } + knownDeals := make(map[uint64]model.DealState) rows, err := db.Model(&model.Deal{}).Where("deal_id IS NOT NULL"). Select("deal_id", "state").Rows() @@ -552,12 +564,17 @@ func (d *DealTracker) runOnce(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "failed to parse piece CID %s", deal.Proposal.PieceCID.Root) } + var walletID *uint + if wid, ok := actorWalletIDs[deal.Proposal.Client]; ok { + walletID = &wid + } err = database.DoRetry(ctx, func() error { return db.Create(&model.Deal{ DealID: &dealID, State: newState, DealType: model.DealTypeMarket, ClientID: deal.Proposal.Client, + WalletID: walletID, Provider: deal.Proposal.Provider, Label: deal.Proposal.Label, PieceCID: model.CID(root), diff --git a/service/pdptracker/eventprocessor.go b/service/pdptracker/eventprocessor.go index aafe9ea8..0bd253a8 100644 --- a/service/pdptracker/eventprocessor.go +++ b/service/pdptracker/eventprocessor.go @@ -216,6 +216,16 @@ func reconcileProofSetPieces(ctx context.Context, db *gorm.DB, rpcClient *ChainP return err } + var walletID *uint + var wallet model.Wallet + if err := db.Where("actor_id = ?", actor.ID).First(&wallet).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return errors.Wrapf(err, "failed to look up wallet for actor %s", actor.ID) + } + } else { + walletID = &wallet.ID + } + pieces, err := rpcClient.GetActivePieces(ctx, setID) if err != nil { return errors.Wrapf(err, "getActivePieces for set %d", setID) @@ -256,6 +266,7 @@ func reconcileProofSetPieces(ctx context.Context, db *gorm.DB, rpcClient *ChainP DealType: model.DealTypePDP, State: initialState, ClientID: actor.ID, + WalletID: walletID, Provider: ps.Provider, PieceCID: modelCID, ProofSetID: ptr.Of(setID),