diff --git a/cmd/run/dealpusher.go b/cmd/run/dealpusher.go index d810f3c9..e5da075c 100644 --- a/cmd/run/dealpusher.go +++ b/cmd/run/dealpusher.go @@ -1,6 +1,8 @@ package run import ( + "time" + "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/service" @@ -25,6 +27,26 @@ var DealPusherCmd = &cli.Command{ Aliases: []string{"M"}, DefaultText: "Unlimited", }, + &cli.IntFlag{ + Name: "pdp-batch-size", + Usage: "Number of roots to include in each PDP add-roots transaction", + Value: 128, + }, + &cli.Uint64Flag{ + Name: "pdp-gas-limit", + Usage: "Gas limit for PDP on-chain transactions", + Value: 5000000, + }, + &cli.Uint64Flag{ + Name: "pdp-confirmation-depth", + Usage: "Number of block confirmations required for PDP transactions", + Value: 5, + }, + &cli.DurationFlag{ + Name: "pdp-poll-interval", + Usage: "Polling interval for PDP transaction confirmation checks", + Value: 30 * time.Second, + }, }, Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) @@ -39,7 +61,24 @@ var DealPusherCmd = &cli.Command{ return errors.WithStack(err) } - dm, err := dealpusher.NewDealPusher(db, c.String("lotus-api"), c.String("lotus-token"), c.Uint("deal-attempts"), c.Uint("max-replication-factor")) + pdpCfg := dealpusher.PDPSchedulingConfig{ + BatchSize: c.Int("pdp-batch-size"), + GasLimit: c.Uint64("pdp-gas-limit"), + ConfirmationDepth: c.Uint64("pdp-confirmation-depth"), + PollingInterval: c.Duration("pdp-poll-interval"), + } + if err := pdpCfg.Validate(); err != nil { + return errors.WithStack(err) + } + + dm, err := dealpusher.NewDealPusher( + db, + c.String("lotus-api"), + c.String("lotus-token"), + c.Uint("deal-attempts"), + c.Uint("max-replication-factor"), + dealpusher.WithPDPSchedulingConfig(pdpCfg), + ) if err != nil { return errors.WithStack(err) } diff --git a/docs/en/cli-reference/run/deal-pusher.md b/docs/en/cli-reference/run/deal-pusher.md index 0a9b1bcb..4718d372 100644 --- a/docs/en/cli-reference/run/deal-pusher.md +++ b/docs/en/cli-reference/run/deal-pusher.md @@ -11,6 +11,10 @@ USAGE: OPTIONS: --deal-attempts value, -d value Number of times to attempt a deal before giving up (default: 3) --max-replication-factor value, -M value Max number of replicas for each individual PieceCID across all clients and providers (default: Unlimited) + --pdp-batch-size value Number of roots to include in each PDP add-roots transaction (default: 128) + --pdp-gas-limit value Gas limit for PDP on-chain transactions (default: 5000000) + --pdp-confirmation-depth value Number of block confirmations required for PDP transactions (default: 5) + --pdp-poll-interval value Polling interval for PDP transaction confirmation checks (default: 30s) --help, -h show help ``` {% endcode %} diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index 0009ff0d..ff9120b0 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -16,7 +16,6 @@ import ( "github.com/data-preservation-programs/singularity/service/healthcheck" "github.com/data-preservation-programs/singularity/util" "github.com/data-preservation-programs/singularity/util/keystore" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/crypto" "github.com/google/uuid" "github.com/ipfs/go-cid" @@ -458,39 +457,6 @@ func (d *DealPusher) resolveScheduleDealType(schedule *model.Schedule) model.Dea return d.scheduleDealTypeResolver(schedule) } -func defaultPDPSchedulingConfig() PDPSchedulingConfig { - return PDPSchedulingConfig{ - BatchSize: 128, - GasLimit: 5_000_000, - ConfirmationDepth: 5, - PollingInterval: 30 * time.Second, - } -} - -// inferScheduleDealType uses the provider address protocol as the discriminator: -// delegated (f4) addresses are FEVM contracts that speak PDP, everything else -// is a traditional miner actor that speaks market deals. -func inferScheduleDealType(schedule *model.Schedule) model.DealType { - if schedule == nil { - return model.DealTypeMarket - } - providerAddr, err := address.NewFromString(schedule.Provider) - if err != nil { - return model.DealTypeMarket - } - if providerAddr.Protocol() == address.Delegated { - return model.DealTypePDP - } - return model.DealTypeMarket -} - -func (d *DealPusher) runPDPSchedule(_ context.Context, _ *model.Schedule) (model.ScheduleState, error) { - if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { - return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") - } - return model.ScheduleError, errors.New("pdp scheduling path is not implemented") -} - func NewDealPusher(db *gorm.DB, lotusURL string, lotusToken string, numAttempts uint, maxReplicas uint, opts ...Option, ) (*DealPusher, error) { diff --git a/service/dealpusher/pdp_schedule.go b/service/dealpusher/pdp_schedule.go new file mode 100644 index 00000000..5bd25e46 --- /dev/null +++ b/service/dealpusher/pdp_schedule.go @@ -0,0 +1,309 @@ +package dealpusher + +import ( + "context" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/handler/wallet" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util" + "github.com/data-preservation-programs/singularity/util/keystore" + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + "github.com/rjNemo/underscore" + "gorm.io/gorm" +) + +func defaultPDPSchedulingConfig() PDPSchedulingConfig { + return PDPSchedulingConfig{ + BatchSize: 128, + GasLimit: 5_000_000, + ConfirmationDepth: 5, + PollingInterval: 30 * time.Second, + } +} + +func inferScheduleDealType(schedule *model.Schedule) model.DealType { + if schedule == nil { + return model.DealTypeMarket + } + providerAddr, err := address.NewFromString(schedule.Provider) + if err != nil { + return model.DealTypeMarket + } + if providerAddr.Protocol() == address.Delegated { + return model.DealTypePDP + } + return model.DealTypeMarket +} + +func (d *DealPusher) runPDPSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) { + if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { + return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") + } + if err := d.pdpSchedulingConfig.Validate(); err != nil { + return model.ScheduleError, errors.Wrap(err, "invalid PDP scheduling configuration") + } + + db := d.dbNoContext.WithContext(ctx) + var attachments []model.SourceAttachment + if err := db.Model(&model.SourceAttachment{}). + Where("preparation_id = ?", schedule.PreparationID). + Find(&attachments).Error; err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to find attachments") + } + + allowedPieceCIDs := make([]model.CID, 0, len(schedule.AllowedPieceCIDs)) + for _, c := range schedule.AllowedPieceCIDs { + parsed, err := cid.Parse(c) + if err != nil { + return model.ScheduleError, errors.Wrapf(err, "failed to parse CID %s", c) + } + allowedPieceCIDs = append(allowedPieceCIDs, model.CID(parsed)) + } + + overReplicatedCIDs := db. + Table("deals"). + Select("piece_cid"). + Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}). + Group("piece_cid"). + Having("count(*) >= ?", d.maxReplicas) + + var timer *time.Timer + current := sumResult{} + for { + if ctx.Err() != nil { + return "", nil + } + + pending, total, err := d.getPDPScheduleCounts(ctx, schedule) + if err != nil { + return model.ScheduleError, err + } + + shouldWait := false + if schedule.MaxPendingDealNumber > 0 && pending.DealNumber >= schedule.MaxPendingDealNumber { + shouldWait = true + } + if schedule.MaxPendingDealSize > 0 && pending.DealSize >= schedule.MaxPendingDealSize { + shouldWait = true + } + if shouldWait { + if timer == nil { + timer = time.NewTimer(waitPendingInterval) + defer timer.Stop() + } else { + timer.Reset(waitPendingInterval) + } + select { + case <-ctx.Done(): + return "", nil + case <-timer.C: + } + continue + } + if schedule.TotalDealNumber > 0 && total.DealNumber >= schedule.TotalDealNumber { + return model.ScheduleCompleted, nil + } + if schedule.TotalDealSize > 0 && total.DealSize >= schedule.TotalDealSize { + return model.ScheduleCompleted, nil + } + if schedule.ScheduleCron != "" && schedule.ScheduleDealNumber > 0 && current.DealNumber >= schedule.ScheduleDealNumber { + return "", nil + } + if schedule.ScheduleCron != "" && schedule.ScheduleDealSize > 0 && current.DealSize >= schedule.ScheduleDealSize { + return "", nil + } + + cars, err := d.findPDPCars(ctx, schedule, attachments, allowedPieceCIDs, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { + return "", nil + } + return model.ScheduleCompleted, nil + } + return model.ScheduleError, err + } + if len(cars) == 0 { + if schedule.ScheduleCron != "" && schedule.ScheduleCronPerpetual { + return "", nil + } + return model.ScheduleCompleted, nil + } + + walletObj, err := d.walletChooser.Choose(ctx, schedule.Preparation.Wallets) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to choose wallet") + } + actorObj, err := wallet.GetOrCreateActor(ctx, db, d.lotusClient, &walletObj) + if err != nil { + return model.ScheduleError, errors.Wrapf(err, "failed to resolve actor for wallet %s", walletObj.Address) + } + + evmSigner, err := keystore.EVMSigner(d.keyStore, walletObj) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to load EVM signer for wallet") + } + + proofSetID, err := d.pdpProofSetManager.EnsureProofSet(ctx, evmSigner, schedule.Provider) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to ensure PDP proof set") + } + + pieceCIDs := make([]cid.Cid, 0, len(cars)) + for _, car := range cars { + pieceCIDs = append(pieceCIDs, cid.Cid(car.PieceCID)) + } + queuedTx, err := d.pdpProofSetManager.QueueAddRoots(ctx, evmSigner, proofSetID, pieceCIDs, d.pdpSchedulingConfig) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to queue PDP root addition transaction") + } + + _, err = d.pdpTxConfirmer.WaitForConfirmations(ctx, queuedTx.Hash, d.pdpSchedulingConfig.ConfirmationDepth, d.pdpSchedulingConfig.PollingInterval) + if err != nil { + return model.ScheduleError, errors.Wrap(err, "failed waiting for PDP transaction confirmation") + } + + for _, car := range cars { + proofSetIDCopy := proofSetID + dealModel := &model.Deal{ + State: model.DealProposed, + DealType: model.DealTypePDP, + Provider: schedule.Provider, + PieceCID: car.PieceCID, + PieceSize: car.PieceSize, + Verified: schedule.Verified, + ScheduleID: &schedule.ID, + ClientID: actorObj.ID, + ProofSetID: &proofSetIDCopy, + } + + if err := database.DoRetry(ctx, func() error { return db.Create(dealModel).Error }); err != nil { + return model.ScheduleError, errors.Wrap(err, "failed to create PDP deal") + } + current.DealNumber++ + current.DealSize += car.PieceSize + } + continue + } +} + +func (d *DealPusher) getPDPScheduleCounts(ctx context.Context, schedule *model.Schedule) (sumResult, sumResult, error) { + db := d.dbNoContext.WithContext(ctx) + var pending sumResult + err := db.Model(&model.Deal{}). + Where("schedule_id = ? AND deal_type = ? AND state IN (?)", schedule.ID, model.DealTypePDP, []model.DealState{ + model.DealProposed, model.DealPublished, + }). + Select("COUNT(*) AS deal_number, SUM(piece_size) AS deal_size"). + Scan(&pending).Error + if err != nil { + return sumResult{}, sumResult{}, errors.Wrap(err, "failed to count pending PDP deals") + } + + var total sumResult + err = db.Model(&model.Deal{}). + Where("schedule_id = ? AND deal_type = ? AND state IN (?)", schedule.ID, model.DealTypePDP, []model.DealState{ + model.DealActive, model.DealProposed, model.DealPublished, + }). + Select("COUNT(*) AS deal_number, SUM(piece_size) AS deal_size"). + Scan(&total).Error + if err != nil { + return sumResult{}, sumResult{}, errors.Wrap(err, "failed to count total PDP deals") + } + + return pending, total, nil +} + +func (d *DealPusher) findPDPCars( + ctx context.Context, + schedule *model.Schedule, + attachments []model.SourceAttachment, + allowedPieceCIDs []model.CID, + overReplicatedCIDs *gorm.DB, + limit int, +) ([]model.Car, error) { + db := d.dbNoContext.WithContext(ctx) + attachmentIDs := underscore.Map(attachments, func(a model.SourceAttachment) uint32 { return uint32(a.ID) }) + existingPieceCIDQuery := db.Table("deals").Select("piece_cid"). + Where("provider = ? AND deal_type = ? AND state IN (?)", + schedule.Provider, + model.DealTypePDP, + []model.DealState{ + model.DealProposed, model.DealPublished, model.DealActive, + }). + Where("piece_cid IS NOT NULL") + if schedule.Force { + existingPieceCIDQuery = db.Table("deals").Select("piece_cid"). + Where("schedule_id = ? AND deal_type = ?", schedule.ID, model.DealTypePDP). + Where("piece_cid IS NOT NULL") + } + + var existingPieceCIDs []model.CID + if err := existingPieceCIDQuery.Find(&existingPieceCIDs).Error; err != nil { + return nil, errors.Wrap(err, "failed to query existing PDP piece CIDs") + } + existingSet := make(map[string]struct{}, len(existingPieceCIDs)) + for _, existing := range existingPieceCIDs { + existingSet[cid.Cid(existing).String()] = struct{}{} + } + + baseQuery := func() *gorm.DB { + query := db.Where("attachment_id IN ?", attachmentIDs) + if d.maxReplicas > 0 && !schedule.Force { + query = query.Where("piece_cid NOT IN (?)", overReplicatedCIDs) + } + return query + } + + if len(allowedPieceCIDs) == 0 { + var cars []model.Car + if err := baseQuery().Find(&cars).Error; err != nil { + return nil, errors.Wrap(err, "failed to find PDP cars") + } + filtered := make([]model.Car, 0, limit) + for _, car := range cars { + if _, exists := existingSet[cid.Cid(car.PieceCID).String()]; exists { + continue + } + filtered = append(filtered, car) + if len(filtered) >= limit { + break + } + } + if len(filtered) == 0 { + return nil, gorm.ErrRecordNotFound + } + return filtered, nil + } + + cars := make([]model.Car, 0, limit) + pieceCIDChunks := util.ChunkSlice(allowedPieceCIDs, util.BatchSize) + for _, pieceCIDChunk := range pieceCIDChunks { + if len(cars) >= limit { + break + } + var chunkCars []model.Car + if err := baseQuery(). + Where("piece_cid IN ?", pieceCIDChunk). + Find(&chunkCars).Error; err != nil { + return nil, errors.Wrap(err, "failed to find PDP cars by allowed piece CID") + } + for _, car := range chunkCars { + if _, exists := existingSet[cid.Cid(car.PieceCID).String()]; exists { + continue + } + cars = append(cars, car) + if len(cars) >= limit { + break + } + } + } + if len(cars) == 0 { + return nil, gorm.ErrRecordNotFound + } + return cars, nil +} diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go index 2001d1bf..e22f8343 100644 --- a/service/dealpusher/pdp_wiring_test.go +++ b/service/dealpusher/pdp_wiring_test.go @@ -7,24 +7,42 @@ import ( "github.com/data-preservation-programs/go-synapse/signer" "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/keystore" + "github.com/data-preservation-programs/singularity/util/testutil" "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + "gorm.io/gorm" ) -type noopPDPProofSetManager struct{} +type fixedWalletChooser struct { + wallet model.Wallet +} + +func (c fixedWalletChooser) Choose(_ context.Context, _ []model.Wallet) (model.Wallet, error) { + return c.wallet, nil +} -func (noopPDPProofSetManager) EnsureProofSet(_ context.Context, _ signer.EVMSigner, _ string) (uint64, error) { - return 1, nil +type proofSetManagerMock struct { + proofSetID uint64 + pieceCIDs []cid.Cid } -func (noopPDPProofSetManager) QueueAddRoots(_ context.Context, _ signer.EVMSigner, _ uint64, _ []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { - return &PDPQueuedTx{Hash: "0x1"}, nil +func (m *proofSetManagerMock) EnsureProofSet(_ context.Context, _ signer.EVMSigner, _ string) (uint64, error) { + return m.proofSetID, nil } -type noopPDPTransactionConfirmer struct{} +func (m *proofSetManagerMock) QueueAddRoots(_ context.Context, _ signer.EVMSigner, _ uint64, pieceCIDs []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { + m.pieceCIDs = append([]cid.Cid(nil), pieceCIDs...) + return &PDPQueuedTx{Hash: "0xabc"}, nil +} + +type txConfirmerMock struct { + txHash string +} -func (noopPDPTransactionConfirmer) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { +func (m *txConfirmerMock) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { + m.txHash = txHash return &PDPTransactionReceipt{Hash: txHash}, nil } @@ -55,17 +73,99 @@ func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t * require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured") } -func TestDealPusher_RunSchedule_PDPWithDependenciesReturnsNotImplemented(t *testing.T) { - d := &DealPusher{ - pdpProofSetManager: noopPDPProofSetManager{}, - pdpTxConfirmer: noopPDPTransactionConfirmer{}, - scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { - return model.DealTypePDP - }, - } +func TestDealPusher_RunSchedule_PDPWithDependenciesCreatesDealsAfterConfirmation(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + clientSubaddr := make([]byte, 20) + clientSubaddr[19] = 10 + clientAddr, err := address.NewDelegatedAddress(10, clientSubaddr) + require.NoError(t, err) + providerSubaddr := make([]byte, 20) + providerSubaddr[19] = 20 + providerAddr, err := address.NewDelegatedAddress(10, providerSubaddr) + require.NoError(t, err) - state, err := d.runSchedule(context.Background(), &model.Schedule{}) - require.Error(t, err) - require.Equal(t, model.ScheduleError, state) - require.Contains(t, err.Error(), "pdp scheduling path is not implemented") + prep := model.Preparation{Name: "prep"} + require.NoError(t, db.Create(&prep).Error) + require.NotZero(t, prep.ID) + + ks, err := keystore.NewLocalKeyStore(t.TempDir()) + require.NoError(t, err) + keyPath, _, err := ks.Put(testutil.TestPrivateKeyHex) + require.NoError(t, err) + + actorID := "f01001" + require.NoError(t, db.Create(&model.Actor{ID: actorID, Address: clientAddr.String()}).Error) + wallet := model.Wallet{ + Address: clientAddr.String(), + KeyPath: keyPath, + KeyStore: "local", + ActorID: &actorID, + } + require.NoError(t, db.Create(&wallet).Error) + require.NoError(t, db.Model(&prep).Association("Wallets").Append(&wallet)) + storage := model.Storage{Name: "src-storage"} + require.NoError(t, db.Create(&storage).Error) + require.NotZero(t, storage.ID) + attachment := model.SourceAttachment{PreparationID: prep.ID, StorageID: storage.ID} + require.NoError(t, db.Create(&attachment).Error) + require.NotZero(t, attachment.ID) + + pieceCID := model.CID(calculateCommp(t, generateRandomBytes(1000), 1024)) + car := model.Car{ + AttachmentID: &attachment.ID, + PreparationID: &prep.ID, + PieceCID: pieceCID, + PieceSize: 1024, + StoragePath: "car-1", + } + require.NoError(t, db.Create(&car).Error) + + schedule := model.Schedule{ + PreparationID: prep.ID, + State: model.ScheduleActive, + Provider: providerAddr.String(), + TotalDealNumber: 1, + } + require.NoError(t, db.Create(&schedule).Error) + schedule.Preparation = &model.Preparation{Wallets: []model.Wallet{wallet}} + + psm := &proofSetManagerMock{proofSetID: 42} + conf := &txConfirmerMock{} + d := &DealPusher{ + dbNoContext: db, + keyStore: ks, + walletChooser: fixedWalletChooser{wallet: wallet}, + pdpProofSetManager: psm, + pdpTxConfirmer: conf, + pdpSchedulingConfig: defaultPDPSchedulingConfig(), + scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { return model.DealTypePDP }, + } + var attachments []model.SourceAttachment + require.NoError(t, db.Where("preparation_id = ?", schedule.PreparationID).Find(&attachments).Error) + require.Len(t, attachments, 1) + overReplicatedCIDs := db. + Table("deals"). + Select("piece_cid"). + Where("state in ?", []model.DealState{model.DealProposed, model.DealPublished, model.DealActive}). + Group("piece_cid"). + Having("count(*) >= ?", d.maxReplicas) + cars, err := d.findPDPCars(ctx, &schedule, attachments, nil, overReplicatedCIDs, d.pdpSchedulingConfig.BatchSize) + require.NoError(t, err) + require.Len(t, cars, 1) + + state, err := d.runSchedule(ctx, &schedule) + require.NoError(t, err) + require.Equal(t, model.ScheduleCompleted, state) + require.Equal(t, "0xabc", conf.txHash) + require.Len(t, psm.pieceCIDs, 1) + require.Equal(t, cid.Cid(pieceCID), psm.pieceCIDs[0]) + + var deals []model.Deal + require.NoError(t, db.Where("schedule_id = ?", schedule.ID).Find(&deals).Error) + require.Len(t, deals, 1) + require.Equal(t, model.DealTypePDP, deals[0].DealType) + require.Equal(t, model.DealProposed, deals[0].State) + require.NotNil(t, deals[0].ProofSetID) + require.Equal(t, uint64(42), *deals[0].ProofSetID) + }) }