Skip to content

Commit 45e4887

Browse files
committed
fix: prevent MVCC race in blockRunWithWaitpoint pending check
Split the CTE in blockRunWithWaitpoint so the pending waitpoint check is a separate SQL statement. In READ COMMITTED isolation, each statement gets its own snapshot, so a separate SELECT sees the latest committed state from concurrent completeWaitpoint calls. Previously, the CTE did INSERT + pending check in one statement (one snapshot). If completeWaitpoint committed between the CTE start and the SELECT, the SELECT would still see PENDING due to the stale snapshot. Neither side would enqueue continueRunIfUnblocked, leaving the run stuck forever.
1 parent a482153 commit 45e4887

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,10 @@ export class WaitpointSystem {
399399
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
400400
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);
401401

402-
//block the run with the waitpoints, returning how many waitpoints are pending
403-
const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
402+
// Insert the blocking connections and the historical run connections.
403+
// We use a CTE to do both inserts atomically. Data-modifying CTEs are
404+
// always executed regardless of whether they're referenced in the outer query.
405+
await prisma.$queryRaw`
404406
WITH inserted AS (
405407
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
406408
SELECT
@@ -425,12 +427,21 @@ export class WaitpointSystem {
425427
WHERE w.id IN (${Prisma.join($waitpoints)})
426428
ON CONFLICT DO NOTHING
427429
)
430+
SELECT COUNT(*) FROM inserted`;
431+
432+
// Check if the run is actually blocked using a separate query.
433+
// This MUST be a separate statement from the CTE above because in READ COMMITTED
434+
// isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
435+
// it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
436+
// won't see it. This fresh query gets a new snapshot that reflects the latest commits.
437+
const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
428438
SELECT COUNT(*) as pending_count
429-
FROM inserted i
430-
JOIN "Waitpoint" w ON w.id = i."waitpointId"
431-
WHERE w.status = 'PENDING';`;
439+
FROM "Waitpoint"
440+
WHERE id IN (${Prisma.join($waitpoints)})
441+
AND status = 'PENDING'
442+
`;
432443

433-
const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
444+
const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;
434445

435446
let newStatus: TaskRunExecutionStatus = "SUSPENDED";
436447
if (

0 commit comments

Comments
 (0)