@@ -2,13 +2,50 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
22import type { PrismaClientOrTransaction , TaskRun } from "@trigger.dev/database" ;
33import { logger } from "~/services/logger.server" ;
44import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server" ;
5+ import { ServiceValidationError } from "~/v3/services/common.server" ;
56import type { RunEngine } from "~/v3/runEngine.server" ;
67import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus" ;
8+ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server" ;
9+ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server" ;
10+ import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server" ;
11+ import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server" ;
712import type { TraceEventConcern , TriggerTaskRequest } from "../types" ;
813
14+ // In-memory per-org mollifier-enabled check, shared with `evaluateGate`
15+ // (same `Organization.featureFlags` JSON, no DB read). Used to gate the
16+ // pre-gate claim's Redis round-trip so non-mollifier orgs don't pay it
17+ // during staged rollout — see the comment above the claim block in
18+ // handleTriggerRequest.
19+ const resolveOrgMollifierFlag = makeResolveMollifierFlag ( ) ;
20+
21+ // Claim ownership context returned to the caller when the
22+ // IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the
23+ // winning runId on pipeline success (`publishClaim`) or release the
24+ // claim on failure (`releaseClaim`).
25+ export type ClaimedIdempotency = {
26+ envId : string ;
27+ taskIdentifier : string ;
28+ idempotencyKey : string ;
29+ // Ownership token from `claimOrAwait`. The caller's trigger pipeline
30+ // MUST thread this into publishClaim/releaseClaim so the buffer's
31+ // compare-and-act protects the slot against a stale predecessor.
32+ token : string ;
33+ } ;
34+
935export type IdempotencyKeyConcernResult =
1036 | { isCached : true ; run : TaskRun }
11- | { isCached : false ; idempotencyKey ?: string ; idempotencyKeyExpiresAt ?: Date } ;
37+ | {
38+ isCached : false ;
39+ idempotencyKey ?: string ;
40+ idempotencyKeyExpiresAt ?: Date ;
41+ // Set when this trigger holds a pre-gate claim. The caller's
42+ // trigger pipeline MUST resolve the claim by either publishing
43+ // the runId on success or releasing on failure. Undefined when
44+ // the request has no idempotency key, when the buffer is
45+ // unavailable, or when the request is a triggerAndWait (claim
46+ // path skipped per plan doc).
47+ claim ?: ClaimedIdempotency ;
48+ } ;
1249
1350export class IdempotencyKeyConcern {
1451 constructor (
@@ -17,6 +54,47 @@ export class IdempotencyKeyConcern {
1754 private readonly traceEventConcern : TraceEventConcern
1855 ) { }
1956
57+ // Buffer-side idempotency dedup. Resolves an idempotency key against the
58+ // mollifier buffer when PG missed. Returns a SyntheticRun cast to
59+ // TaskRun so the route handler (which only reads run.id / run.friendlyId)
60+ // can echo the buffered run's friendlyId as a cached hit. Returns null
61+ // for any failure or miss — buffer outages must not 500 the trigger
62+ // hot path; we fail open to "no cache hit" and let the request through.
63+ private async findBufferedRunWithIdempotency (
64+ environmentId : string ,
65+ organizationId : string ,
66+ taskIdentifier : string ,
67+ idempotencyKey : string ,
68+ ) : Promise < TaskRun | null > {
69+ const buffer = getMollifierBuffer ( ) ;
70+ if ( ! buffer ) return null ;
71+
72+ let bufferedRunId : string | null ;
73+ try {
74+ bufferedRunId = await buffer . lookupIdempotency ( {
75+ envId : environmentId ,
76+ taskIdentifier,
77+ idempotencyKey,
78+ } ) ;
79+ } catch ( err ) {
80+ logger . error ( "IdempotencyKeyConcern: buffer lookupIdempotency failed" , {
81+ environmentId,
82+ taskIdentifier,
83+ err : err instanceof Error ? err . message : String ( err ) ,
84+ } ) ;
85+ return null ;
86+ }
87+ if ( ! bufferedRunId ) return null ;
88+
89+ const synthetic = await findRunByIdWithMollifierFallback ( {
90+ runId : bufferedRunId ,
91+ environmentId,
92+ organizationId,
93+ } ) ;
94+ if ( ! synthetic ) return null ;
95+ return synthetic as unknown as TaskRun ;
96+ }
97+
2098 async handleTriggerRequest (
2199 request : TriggerTaskRequest ,
22100 parentStore : string | undefined
@@ -44,6 +122,25 @@ export class IdempotencyKeyConcern {
44122 } )
45123 : undefined ;
46124
125+ // Buffer fallback per the mollifier-idempotency design. PG missed —
126+ // the same key may belong to a buffered run that hasn't materialised
127+ // yet. Skipped when `resumeParentOnCompletion` is set: blocking a
128+ // parent on a buffered child via waitpoint requires a PG row that
129+ // doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
130+ // still dedupes the trigger itself; the waitpoint just doesn't fire
131+ // for this rare race window.
132+ if ( ! existingRun && idempotencyKey && ! request . body . options ?. resumeParentOnCompletion ) {
133+ const buffered = await this . findBufferedRunWithIdempotency (
134+ request . environment . id ,
135+ request . environment . organizationId ,
136+ request . taskId ,
137+ idempotencyKey ,
138+ ) ;
139+ if ( buffered ) {
140+ return { isCached : true , run : buffered } ;
141+ }
142+ }
143+
47144 if ( existingRun ) {
48145 // The idempotency key has expired
49146 if ( existingRun . idempotencyKeyExpiresAt && existingRun . idempotencyKeyExpiresAt < new Date ( ) ) {
@@ -133,6 +230,125 @@ export class IdempotencyKeyConcern {
133230 return { isCached : true , run : existingRun } ;
134231 }
135232
233+ // Pre-gate claim — closes the PG+buffer race during gate transition.
234+ // All same-key triggers serialise here before evaluateGate decides
235+ // PG-pass-through vs mollify. Skipped for triggerAndWait
236+ // (resumeParentOnCompletion) — that path bypasses the gate entirely
237+ // and its existing PG-side dedup is sufficient.
238+ //
239+ // Also gated on the same per-org mollifier flag the gate uses: when
240+ // `TRIGGER_MOLLIFIER_ENABLED=1` globally for staged rollout, the buffer
241+ // singleton is constructed and `claimOrAwait` would otherwise issue a
242+ // Redis SETNX for EVERY idempotency-keyed trigger — including orgs
243+ // that haven't opted in. Those orgs never enter the mollify branch
244+ // (the gate always returns pass_through for them), so there's no
245+ // buffer activity to serialise against; PG's unique constraint
246+ // already deduplicates concurrent same-key races. Resolving the org
247+ // flag is a pure in-memory read of `Organization.featureFlags` — no
248+ // DB query, same predicate the gate uses — keeping the claim's Redis
249+ // RTT off the hot path for non-opted-in orgs during incremental
250+ // rollout.
251+ const claimEligible =
252+ ! request . body . options ?. resumeParentOnCompletion &&
253+ ( await resolveOrgMollifierFlag ( {
254+ envId : request . environment . id ,
255+ orgId : request . environment . organizationId ,
256+ taskId : request . taskId ,
257+ orgFeatureFlags :
258+ ( ( request . environment . organization ?. featureFlags as
259+ | Record < string , unknown >
260+ | null
261+ | undefined ) ?? null ) ,
262+ } ) ) ;
263+ if ( claimEligible ) {
264+ const ttlSeconds = Math . max (
265+ 1 ,
266+ Math . min (
267+ 30 ,
268+ Math . ceil ( ( idempotencyKeyExpiresAt . getTime ( ) - Date . now ( ) ) / 1000 ) ,
269+ ) ,
270+ ) ;
271+ const outcome = await claimOrAwait ( {
272+ envId : request . environment . id ,
273+ taskIdentifier : request . taskId ,
274+ idempotencyKey,
275+ ttlSeconds,
276+ } ) ;
277+ if ( outcome . kind === "resolved" ) {
278+ // Another concurrent trigger committed first. Re-resolve via the
279+ // existing checks: writer-side PG findFirst first (defeats
280+ // replica lag), then buffer fallback for the buffered case.
281+ const writerRun = await this . prisma . taskRun . findFirst ( {
282+ where : {
283+ runtimeEnvironmentId : request . environment . id ,
284+ idempotencyKey,
285+ taskIdentifier : request . taskId ,
286+ } ,
287+ include : { associatedWaitpoint : true } ,
288+ } ) ;
289+ if ( writerRun ) {
290+ return { isCached : true , run : writerRun } ;
291+ }
292+ const buffered = await this . findBufferedRunWithIdempotency (
293+ request . environment . id ,
294+ request . environment . organizationId ,
295+ request . taskId ,
296+ idempotencyKey ,
297+ ) ;
298+ if ( buffered ) {
299+ return { isCached : true , run : buffered } ;
300+ }
301+ // Claim resolved to a runId nothing can find — the run was
302+ // genuinely lost (claimant errored after publish, drain failed,
303+ // or both the PG row and buffer entry TTL'd out). This is
304+ // terminal, not transient: `lookupIdempotency` self-heals a
305+ // dangling pointer, and `ack` keeps the entry hash as a
306+ // read-fallback past the PG write, so re-polling cannot conjure
307+ // a run that is gone. Falling through to a fresh trigger is the
308+ // correct recovery.
309+ //
310+ // Why falling through claimless is safe (no duplicate runs):
311+ // concurrent triggers that also fall through here converge on a
312+ // single run via the same dedup backstops the claim layer relies
313+ // on — the PG unique constraint on the idempotency key
314+ // (RunDuplicateIdempotencyKeyError → retry resolves to the
315+ // winner) for the pass-through path, and `accept`'s idempotency
316+ // SETNX (`duplicate_idempotency`) for the mollify path. Once the
317+ // first fall-through commits a run, later callers find it via the
318+ // writer-PG / buffer lookups above despite the stale `resolved:`
319+ // slot, which the slot's TTL clears within ~30s. The residual
320+ // cost is a few redundant (deduped) trigger attempts in that
321+ // window, not duplicate runs.
322+ logger . warn ( "idempotency claim resolved but runId not findable" , {
323+ envId : request . environment . id ,
324+ taskIdentifier : request . taskId ,
325+ claimedRunId : outcome . runId ,
326+ } ) ;
327+ }
328+ if ( outcome . kind === "timed_out" ) {
329+ throw new ServiceValidationError (
330+ "Idempotency claim resolution timed out" ,
331+ 503 ,
332+ ) ;
333+ }
334+ if ( outcome . kind === "claimed" ) {
335+ // Caller MUST publish/release. Signalled via the result's
336+ // `claim` field, including the ownership token so the buffer
337+ // can compare-and-act on the slot we now own.
338+ return {
339+ isCached : false ,
340+ idempotencyKey,
341+ idempotencyKeyExpiresAt,
342+ claim : {
343+ envId : request . environment . id ,
344+ taskIdentifier : request . taskId ,
345+ idempotencyKey,
346+ token : outcome . token ,
347+ } ,
348+ } ;
349+ }
350+ }
351+
136352 return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
137353 }
138354}
0 commit comments