Skip to content

Commit 436f20e

Browse files
authored
feat(cli): auto-cancel dev runs on CLI exit via detached watchdog (#3191)
When the dev CLI exits (e.g. ctrl+c via pnpm), runs that were mid-execution previously stayed stuck in EXECUTING status for up to 5 minutes until the heartbeat timeout fired. Now they are cancelled within seconds.

The dev CLI spawns a lightweight detached watchdog process at startup. The watchdog monitors the CLI process ID and, when it detects the CLI has exited, calls a new POST /engine/v1/dev/disconnect endpoint to cancel all in-flight runs immediately (skipping PENDING_CANCEL since the worker is known to be dead).

Watchdog design:
- Fully detached (detached: true, stdio: ignore, unref()) so it survives even when pnpm sends SIGKILL to the process tree
- Active run IDs maintained via atomic file write (.trigger/active-runs.json)
- Single-instance guarantee via PID file (.trigger/watchdog.pid)
- Safety timeout: exits after 24 hours to prevent zombie processes
- On clean shutdown, the watchdog is killed (no disconnect needed)

Disconnect endpoint:
- Rate-limited: 5 calls/min per environment
- Capped at 500 runs per call
- Small counts (<= 25): cancelled inline with pMap concurrency 10
- Large counts: delegated to the bulk action system
- Uses finalizeRun: true to skip PENDING_CANCEL and go straight to FINISHED

Run engine change:
- cancelRun() now respects finalizeRun when the run is in EXECUTING status, skipping the PENDING_CANCEL waiting state and going directly to FINISHED
1 parent d1ea8d8 commit 436f20e

File tree

10 files changed

+564
-31
lines changed

10 files changed

+564
-31
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Added `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. A maximum of 500 runs can be cancelled per request. Runs are cancelled inline when there are 25 or fewer, and via the bulk action system when there are more than 25.
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { Ratelimit } from "@upstash/ratelimit";
3+
import { tryCatch } from "@trigger.dev/core";
4+
import { DevDisconnectRequestBody } from "@trigger.dev/core/v3";
5+
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
6+
import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database";
7+
import { prisma } from "~/db.server";
8+
import { logger } from "~/services/logger.server";
9+
import { RateLimiter } from "~/services/rateLimiter.server";
10+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
11+
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
12+
import { commonWorker } from "~/v3/commonWorker.server";
13+
import pMap from "p-map";
14+
15+
/** Cancellation reason passed to the cancel service for runs cancelled inline by this endpoint. */
const CANCEL_REASON = "Dev session ended (CLI exited)";

// At or below this threshold, cancel runs inline with pMap.
// Above it, create a bulk action and process asynchronously.
const BULK_ACTION_THRESHOLD = 25;

// Maximum number of runs that can be cancelled in a single disconnect call.
const MAX_RUNS = 500;

// Rate limit: 5 calls per minute per environment
const disconnectRateLimiter = new RateLimiter({
  keyPrefix: "dev-disconnect",
  limiter: Ratelimit.fixedWindow(5, "1 m"),
  logFailure: true,
});

/**
 * POST handler that cancels the runs listed in `body.runFriendlyIds` for the
 * authenticated dev environment.
 *
 * Responses:
 * - 403 when the authenticated environment is not a DEVELOPMENT environment.
 * - 400 when more than MAX_RUNS ids are supplied.
 * - 200 `{ cancelled: 0 }` when no ids are supplied.
 * - 429 `{ error, retryAfter }` (plus a `Retry-After` header, in seconds) when
 *   the per-environment rate limit is exceeded.
 * - 200 `{ cancelled }` when the runs were cancelled inline.
 * - 200 `{ cancelled: 0, bulkActionId }` when a bulk action was created instead.
 */
const { action } = createActionApiRoute(
  {
    body: DevDisconnectRequestBody,
    maxContentLength: 1024 * 256, // 256KB
    method: "POST",
  },
  async ({ authentication, body }) => {
    // Only allow dev environments — this endpoint uses finalizeRun which
    // skips PENDING_CANCEL and immediately finalizes executing runs.
    if (authentication.environment.type !== "DEVELOPMENT") {
      return json({ error: "This endpoint is only available for dev environments" }, { status: 403 });
    }

    const environmentId = authentication.environment.id;
    const { runFriendlyIds } = body;

    // Validate before touching the rate limiter: requests that are rejected or
    // are no-ops do no cancellation work, so they should not use up the
    // per-environment budget that a later, real disconnect call needs.
    if (runFriendlyIds.length > MAX_RUNS) {
      return json(
        { error: `A maximum of ${MAX_RUNS} runs can be cancelled per request` },
        { status: 400 }
      );
    }

    if (runFriendlyIds.length === 0) {
      return json({ cancelled: 0 }, { status: 200 });
    }

    // Rate limit per environment
    const rateLimitResult = await disconnectRateLimiter.limit(environmentId);
    if (!rateLimitResult.success) {
      // Clamp to zero so a reset time that has already passed never yields a
      // negative wait.
      const retryAfter = Math.max(0, Math.ceil((rateLimitResult.reset - Date.now()) / 1000));

      return json(
        { error: "Rate limit exceeded", retryAfter },
        { status: 429, headers: { "Retry-After": String(retryAfter) } }
      );
    }

    logger.info("Dev disconnect: cancelling runs", {
      environmentId,
      runCount: runFriendlyIds.length,
    });

    // For small numbers of runs, cancel inline
    if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) {
      const cancelled = await cancelRunsInline(runFriendlyIds, environmentId);
      return json({ cancelled }, { status: 200 });
    }

    // For large numbers, create a bulk action to process asynchronously
    const bulkActionId = await createBulkCancelAction(
      runFriendlyIds,
      authentication.environment.project.id,
      environmentId
    );

    logger.info("Dev disconnect: created bulk action for large run set", {
      environmentId,
      bulkActionId,
      runCount: runFriendlyIds.length,
    });

    // Nothing has been cancelled yet at this point; the bulk action id is
    // returned so the caller can reference the asynchronous work.
    return json({ cancelled: 0, bulkActionId }, { status: 200 });
  }
);
95+
96+
/**
 * Cancels the given runs directly, with at most 10 cancellations in flight
 * at a time.
 *
 * Only runs that belong to `environmentId` are looked up; ids that do not match
 * a run in that environment are not returned by the query and are skipped.
 * A failure to cancel an individual run is logged and does not abort the rest.
 *
 * @param runFriendlyIds - Friendly ids of the runs to cancel.
 * @param environmentId - Runtime environment the runs must belong to.
 * @returns The number of runs whose cancel call succeeded and did not report
 *   `alreadyFinished`.
 */
async function cancelRunsInline(
  runFriendlyIds: string[],
  environmentId: string
): Promise<number> {
  const matchingRuns = await prisma.taskRun.findMany({
    where: {
      id: { in: runFriendlyIds.map((friendlyId) => RunId.toId(friendlyId)) },
      runtimeEnvironmentId: environmentId,
    },
    select: {
      id: true,
      engine: true,
      friendlyId: true,
      status: true,
      createdAt: true,
      completedAt: true,
      taskEventStore: true,
    },
  });

  const service = new CancelTaskRunService(prisma);

  // Each mapper resolves to true when its run counts as newly cancelled.
  const outcomes = await pMap(
    matchingRuns,
    async (run): Promise<boolean> => {
      const [error, result] = await tryCatch(
        service.call(run, { reason: CANCEL_REASON, finalizeRun: true })
      );

      if (error) {
        logger.error("Dev disconnect: failed to cancel run", {
          runId: run.id,
          error,
        });
        return false;
      }

      return Boolean(result && !result.alreadyFinished);
    },
    { concurrency: 10 }
  );

  const cancelled = outcomes.filter(Boolean).length;

  logger.info("Dev disconnect: completed inline cancellation", {
    environmentId,
    cancelled,
    total: runFriendlyIds.length,
  });

  return cancelled;
}
148+
149+
async function createBulkCancelAction(
150+
runFriendlyIds: string[],
151+
projectId: string,
152+
environmentId: string
153+
): Promise<string> {
154+
const { id, friendlyId } = BulkActionId.generate();
155+
156+
await prisma.bulkActionGroup.create({
157+
data: {
158+
id,
159+
friendlyId,
160+
projectId,
161+
environmentId,
162+
name: "Dev session disconnect",
163+
type: BulkActionType.CANCEL,
164+
params: { runId: runFriendlyIds, finalizeRun: true },
165+
queryName: "bulk_action_v1",
166+
totalCount: runFriendlyIds.length,
167+
completionNotification: BulkActionNotificationType.NONE,
168+
},
169+
});
170+
171+
await commonWorker.enqueue({
172+
id: `processBulkAction-${id}`,
173+
job: "processBulkAction",
174+
payload: { bulkActionId: id },
175+
});
176+
177+
return friendlyId;
178+
}
179+
180+
export { action };

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,13 @@ export class BulkActionService extends BaseService {
138138
}
139139

140140
// 2. Parse the params
141+
const rawParams = group.params && typeof group.params === "object" ? group.params : {};
142+
const finalizeRun = "finalizeRun" in rawParams && (rawParams as any).finalizeRun === true;
141143
const filters = parseRunListInputOptions({
142144
organizationId: group.project.organizationId,
143145
projectId: group.projectId,
144146
environmentId: group.environmentId,
145-
...(group.params && typeof group.params === "object" ? group.params : {}),
147+
...rawParams,
146148
});
147149

148150
const runsRepository = new RunsRepository({
@@ -199,6 +201,7 @@ export class BulkActionService extends BaseService {
199201
cancelService.call(run, {
200202
reason: `Bulk action ${group.friendlyId} cancelled run`,
201203
bulkActionId: bulkActionId,
204+
finalizeRun,
202205
})
203206
);
204207
if (error) {

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = {
88
cancelAttempts?: boolean;
99
cancelledAt?: Date;
1010
bulkActionId?: string;
11+
/** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */
12+
finalizeRun?: boolean;
1113
};
1214

1315
type CancelTaskRunServiceResult = {
@@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService {
5759
runId: taskRun.id,
5860
completedAt: options?.cancelledAt,
5961
reason: options?.reason,
62+
finalizeRun: options?.finalizeRun,
6063
bulkActionId: options?.bulkActionId,
6164
tx: this._prisma,
6265
});

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,35 +1436,39 @@ export class RunAttemptSystem {
14361436
});
14371437

14381438
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
1439+
//unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED
14391440
if (
14401441
isExecuting(latestSnapshot.executionStatus) ||
14411442
isPendingExecuting(latestSnapshot.executionStatus)
14421443
) {
1443-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
1444-
run,
1445-
snapshot: {
1446-
executionStatus: "PENDING_CANCEL",
1447-
description: "Run was cancelled",
1448-
},
1449-
previousSnapshotId: latestSnapshot.id,
1450-
environmentId: latestSnapshot.environmentId,
1451-
environmentType: latestSnapshot.environmentType,
1452-
projectId: latestSnapshot.projectId,
1453-
organizationId: latestSnapshot.organizationId,
1454-
workerId,
1455-
runnerId,
1456-
});
1444+
if (!finalizeRun) {
1445+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
1446+
run,
1447+
snapshot: {
1448+
executionStatus: "PENDING_CANCEL",
1449+
description: "Run was cancelled",
1450+
},
1451+
previousSnapshotId: latestSnapshot.id,
1452+
environmentId: latestSnapshot.environmentId,
1453+
environmentType: latestSnapshot.environmentType,
1454+
projectId: latestSnapshot.projectId,
1455+
organizationId: latestSnapshot.organizationId,
1456+
workerId,
1457+
runnerId,
1458+
});
14571459

1458-
//the worker needs to be notified so it can kill the run and complete the attempt
1459-
await sendNotificationToWorker({
1460-
runId,
1461-
snapshot: newSnapshot,
1462-
eventBus: this.$.eventBus,
1463-
});
1464-
return {
1465-
alreadyFinished: false,
1466-
...executionResultFromSnapshot(newSnapshot),
1467-
};
1460+
//the worker needs to be notified so it can kill the run and complete the attempt
1461+
await sendNotificationToWorker({
1462+
runId,
1463+
snapshot: newSnapshot,
1464+
eventBus: this.$.eventBus,
1465+
});
1466+
return {
1467+
alreadyFinished: false,
1468+
...executionResultFromSnapshot(newSnapshot),
1469+
};
1470+
}
1471+
// finalizeRun is true — fall through to finish the run immediately
14681472
}
14691473

14701474
//not executing, so we will actually finish the run

packages/cli-v3/src/apiClient.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
DevConfigResponseBody,
88
DevDequeueRequestBody,
99
DevDequeueResponseBody,
10+
DevDisconnectRequestBody,
11+
DevDisconnectResponseBody,
1012
EnvironmentVariableResponseBody,
1113
FailDeploymentRequestBody,
1214
FailDeploymentResponseBody,
@@ -557,6 +559,7 @@ export class CliApiClient {
557559
heartbeatRun: this.devHeartbeatRun.bind(this),
558560
startRunAttempt: this.devStartRunAttempt.bind(this),
559561
completeRunAttempt: this.devCompleteRunAttempt.bind(this),
562+
disconnect: this.devDisconnect.bind(this),
560563
setEngineURL: this.setEngineURL.bind(this),
561564
} as const;
562565
}
@@ -681,6 +684,23 @@ export class CliApiClient {
681684
return eventSource;
682685
}
683686

687+
/**
 * POSTs the given body to `/engine/v1/dev/disconnect` on the configured
 * engine URL, authenticating with the client's access token.
 *
 * @param body - Request payload, sent as JSON.
 * @returns The result of `wrapZodFetch` using the `DevDisconnectResponseBody` schema.
 * @throws Error when no access token is set on the client.
 */
private async devDisconnect(
  body: DevDisconnectRequestBody
): Promise<ApiResult<DevDisconnectResponseBody>> {
  if (!this.accessToken) {
    throw new Error("devDisconnect: No access token");
  }

  const url = `${this.engineURL}/engine/v1/dev/disconnect`;
  const headers = {
    Authorization: `Bearer ${this.accessToken}`,
    Accept: "application/json",
  };

  return wrapZodFetch(DevDisconnectResponseBody, url, {
    method: "POST",
    headers,
    body: JSON.stringify(body),
  });
}
703+
684704
private async devDequeue(
685705
body: DevDequeueRequestBody
686706
): Promise<ApiResult<DevDequeueResponseBody>> {

0 commit comments

Comments
 (0)