diff --git a/.server-changes/run-engine-single-ttl-path.md b/.server-changes/run-engine-single-ttl-path.md new file mode 100644 index 00000000000..cac5e3a3cb1 --- /dev/null +++ b/.server-changes/run-engine-single-ttl-path.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Route TTL expiration through the batch TTL path only. Removes the redundant per-run `expireRun` worker job, leaving the batch consumer as the single mechanism that flips runs to `EXPIRED` when their TTL elapses while still queued. diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1725587df45..e0af0f2c4ff 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -797,7 +797,13 @@ export class RunEngine { } } else { try { - if (taskRun.ttl) { + // The new batch TTL path only expires runs still in the queue + // sorted set (waiting on a concurrency slot). For DEV + // environments where the dev CLI may not be running, fast-pathed + // runs can sit on the worker queue indefinitely and never get + // claimed for expiration. Keep the legacy per-run expireRun job + // armed for DEV so those runs still expire. + if (taskRun.ttl && environment.type === "DEVELOPMENT") { await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); } @@ -812,7 +818,7 @@ export class RunEngine { enableFastPath, }); } catch (enqueueError) { - this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", { + this.logger.error("engine.trigger(): failed to enqueue run", { runId: taskRun.id, friendlyId: taskRun.friendlyId, taskIdentifier: taskRun.taskIdentifier, diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 740ce1a849f..32ab98bad6c 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -144,13 +144,34 @@ export class DelayedRunSystem { return; } + // The batch TTL path only expires runs still in the queue sorted set. + // For DEV environments where the dev CLI may not be running, fast-pathed + // runs can sit on the worker queue indefinitely. Keep the legacy per-run + // expireRun job armed for DEV so those runs still expire. + if (run.ttl && run.runtimeEnvironment.type === "DEVELOPMENT") { + const expireAt = parseNaturalLanguageDuration(run.ttl); + if (expireAt) { + await this.$.worker.enqueue({ + id: `expireRun:${runId}`, + job: "expireRun", + payload: { runId }, + availableAt: expireAt, + }); + } + } + // Now we need to enqueue the run into the RunQueue - // Skip the lock in enqueueRun since we already hold it + // Skip the lock in enqueueRun since we already hold it. + // includeTtl: true so the run's TTL is armed from the moment it enters + // the queue (not from taskRun.createdAt). The TTL system tracks runs + // that are queued and have never started — delayed runs are first + // enqueued here, so this is the correct point to arm TTL. await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, batchId: run.batchId ?? undefined, skipRunLock: true, + includeTtl: true, }); const queuedAt = new Date(); @@ -183,18 +204,6 @@ export class DelayedRunSystem { }, }); - if (run.ttl) { - const expireAt = parseNaturalLanguageDuration(run.ttl); - - if (expireAt) { - await this.$.worker.enqueue({ - id: `expireRun:${runId}`, - job: "expireRun", - payload: { runId }, - availableAt: expireAt, - }); - } - } }); } diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index c22429ac0c9..9007cf86b2d 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -98,6 +98,11 @@ export class PendingVersionSystem { run: updatedRun, env: backgroundWorker.runtimeEnvironment, tx, + // PENDING_VERSION re-enqueue is the first time this run is actually + // entering the run queue (the original enqueue was held back waiting + // for a worker version). Arm TTL here so the TTL system can expire it + // if it sits queued waiting on a concurrency slot. + includeTtl: true, }); }); diff --git a/internal-packages/run-engine/src/engine/tests/delays.test.ts b/internal-packages/run-engine/src/engine/tests/delays.test.ts index 8a93aa1ad14..81e3641cd74 100644 --- a/internal-packages/run-engine/src/engine/tests/delays.test.ts +++ b/internal-packages/run-engine/src/engine/tests/delays.test.ts @@ -201,6 +201,11 @@ describe("RunEngine delays", () => { }, queue: { redis: redisOptions, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, }, runLock: { redis: redisOptions, @@ -230,7 +235,21 @@ describe("RunEngine delays", () => { taskIdentifier ); + // TTL only expires runs still queued waiting on a concurrency slot. + // Once the delay elapses, the run gets enqueued; saturate env concurrency + // so it stays queued so the new TTL path can expire it. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + + const enqueuedAfterDelayTimes: number[] = []; + engine.eventBus.on("runEnqueuedAfterDelay", () => { + enqueuedAfterDelayTimes.push(Date.now()); + }); + //trigger the run + const triggerTime = Date.now(); const run = await engine.trigger( { number: 1, @@ -247,7 +266,7 @@ describe("RunEngine delays", () => { queue: "task/test-task", isTest: false, tags: [], - delayUntil: new Date(Date.now() + 1000), + delayUntil: new Date(triggerTime + 1000), ttl: "2s", }, prisma @@ -259,7 +278,7 @@ describe("RunEngine delays", () => { expect(executionData.snapshot.executionStatus).toBe("DELAYED"); expect(run.status).toBe("DELAYED"); - //wait for 1 seconds + //wait so the delay elapses and the run is enqueued await setTimeout(2_500); //should now be queued @@ -273,19 +292,29 @@ describe("RunEngine delays", () => { expect(run2.status).toBe("PENDING"); - //wait for 3 seconds - await setTimeout(3_000); + // TTL is armed at queue-enter time (not from triggerTime). With a 2s TTL + // and a 1s delay, the run becomes eligible to expire ~3s after trigger. + // Confirm the TTL was not armed against triggerTime (i.e. didn't already + // fire while still DELAYED), and that the run only expires after the + // queue-enter timestamp + ttl has elapsed. + expect(enqueuedAfterDelayTimes.length).toBe(1); + const enqueuedAt = enqueuedAfterDelayTimes[0]!; + expect(enqueuedAt - triggerTime).toBeGreaterThanOrEqual(1000); - //should now be expired - const executionData3 = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData3); - expect(executionData3.snapshot.executionStatus).toBe("FINISHED"); + //wait so the TTL fires (counted from when the run was enqueued) + await setTimeout(3_000); + // Status comes from the DB; the batch TTL path does not create + // execution snapshots, so getRunExecutionData may still show QUEUED. const run3 = await prisma.taskRun.findFirstOrThrow({ where: { id: run.id }, }); expect(run3.status).toBe("EXPIRED"); + assertNonNullable(run3.expiredAt); + // The expiry must happen after enqueue + ttl, not after trigger + ttl. + // Allow a small tolerance for poll interval + batch wait. + expect(run3.expiredAt.getTime()).toBeGreaterThanOrEqual(enqueuedAt + 2_000); } finally { await engine.quit(); } diff --git a/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts b/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts index bc24f9b6f1a..dedcff5b5b6 100644 --- a/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts +++ b/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts @@ -409,6 +409,7 @@ describe("RunEngine lazy waitpoint creation", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -434,6 +435,12 @@ describe("RunEngine lazy waitpoint creation", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + // Trigger a standalone run with TTL (no waitpoint) const run = await engine.trigger( { @@ -467,11 +474,15 @@ describe("RunEngine lazy waitpoint creation", () => { // Wait for TTL to expire await setTimeout(1_500); - // Verify run expired successfully (no throw) - const executionData = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData); - expect(executionData.run.status).toBe("EXPIRED"); - expect(executionData.snapshot.executionStatus).toBe("FINISHED"); + // Verify run expired successfully (no throw). + // The batch TTL path does not create execution snapshots, so check + // the status directly from the database rather than via + // getRunExecutionData. + const expiredRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + expect(expiredRun?.status).toBe("EXPIRED"); } finally { await engine.quit(); } diff --git a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts index 65498e32ffe..38eaa00b213 100644 --- a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts @@ -309,4 +309,120 @@ describe("RunEngine pending version", () => { } } ); + + containerTest( + "PENDING_VERSION re-enqueue arms TTL on the queued message", + async ({ prisma, redisOptions }) => { + // When a run enters PENDING_VERSION (background worker doesn't yet have + // the task), the first enqueue happens but the message is dequeued and + // its TTL set entry is dropped while the run waits for a matching worker. + // Once a worker arrives, pendingVersionSystem re-enqueues. That + // re-enqueue is the first time the run is actually queued for a worker, + // so TTL must be armed at that point — not held over from the original + // enqueue. + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + // Trigger a run with TTL — no background worker exists yet for this + // task, so it will end up in PENDING_VERSION. + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_pvttl1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "tpv1", + spanId: "spv1", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "10m", + }, + prisma + ); + + // A worker arrives that doesn't have this task — flushes the run to + // PENDING_VERSION. + await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]); + + await setTimeout(500); + + // The consumer attempt is what flushes the run to PENDING_VERSION — + // dequeue finds no matching task version and returns nothing. + const dequeuedEmpty = await engine.dequeueFromWorkerQueue({ + consumerId: "test_pv", + workerQueue: "main", + }); + expect(dequeuedEmpty.length).toBe(0); + + const executionDataAfter = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionDataAfter); + expect(executionDataAfter.run.status).toBe("PENDING_VERSION"); + + // Now a worker arrives WITH the task — pendingVersionSystem + // re-enqueues the run. + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + await setTimeout(1000); + + const executionDataReenqueued = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionDataReenqueued); + expect(executionDataReenqueued.run.status).toBe("PENDING"); + + // The re-enqueued message must carry ttlExpiresAt so the TTL set + // tracks it for expiration. + const message = await engine.runQueue.readMessage( + authenticatedEnvironment.organization.id, + run.id + ); + assertNonNullable(message); + expect(message.ttlExpiresAt).toBeDefined(); + expect(typeof message.ttlExpiresAt).toBe("number"); + } finally { + await engine.quit(); + } + } + ); }); diff --git a/internal-packages/run-engine/src/engine/tests/ttl.test.ts b/internal-packages/run-engine/src/engine/tests/ttl.test.ts index c1df00bf13f..e787d916f8a 100644 --- a/internal-packages/run-engine/src/engine/tests/ttl.test.ts +++ b/internal-packages/run-engine/src/engine/tests/ttl.test.ts @@ -1,6 +1,7 @@ import { containerTest, assertNonNullable } from "@internal/testcontainers"; import { trace } from "@internal/tracing"; import { expect } from "vitest"; +import { Decimal } from "@trigger.dev/database"; import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; import { EventBusEventArgs } from "../eventBus.js"; @@ -28,6 +29,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -58,6 +60,14 @@ describe("RunEngine ttl", () => { taskIdentifier ); + // TTL only expires runs still queued waiting on a concurrency slot. + // Force env concurrency to 0 so the run never gets dequeued and stays + // in the TTL set long enough for the consumer to expire it. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + //trigger the run const run = await engine.trigger( { @@ -153,6 +163,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -231,6 +242,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -302,6 +314,221 @@ describe("RunEngine ttl", () => { } }); + containerTest( + "Re-enqueued runs are not expired by TTL once they have started", + async ({ prisma, redisOptions }) => { + // Contract: TTL only applies to runs that are queued and have never started. + // Once a run has been dequeued (started executing), a subsequent re-enqueue + // (e.g. after a waitpoint, checkpoint resume, or pending-version flow) + // must not re-arm TTL, even if the original TTL deadline has long passed. + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const expiredEvents: EventBusEventArgs<"runExpired">[0][] = []; + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + engine.eventBus.on("runExpired", (result) => { + expiredEvents.push(result); + }); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_restart01", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_re2", + spanId: "s_re2", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "1s", + }, + prisma + ); + + // Dequeue the run — this simulates the run starting to execute, which + // ZREMs its TTL set entry. + await engine.runQueue.processMasterQueueForEnvironment( + authenticatedEnvironment.id, + 10 + ); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test-consumer", + workerQueue: "main", + blockingPopTimeoutSeconds: 1, + }); + expect(dequeued.length).toBe(1); + + // Re-enqueue without includeTtl — this is what waitpoint/checkpoint + // resume paths do. + await engine.enqueueSystem.enqueueRun({ + run, + env: authenticatedEnvironment, + tx: prisma, + skipRunLock: true, + includeTtl: false, + }); + + // Wait well past the original 1s TTL deadline. The run was first + // enqueued ~0s ago, so this is far beyond the original deadline. + await setTimeout(2_500); + + // Run must still exist and must NOT have been expired. + expect(expiredEvents.length).toBe(0); + const reenqueuedRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + // Whatever status the dequeue/re-enqueue flow leaves the run in, it + // must NOT be EXPIRED — that's the contract this test locks in. + expect(reenqueuedRun?.status).not.toBe("EXPIRED"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "DEV runs sitting on worker queue still expire via legacy per-run job", + async ({ prisma, redisOptions }) => { + // The batch TTL path only expires runs still in the queue sorted set. + // In DEV, runs are fast-pathed straight to the worker queue, and if the + // dev CLI isn't running they can sit there forever. The legacy per-run + // expireRun job is kept for DEV specifically to cover this case. + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "DEVELOPMENT"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + // TTL batch path is enabled but should never see this run: it goes + // straight to the worker queue via fast-path. The legacy per-run + // job is what should expire it. + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + let expiredEventData: EventBusEventArgs<"runExpired">[0] | undefined; + engine.eventBus.on("runExpired", (result) => { + expiredEventData = result; + }); + + // Trigger a DEV run with fast-path enabled and a short TTL. The run + // should land in the worker queue without entering the TTL set. + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_devttl1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "tdevttl1", + spanId: "sdevttl1", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "1s", + enableFastPath: true, + }, + prisma + ); + + // Wait past the TTL. The legacy per-run job should fire and expire it. + await setTimeout(1_500); + + assertNonNullable(expiredEventData); + const expiredRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + expect(expiredRun?.status).toBe("EXPIRED"); + } finally { + await engine.quit(); + } + } + ); + containerTest("Multiple runs expiring via TTL batch", async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); @@ -322,6 +549,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -347,6 +575,12 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + engine.eventBus.on("runExpired", (result) => { expiredEvents.push(result); }); @@ -384,8 +618,10 @@ describe("RunEngine ttl", () => { expect(executionData.snapshot.executionStatus).toBe("QUEUED"); } - // Wait for TTL to expire - await setTimeout(1_500); + // Wait for TTL to expire. Concurrent triggers can land in different + // 100ms TTL-poll windows, so allow enough headroom for any stragglers + // to be claimed in a subsequent poll and flushed. + await setTimeout(2_500); // All runs should be expired expect(expiredEvents.length).toBe(3); @@ -450,6 +686,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -538,6 +775,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -563,6 +801,12 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + engine.eventBus.on("runExpired", (result) => { expiredEvents.push(result); }); @@ -631,10 +875,9 @@ describe("RunEngine ttl", () => { const expiredEvents: EventBusEventArgs<"runExpired">[0][] = []; - // Disable worker to prevent the scheduleExpireRun job from firing before - // we can test the dequeue path. Use masterQueueConsumersDisabled so we can - // manually trigger dequeue via processMasterQueueForEnvironment. - // TTL consumers start independently and will expire the run after their poll interval. + // Use masterQueueConsumersDisabled so we can manually trigger dequeue via + // processMasterQueueForEnvironment. TTL consumers start independently and + // will expire the run after their poll interval. const engine = new RunEngine({ prisma, worker: { @@ -651,6 +894,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 5000, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -781,6 +1025,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 5000, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -878,7 +1123,6 @@ describe("RunEngine ttl", () => { async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Disable worker to prevent the scheduleExpireRun job from firing. // Use masterQueueConsumersDisabled so we can manually trigger dequeue. // Very long TTL consumer interval so it doesn't interfere. const engine = new RunEngine({ @@ -1246,6 +1490,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -1272,6 +1517,16 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + // TTL only expires runs still queued waiting on a concurrency slot. + // Cap env concurrency at exactly 1 (limit=1, burstFactor=1) so the + // parent takes the only slot and the child stays queued long enough + // for the new TTL path to expire it. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 1, + concurrencyLimitBurstFactor: new Decimal(1.0), + }); + // Trigger the parent run const parentRun = await engine.trigger( {