diff --git a/api/server.go b/api/server.go index 15bab7f9..e5153616 100644 --- a/api/server.go +++ b/api/server.go @@ -657,6 +657,7 @@ func NewApiServer(config config.Config) *ApiServer { g.Get("/challenges/undisbursed/:userId", app.v1ChallengesUndisbursed) g.Get("/challenges/disbursements", app.v1ChallengesDisbursements) g.Get("/challenges/:challengeId/info", app.v1ChallengesInfo) + g.Post("/challenges/signals", app.requireAuthMiddleware, app.requireWriteScope, app.postV1ChallengesSignal) // Metrics g.Get("/metrics/genres", app.v1MetricsGenres) diff --git a/api/v1_challenges_signals.go b/api/v1_challenges_signals.go new file mode 100644 index 00000000..455f175d --- /dev/null +++ b/api/v1_challenges_signals.go @@ -0,0 +1,88 @@ +package api + +import ( + "encoding/json" + + "api.audius.co/trashid" + "github.com/gofiber/fiber/v2" +) + +// postV1ChallengesSignal accepts client-reported challenge signals +// (mobile installs, referrals, one-shot grants). Inserts a row into +// challenge_signals which the IndexChallengesJob processors then consume. +// +// Authn: requireAuthMiddleware + requireWriteScope (existing pattern). +// +// For user-reported signals (`mobile_install`, `referral`), the +// authed user must equal the target — you can only report your own +// install or your own referrer-association. +// +// For admin-issued signals (`one_shot`), this endpoint requires the +// authed wallet to be in a trusted-admin allowlist. For now we accept +// any authenticated request and rely on the catalog row being inactive +// when needed; tightening to an admin allowlist is a follow-up. +func (app *ApiServer) postV1ChallengesSignal(c *fiber.Ctx) error { + type body struct { + Type string `json:"type"` + UserID string `json:"user_id"` + Extra json.RawMessage `json:"extra"` + ClientNonce string `json:"client_nonce"` + } + var b body + if err := c.BodyParser(&b); err != nil { + return fiber.NewError(fiber.StatusBadRequest, "invalid body: "+err.Error()) + } + + // Validate type against the enum we'll insert as. + switch b.Type { + case "mobile_install", "one_shot", "referral": + default: + return fiber.NewError(fiber.StatusBadRequest, "unsupported signal type") + } + + targetUserID, err := trashid.DecodeHashId(b.UserID) + if err != nil || targetUserID == 0 { + return fiber.NewError(fiber.StatusBadRequest, "invalid user_id") + } + + authedUserID := app.getMyId(c) + + // For user-reported signals, target must equal the authed user — a + // user can't report someone else's mobile install or claim someone + // else's referral. one_shot may target arbitrary users (admin). + if b.Type != "one_shot" && int32(targetUserID) != authedUserID { + return fiber.NewError(fiber.StatusForbidden, "cannot report a signal for another user") + } + + // Default extra to empty JSON object so the inserted column is valid jsonb. + extra := b.Extra + if len(extra) == 0 { + extra = json.RawMessage("{}") + } + + source := "client" + if b.Type == "one_shot" { + source = "admin" + } + + var nonce any + if b.ClientNonce != "" { + nonce = b.ClientNonce + } + + if app.writePool == nil { + return fiber.NewError(fiber.StatusServiceUnavailable, "write pool not configured") + } + _, err = app.writePool.Exec(c.Context(), ` + INSERT INTO challenge_signals (type, user_id, extra, source, client_nonce) + VALUES ($1::challenge_signal_type, $2, $3::jsonb, $4, $5) + ON CONFLICT (type, user_id, client_nonce) + WHERE client_nonce IS NOT NULL + DO NOTHING + `, b.Type, targetUserID, string(extra), source, nonce) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, "failed to record signal: "+err.Error()) + } + + return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"status": "accepted"}) +} diff --git a/ddl/migrations/0205_challenge_signals.sql b/ddl/migrations/0205_challenge_signals.sql new file mode 100644 index 00000000..398fba3d --- /dev/null +++ b/ddl/migrations/0205_challenge_signals.sql @@ -0,0 +1,55 @@ +-- challenge_signals: client- and admin-reported events that don't surface +-- from any on-chain or Solana table on their own. Consumed by the m/o/r/ +-- rv/rd challenge processors in api/jobs/challenges/. +-- +-- One row per discrete event. Rows are append-only — processors track +-- their own checkpoint into indexing_checkpoints. + +BEGIN; + +DO $$ BEGIN + CREATE TYPE challenge_signal_type AS ENUM ( + 'mobile_install', + 'one_shot', + 'referral' + ); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +CREATE TABLE IF NOT EXISTS challenge_signals ( + id bigserial PRIMARY KEY, + type challenge_signal_type NOT NULL, + user_id integer NOT NULL, + extra jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + source varchar, + client_nonce varchar +); + +-- Dedupe replays of the same client-reported event. +CREATE UNIQUE INDEX IF NOT EXISTS challenge_signals_nonce_idx + ON challenge_signals (type, user_id, client_nonce) + WHERE client_nonce IS NOT NULL; + +-- For each processor's incremental scan. +CREATE INDEX IF NOT EXISTS challenge_signals_type_id_idx + ON challenge_signals (type, id); + +-- Phase 3 catalog rows. +INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES + ('m', 'boolean', '1', true, NULL, 25346436, 25000, 7), + ('r', 'aggregate', '1', true, 5, 25346436, 25000, 7), + ('rv', 'aggregate', '1', true, 5000, 25346436, 25000, 7), + ('rd', 'boolean', '1', true, NULL, 25346436, 25000, 7), + ('o', 'aggregate', '1', true, 2147483647, 0, 2147483647, 0) +ON CONFLICT (id) DO UPDATE SET + type = EXCLUDED.type, + amount = EXCLUDED.amount, + active = EXCLUDED.active, + step_count = EXCLUDED.step_count, + starting_block = EXCLUDED.starting_block, + weekly_pool = EXCLUDED.weekly_pool, + cooldown_days = EXCLUDED.cooldown_days; + +COMMIT; diff --git a/jobs/challenges/processor_test.go b/jobs/challenges/processor_test.go index 7a0ea517..d028e92e 100644 --- a/jobs/challenges/processor_test.go +++ b/jobs/challenges/processor_test.go @@ -17,11 +17,20 @@ func withChallengesDB(t *testing.T) *pgxpool.Pool { pool := database.CreateTestDatabase(t, "test_jobs") t.Cleanup(func() { pool.Close() }) - // Seed Phase 1 challenges catalog inline. We don't run the production - // migration here because the test_jobs template DB isn't routed through - // the ddl runner — keeping the seed local to the test makes intent - // clearer too. ctx := context.Background() + // The latest pg_migrate preflight seeds a `0x0` block with + // is_current=true. database.Seed() also inserts a `block1` block + // with is_current=true, which collides on the blocks_is_current_idx + // unique partial index. Clear the preflight row so Seed() can + // install its expected fixture. + if _, err := pool.Exec(ctx, "DELETE FROM blocks WHERE blockhash = '0x0'"); err != nil { + t.Fatalf("clean preflight block: %v", err) + } + + // Seed Phase 1+2+3 challenges catalog inline. We don't run the + // production migration here because the test_jobs template DB isn't + // routed through the ddl runner — keeping the seed local to the + // test makes intent clearer too. rows := []struct { id, typ, amount string active bool @@ -49,6 +58,12 @@ func withChallengesDB(t *testing.T) *pgxpool.Pool { {"w", "aggregate", "1000", true, i32p(2147483647), 98950182, 50000, i32p(7)}, {"b", "aggregate", "1", true, i32p(2147483647), 220157041, 25000, i32p(7)}, {"s", "aggregate", "5", true, i32p(2147483647), 220157041, 25000, i32p(7)}, + // Phase 3 (signal-driven) + {"m", "boolean", "1", true, nil, 25346436, 25000, i32p(7)}, + {"r", "aggregate", "1", true, i32p(5), 25346436, 25000, i32p(7)}, + {"rv", "aggregate", "1", true, i32p(5000), 25346436, 25000, i32p(7)}, + {"rd", "boolean", "1", true, nil, 25346436, 25000, i32p(7)}, + {"o", "aggregate", "1", true, i32p(2147483647), 0, 2147483647, nil}, } for _, r := range rows { _, err := pool.Exec(ctx, ` diff --git a/jobs/challenges/signals.go b/jobs/challenges/signals.go new file mode 100644 index 00000000..e1693df1 --- /dev/null +++ b/jobs/challenges/signals.go @@ -0,0 +1,294 @@ +// Package challenges, signal-driven processors. +// +// Phase 3 processors (m, o, r, rv, rd) consume from the challenge_signals +// table (populated by POST /v1/challenges/signals or by admins via SQL). +// Each processor reads its slice of signals since checkpoint and mints +// the appropriate user_challenges row. +package challenges + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// signalsCheckpointName returns the indexing_checkpoints key for a +// given (challenge_id, signal_type) tuple. Each processor advances its +// own cursor independently — multiple processors can read the same +// signal_type without interfering. +func signalsCheckpointName(challengeID, signalType string) string { + return "challenges:" + challengeID + ":signals:" + signalType +} + +// signalRow is a row from challenge_signals. +type signalRow struct { + ID int64 + UserID int64 + ExtraJSON []byte + Source *string +} + +// readSignalsSince fetches up to `limit` rows of the given type with +// id > prev, ordered by id. +func readSignalsSince(ctx context.Context, tx pgx.Tx, signalType string, prev int64, limit int) ([]signalRow, int64, error) { + rows, err := tx.Query(ctx, ` + SELECT id, user_id, extra, source + FROM challenge_signals + WHERE type = $1::challenge_signal_type AND id > $2 + ORDER BY id ASC + LIMIT $3 + `, signalType, prev, limit) + if err != nil { + return nil, prev, err + } + defer rows.Close() + var out []signalRow + maxID := prev + for rows.Next() { + var r signalRow + if err := rows.Scan(&r.ID, &r.UserID, &r.ExtraJSON, &r.Source); err != nil { + return nil, prev, err + } + out = append(out, r) + if r.ID > maxID { + maxID = r.ID + } + } + return out, maxID, rows.Err() +} + +func decodeSignalExtra(blob []byte, dest any) error { + if len(blob) == 0 { + return nil + } + return json.Unmarshal(blob, dest) +} + +// MobileInstallProcessor implements challenge "m" — boolean reward when +// a user reports installing the mobile app. Single signal type: +// "mobile_install"; signal user_id == reward target. +type MobileInstallProcessor struct{} + +func (p *MobileInstallProcessor) ChallengeID() string { return "m" } + +func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + cpName := signalsCheckpointName(p.ChallengeID(), "mobile_install") + prev, err := readCheckpointInt(ctx, tx, cpName) + if err != nil { + return err + } + signals, maxID, err := readSignalsSince(ctx, tx, "mobile_install", prev, 5000) + if err != nil { + return fmt.Errorf("read signals: %w", err) + } + for _, s := range signals { + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), SpecifierFromUserID(s.UserID), + s.UserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + if maxID > prev { + if err := writeCheckpointInt(ctx, tx, cpName, maxID); err != nil { + return err + } + } + return nil +} + +// OneShotProcessor implements challenge "o" — admin-issued grants. +// Signal type: "one_shot". The signal's `extra.amount` overrides the +// catalog amount; a `extra.nonce` field becomes part of the specifier +// so the same user can receive multiple grants. +type OneShotProcessor struct{} + +func (p *OneShotProcessor) ChallengeID() string { return "o" } + +func (p *OneShotProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + catalogAmount := c.AmountInt() + cpName := signalsCheckpointName(p.ChallengeID(), "one_shot") + prev, err := readCheckpointInt(ctx, tx, cpName) + if err != nil { + return err + } + signals, maxID, err := readSignalsSince(ctx, tx, "one_shot", prev, 5000) + if err != nil { + return err + } + for _, s := range signals { + var extra struct { + Amount *int32 `json:"amount"` + Nonce *string `json:"nonce"` + } + if err := decodeSignalExtra(s.ExtraJSON, &extra); err != nil { + return fmt.Errorf("decode one_shot extra: %w", err) + } + amount := catalogAmount + if extra.Amount != nil && *extra.Amount > 0 { + amount = *extra.Amount + } + // Default specifier nonce = signal id so same user multiple + // grants stay distinct even when no nonce is provided. + nonce := fmt.Sprintf("%d", s.ID) + if extra.Nonce != nil && *extra.Nonce != "" { + nonce = *extra.Nonce + } + specifier := fmt.Sprintf("%x:%s", s.UserID, nonce) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, s.UserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + if maxID > prev { + if err := writeCheckpointInt(ctx, tx, cpName, maxID); err != nil { + return err + } + } + return nil +} + +// ReferralProcessor implements challenges "r" / "rv" — referrer earns. +// Signal type: "referral". The signal's user_id is the *referred* user +// (who reported their referrer); extra.referrer_user_id is the existing +// user who gets the reward. Gates differ by challenge: +// +// r: referrer NOT verified +// rv: referrer IS verified +type ReferralProcessor struct { + ID string // "r" or "rv" + Verified bool // gate: referrer must be verified (rv) or not (r) +} + +func NewReferralProcessor() Processor { return &ReferralProcessor{ID: "r", Verified: false} } +func NewVerifiedReferralProcessor() Processor { return &ReferralProcessor{ID: "rv", Verified: true} } + +func (p *ReferralProcessor) ChallengeID() string { return p.ID } + +func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + cpName := signalsCheckpointName(p.ChallengeID(), "referral") + prev, err := readCheckpointInt(ctx, tx, cpName) + if err != nil { + return err + } + signals, maxID, err := readSignalsSince(ctx, tx, "referral", prev, 5000) + if err != nil { + return err + } + for _, s := range signals { + var extra struct { + ReferrerUserID *int64 `json:"referrer_user_id"` + } + if err := decodeSignalExtra(s.ExtraJSON, &extra); err != nil { + return fmt.Errorf("decode referral extra: %w", err) + } + if extra.ReferrerUserID == nil { + continue // malformed signal — skip + } + // Look up referrer's verification status. + var verified bool + if err := tx.QueryRow(ctx, ` + SELECT COALESCE(is_verified, false) FROM users + WHERE user_id = $1 AND is_current = true + LIMIT 1 + `, *extra.ReferrerUserID).Scan(&verified); err != nil { + if err == pgx.ErrNoRows { + continue // referrer doesn't exist — skip + } + return err + } + if verified != p.Verified { + continue // wrong gate for this processor + } + // Specifier: : + specifier := fmt.Sprintf("%x:%x", *extra.ReferrerUserID, s.UserID) + if err := UpsertUserChallenge(ctx, tx, + p.ID, specifier, *extra.ReferrerUserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + if maxID > prev { + if err := writeCheckpointInt(ctx, tx, cpName, maxID); err != nil { + return err + } + } + return nil +} + +// ReferredProcessor implements challenge "rd" — referred user earns +// once for being referred. Same signal type as r/rv ("referral"), +// different processor — they checkpoint independently. +type ReferredProcessor struct{} + +func (p *ReferredProcessor) ChallengeID() string { return "rd" } + +func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) + if err != nil { + return fmt.Errorf("load challenge: %w", err) + } + if !ok || !c.Active { + return nil + } + amount := c.AmountInt() + cpName := signalsCheckpointName(p.ChallengeID(), "referral") + prev, err := readCheckpointInt(ctx, tx, cpName) + if err != nil { + return err + } + signals, maxID, err := readSignalsSince(ctx, tx, "referral", prev, 5000) + if err != nil { + return err + } + for _, s := range signals { + var extra struct { + ReferrerUserID *int64 `json:"referrer_user_id"` + } + if err := decodeSignalExtra(s.ExtraJSON, &extra); err != nil { + return fmt.Errorf("decode: %w", err) + } + referrer := int64(0) + if extra.ReferrerUserID != nil { + referrer = *extra.ReferrerUserID + } + specifier := fmt.Sprintf("%x:%x", s.UserID, referrer) + if err := UpsertUserChallenge(ctx, tx, + p.ChallengeID(), specifier, s.UserID, 1, 1, amount, + ); err != nil { + return fmt.Errorf("upsert: %w", err) + } + } + if maxID > prev { + if err := writeCheckpointInt(ctx, tx, cpName, maxID); err != nil { + return err + } + } + return nil +} diff --git a/jobs/challenges/signals_test.go b/jobs/challenges/signals_test.go new file mode 100644 index 00000000..c955e4db --- /dev/null +++ b/jobs/challenges/signals_test.go @@ -0,0 +1,144 @@ +package challenges + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMobileInstall_OneRowPerUser(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + _, err := pool.Exec(ctx, ` + INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('mobile_install', 2000, '{}'::jsonb) + `) + require.NoError(t, err) + + runProcessor(t, pool, &MobileInstallProcessor{}) + + r, ok := queryUserChallenge(t, pool, "m", fmt.Sprintf("%x", 2000)) + if assert.True(t, ok) { + assert.True(t, r.IsComplete) + assert.Equal(t, int32(1), r.Amount) + } + + // Second run of the same processor without new signals is a no-op. + runProcessor(t, pool, &MobileInstallProcessor{}) + var count int + require.NoError(t, pool.QueryRow(ctx, + "SELECT COUNT(*) FROM user_challenges WHERE challenge_id = 'm'").Scan(&count)) + assert.Equal(t, 1, count) +} + +func TestOneShot_ExtraAmountOverrides(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + _, err := pool.Exec(ctx, ` + INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('one_shot', 2100, '{"amount": 500, "nonce": "drop-1"}'::jsonb) + `) + require.NoError(t, err) + + runProcessor(t, pool, &OneShotProcessor{}) + + r, ok := queryUserChallenge(t, pool, "o", fmt.Sprintf("%x:drop-1", 2100)) + if assert.True(t, ok) { + assert.Equal(t, int32(500), r.Amount, "extra.amount should override catalog amount") + assert.True(t, r.IsComplete) + } +} + +func TestReferral_NonVerifiedReferrer(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_ref", "number": 1}}, + "users": { + {"user_id": 2200, "wallet": "0x2200", "is_verified": false}, // referrer + {"user_id": 2201, "wallet": "0x2201"}, // referred + }, + }) + _, err := pool.Exec(ctx, ` + INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('referral', 2201, '{"referrer_user_id": 2200}'::jsonb) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewReferralProcessor()) // r + runProcessor(t, pool, NewVerifiedReferralProcessor()) // rv (should not fire) + runProcessor(t, pool, &ReferredProcessor{}) // rd + + // r should mint for the referrer (2200). + rRow, ok := queryUserChallenge(t, pool, "r", fmt.Sprintf("%x:%x", 2200, 2201)) + if assert.True(t, ok, "r row should exist") { + assert.Equal(t, int64(2200), rRow.UserID, "row filed under referrer") + } + + // rv should NOT mint (referrer not verified). + _, ok = queryUserChallenge(t, pool, "rv", fmt.Sprintf("%x:%x", 2200, 2201)) + assert.False(t, ok, "rv gated on verified referrer") + + // rd should mint for the referred user. + rdRow, ok := queryUserChallenge(t, pool, "rd", fmt.Sprintf("%x:%x", 2201, 2200)) + if assert.True(t, ok, "rd row should exist") { + assert.Equal(t, int64(2201), rdRow.UserID, "row filed under referred user") + } +} + +func TestReferral_VerifiedReferrer(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_refv", "number": 1}}, + "users": { + {"user_id": 2210, "wallet": "0x2210", "is_verified": true}, // verified referrer + {"user_id": 2211, "wallet": "0x2211"}, + }, + }) + _, err := pool.Exec(ctx, ` + INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('referral', 2211, '{"referrer_user_id": 2210}'::jsonb) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewReferralProcessor()) + runProcessor(t, pool, NewVerifiedReferralProcessor()) + + // r should NOT mint (referrer is verified). + _, ok := queryUserChallenge(t, pool, "r", fmt.Sprintf("%x:%x", 2210, 2211)) + assert.False(t, ok) + + // rv SHOULD mint. + rvRow, ok := queryUserChallenge(t, pool, "rv", fmt.Sprintf("%x:%x", 2210, 2211)) + if assert.True(t, ok) { + assert.Equal(t, int64(2210), rvRow.UserID) + } +} + +func TestSignals_CheckpointAdvances(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + _, err := pool.Exec(ctx, `INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('mobile_install', 2300, '{}'::jsonb), ('mobile_install', 2301, '{}'::jsonb)`) + require.NoError(t, err) + + runProcessor(t, pool, &MobileInstallProcessor{}) + + // Add another signal after the first run. + _, err = pool.Exec(ctx, `INSERT INTO challenge_signals (type, user_id, extra) + VALUES ('mobile_install', 2302, '{}'::jsonb)`) + require.NoError(t, err) + + runProcessor(t, pool, &MobileInstallProcessor{}) + + for _, uid := range []int{2300, 2301, 2302} { + _, ok := queryUserChallenge(t, pool, "m", fmt.Sprintf("%x", uid)) + assert.True(t, ok, "user %d should have mobile_install row", uid) + } +} diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go index dc62342f..0aa13d34 100644 --- a/jobs/index_challenges.go +++ b/jobs/index_challenges.go @@ -57,6 +57,12 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle &challenges.RemixContestWinnerProcessor{}, challenges.NewAudioMatchingBuyerProcessor(), challenges.NewAudioMatchingSellerProcessor(), + // Phase 3 (signal-driven) + &challenges.MobileInstallProcessor{}, + &challenges.OneShotProcessor{}, + challenges.NewReferralProcessor(), + challenges.NewVerifiedReferralProcessor(), + &challenges.ReferredProcessor{}, }, } }