Skip to content

Commit 9b9e736

Browse files
committed
fallback to graphile worker when a tx is required
1 parent ea56e0a commit 9b9e736

File tree

5 files changed

+48
-16
lines changed

5 files changed

+48
-16
lines changed

apps/webapp/app/services/worker.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { logger } from "./logger.server";
3333
const workerCatalog = {
3434
// @deprecated, moved to commonWorker.server.ts
3535
scheduleEmail: DeliverEmailSchema,
36-
// @deprecated, moved to commonWorker.server.ts
36+
// @deprecated, but still used when resuming batch runs in a transaction
3737
"v3.resumeBatchRun": z.object({
3838
batchRunId: z.string(),
3939
}),
@@ -164,7 +164,7 @@ function getWorkerQueue() {
164164
await sendEmail(payload);
165165
},
166166
},
167-
// @deprecated, moved to commonWorker.server.ts
167+
// @deprecated, moved to commonWorker.server.ts but still used when resuming batch runs in a transaction
168168
"v3.resumeBatchRun": {
169169
priority: 0,
170170
maxAttempts: 5,

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,15 @@ export async function completeBatchTaskRunItemV3(
942942
) {
943943
const isRetry = retryAttempt !== undefined;
944944

945+
logger.debug("completeBatchTaskRunItemV3", {
946+
itemId,
947+
batchTaskRunId,
948+
scheduleResumeOnComplete,
949+
taskRunAttemptId,
950+
retryAttempt,
951+
isRetry,
952+
});
953+
945954
if (isRetry) {
946955
logger.debug("completeBatchTaskRunItemV3 retrying", {
947956
itemId,

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,11 +392,7 @@ export class CreateCheckpointService extends BaseService {
392392
checkpointEventId: checkpointEvent.id,
393393
});
394394

395-
await ResumeBatchRunService.enqueue(
396-
batchRun.id,
397-
batchRun.batchVersion === "v3",
398-
this._prisma
399-
);
395+
await ResumeBatchRunService.enqueue(batchRun.id, batchRun.batchVersion === "v3");
400396

401397
return {
402398
success: true,

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ export class FinalizeTaskRunService extends BaseService {
244244

245245
// This won't resume because this batch does not have a dependent task attempt ID
246246
// or is in development, but this service will mark the batch as completed
247-
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false, this._prisma);
247+
await ResumeBatchRunService.enqueue(item.batchTaskRunId, false);
248248
}
249249
}
250250
}

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { marqs } from "~/v3/marqs/index.server";
44
import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
66
import { BatchTaskRun } from "@trigger.dev/database";
7+
import { workerQueue } from "~/services/worker.server";
78

89
const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"];
910

@@ -331,17 +332,43 @@ export class ResumeBatchRunService extends BaseService {
331332
static async enqueue(
332333
batchRunId: string,
333334
skipJobKey: boolean,
334-
tx: PrismaClientOrTransaction,
335+
tx?: PrismaClientOrTransaction,
335336
runAt?: Date
336337
) {
337-
return await commonWorker.enqueue({
338-
id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
339-
job: "v3.resumeBatchRun",
340-
payload: {
338+
if (tx) {
339+
logger.debug("ResumeBatchRunService: Enqueuing resume batch run using workerQueue", {
341340
batchRunId,
342-
},
343-
availableAt: runAt,
344-
});
341+
skipJobKey,
342+
runAt,
343+
});
344+
345+
return await workerQueue.enqueue(
346+
"v3.resumeBatchRun",
347+
{
348+
batchRunId,
349+
},
350+
{
351+
jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
352+
runAt,
353+
tx,
354+
}
355+
);
356+
} else {
357+
logger.debug("ResumeBatchRunService: Enqueuing resume batch run using commonWorker", {
358+
batchRunId,
359+
skipJobKey,
360+
runAt,
361+
});
362+
363+
return await commonWorker.enqueue({
364+
id: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`,
365+
job: "v3.resumeBatchRun",
366+
payload: {
367+
batchRunId,
368+
},
369+
availableAt: runAt,
370+
});
371+
}
345372
}
346373
}
347374

0 commit comments

Comments
 (0)