Skip to content

Commit 022f69c

Browse files
authored
fix(v3): redesign of batch completion to prevent heavy row-level contention on the BatchTaskRun (#2930)
1 parent d893b26 commit 022f69c

File tree

2 files changed

+104
-86
lines changed

2 files changed

+104
-86
lines changed

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import { env } from "~/env.server";
55
import { logger } from "~/services/logger.server";
66
import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
8-
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
8+
import {
9+
completeBatchTaskRunItemV3,
10+
tryCompleteBatchV3,
11+
} from "./services/batchTriggerV3.server";
912
import { prisma } from "~/db.server";
1013
import { marqs } from "./marqs/index.server";
1114

@@ -50,6 +53,16 @@ function initializeWorker() {
5053
maxAttempts: 10,
5154
},
5255
},
56+
tryCompleteBatchV3: {
57+
schema: z.object({
58+
batchId: z.string(),
59+
scheduleResumeOnComplete: z.boolean(),
60+
}),
61+
visibilityTimeoutMs: 30_000,
62+
retry: {
63+
maxAttempts: 5,
64+
},
65+
},
5366
scheduleRequeueMessage: {
5467
schema: z.object({
5568
messageId: z.string(),
@@ -85,6 +98,9 @@ function initializeWorker() {
8598
attempt
8699
);
87100
},
101+
tryCompleteBatchV3: async ({ payload }) => {
102+
await tryCompleteBatchV3(payload.batchId, prisma, payload.scheduleResumeOnComplete);
103+
},
88104
scheduleRequeueMessage: async ({ payload }) => {
89105
await marqs.requeueMessageById(payload.messageId);
90106
},

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 87 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
TaskRunAttempt,
1515
} from "@trigger.dev/database";
1616
import { z } from "zod";
17-
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
17+
import { prisma, PrismaClientOrTransaction } from "~/db.server";
1818
import { env } from "~/env.server";
1919
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
2020
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -934,6 +934,69 @@ export class BatchTriggerV3Service extends BaseService {
934934
}
935935
}
936936

937+
export async function tryCompleteBatchV3(
938+
batchId: string,
939+
tx: PrismaClientOrTransaction,
940+
scheduleResumeOnComplete: boolean
941+
) {
942+
const batch = await tx.batchTaskRun.findFirst({
943+
where: { id: batchId },
944+
select: {
945+
id: true,
946+
sealed: true,
947+
status: true,
948+
expectedCount: true,
949+
dependentTaskAttemptId: true,
950+
},
951+
});
952+
953+
if (!batch) {
954+
logger.debug("tryCompleteBatchV3: Batch not found", { batchId });
955+
return;
956+
}
957+
958+
if (batch.status === "COMPLETED") {
959+
logger.debug("tryCompleteBatchV3: Already completed", { batchId });
960+
return;
961+
}
962+
963+
if (!batch.sealed) {
964+
logger.debug("tryCompleteBatchV3: Not sealed yet", { batchId });
965+
return;
966+
}
967+
968+
// Count completed items (read-only, no contention)
969+
const completedCount = await tx.batchTaskRunItem.count({
970+
where: { batchTaskRunId: batchId, status: "COMPLETED" },
971+
});
972+
973+
if (completedCount < batch.expectedCount) {
974+
logger.debug("tryCompleteBatchV3: Not all items completed", {
975+
batchId,
976+
completedCount,
977+
expectedCount: batch.expectedCount,
978+
});
979+
return;
980+
}
981+
982+
// Mark batch COMPLETED (idempotent via status check)
983+
const updated = await tx.batchTaskRun.updateMany({
984+
where: { id: batchId, status: "PENDING" },
985+
data: { status: "COMPLETED", completedAt: new Date(), completedCount },
986+
});
987+
988+
if (updated.count === 0) {
989+
logger.debug("tryCompleteBatchV3: Already transitioned", { batchId });
990+
return;
991+
}
992+
993+
logger.debug("tryCompleteBatchV3: Batch completed", { batchId, completedCount });
994+
995+
if (scheduleResumeOnComplete && batch.dependentTaskAttemptId) {
996+
await ResumeBatchRunService.enqueue(batchId, true, tx);
997+
}
998+
}
999+
9371000
export async function completeBatchTaskRunItemV3(
9381001
itemId: string,
9391002
batchTaskRunId: string,
@@ -953,86 +1016,32 @@ export async function completeBatchTaskRunItemV3(
9531016
isRetry,
9541017
});
9551018

956-
if (isRetry) {
957-
logger.debug("completeBatchTaskRunItemV3 retrying", {
958-
itemId,
959-
batchTaskRunId,
960-
scheduleResumeOnComplete,
961-
taskRunAttemptId,
962-
retryAttempt,
963-
});
964-
}
965-
9661019
try {
967-
await $transaction(
968-
tx,
969-
"completeBatchTaskRunItemV3",
970-
async (tx, span) => {
971-
span?.setAttribute("batch_id", batchTaskRunId);
972-
973-
// Update the item to complete
974-
const updated = await tx.batchTaskRunItem.updateMany({
975-
where: {
976-
id: itemId,
977-
status: "PENDING",
978-
},
979-
data: {
980-
status: "COMPLETED",
981-
taskRunAttemptId,
982-
},
983-
});
984-
985-
if (updated.count === 0) {
986-
return;
987-
}
988-
989-
const updatedBatchRun = await tx.batchTaskRun.update({
990-
where: {
991-
id: batchTaskRunId,
992-
},
993-
data: {
994-
completedCount: {
995-
increment: 1,
996-
},
997-
},
998-
select: {
999-
sealed: true,
1000-
status: true,
1001-
completedCount: true,
1002-
expectedCount: true,
1003-
dependentTaskAttemptId: true,
1004-
},
1005-
});
1020+
// Update item to COMPLETED (no transaction needed, no contention)
1021+
const updated = await tx.batchTaskRunItem.updateMany({
1022+
where: { id: itemId, status: "PENDING" },
1023+
data: { status: "COMPLETED", taskRunAttemptId },
1024+
});
10061025

1007-
if (
1008-
updatedBatchRun.status === "PENDING" &&
1009-
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
1010-
updatedBatchRun.sealed
1011-
) {
1012-
await tx.batchTaskRun.update({
1013-
where: {
1014-
id: batchTaskRunId,
1015-
},
1016-
data: {
1017-
status: "COMPLETED",
1018-
completedAt: new Date(),
1019-
},
1020-
});
1026+
if (updated.count === 0) {
1027+
logger.debug("completeBatchTaskRunItemV3: Item already completed", {
1028+
itemId,
1029+
batchTaskRunId,
1030+
});
1031+
return;
1032+
}
10211033

1022-
// We only need to resume the batch if it has a dependent task attempt ID
1023-
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
1024-
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
1025-
}
1026-
}
1027-
},
1028-
{
1029-
timeout: 10_000,
1030-
maxWait: 4_000,
1031-
}
1032-
);
1034+
// Schedule debounced completion check
1035+
// enqueue with same ID overwrites, resetting the 200ms timer (debounce behavior)
1036+
await legacyRunEngineWorker.enqueue({
1037+
id: `tryCompleteBatchV3:${batchTaskRunId}`,
1038+
job: "tryCompleteBatchV3",
1039+
payload: { batchId: batchTaskRunId, scheduleResumeOnComplete },
1040+
availableAt: new Date(Date.now() + 200),
1041+
});
10331042
} catch (error) {
10341043
if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) {
1035-
logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", {
1044+
logger.error("completeBatchTaskRunItemV3 failed, scheduling retry", {
10361045
itemId,
10371046
batchTaskRunId,
10381047
error,
@@ -1041,24 +1050,17 @@ export async function completeBatchTaskRunItemV3(
10411050
});
10421051

10431052
if (isRetry) {
1044-
//throwing this error will cause the Redis worker to retry the job
10451053
throw error;
10461054
} else {
1047-
//schedule a retry
10481055
await legacyRunEngineWorker.enqueue({
10491056
id: `completeBatchTaskRunItem:${itemId}`,
10501057
job: "completeBatchTaskRunItem",
1051-
payload: {
1052-
itemId,
1053-
batchTaskRunId,
1054-
scheduleResumeOnComplete,
1055-
taskRunAttemptId,
1056-
},
1058+
payload: { itemId, batchTaskRunId, scheduleResumeOnComplete, taskRunAttemptId },
10571059
availableAt: new Date(Date.now() + 2_000),
10581060
});
10591061
}
10601062
} else {
1061-
logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", {
1063+
logger.error("completeBatchTaskRunItemV3 failed with non-retriable error", {
10621064
itemId,
10631065
batchTaskRunId,
10641066
error,

0 commit comments

Comments
 (0)