diff --git a/src/agents/bash-process-registry.e2e.test.ts b/src/agents/bash-process-registry.e2e.test.ts
index 29f47a3f6c3e..4606c3d77e9e 100644
--- a/src/agents/bash-process-registry.e2e.test.ts
+++ b/src/agents/bash-process-registry.e2e.test.ts
@@ -5,7 +5,10 @@ import {
   addSession,
   appendOutput,
   drainSession,
+  getSession,
+  killAllSessions,
   listFinishedSessions,
+  listRunningSessions,
   markBackgrounded,
   markExited,
   resetProcessRegistryForTests,
@@ -114,4 +117,43 @@ describe("bash process registry", () => {
     markExited(session, 0, null, "completed");
     expect(listFinishedSessions()).toHaveLength(1);
   });
+
+  it("killAllSessions kills running children and clears registries", () => {
+    const killFnA = vi.fn();
+    const killFnB = vi.fn();
+    const childA = {
+      pid: 100,
+      kill: killFnA,
+      removeAllListeners: vi.fn(),
+    } as unknown as ChildProcessWithoutNullStreams;
+    const backgroundedSession = createProcessSessionFixture({
+      id: "kill-test-bg",
+      command: "sleep 60",
+      child: childA,
+      maxOutputChars: 1000,
+    });
+    addSession(backgroundedSession);
+    markBackgrounded(backgroundedSession);
+
+    const foregroundSession = createProcessSessionFixture({
+      id: "kill-test-fg",
+      command: "sleep 120",
+      child: {
+        pid: 200,
+        kill: killFnB,
+        removeAllListeners: vi.fn(),
+      } as unknown as ChildProcessWithoutNullStreams,
+      maxOutputChars: 1000,
+    });
+    addSession(foregroundSession);
+
+    killAllSessions();
+
+    expect(killFnA).toHaveBeenCalledWith("SIGKILL");
+    expect(killFnB).toHaveBeenCalledWith("SIGKILL");
+    expect(getSession("kill-test-bg")).toBeUndefined();
+    expect(getSession("kill-test-fg")).toBeUndefined();
+    expect(listRunningSessions()).toHaveLength(0);
+    expect(listFinishedSessions()).toHaveLength(0);
+  });
 });
diff --git a/src/agents/bash-process-registry.ts b/src/agents/bash-process-registry.ts
index 0e84065c7f2f..f6469842abfa 100644
--- a/src/agents/bash-process-registry.ts
+++ b/src/agents/bash-process-registry.ts
@@ -268,6 +268,31 @@ export function clearFinished() {
   finishedSessions.clear();
 }
 
+/**
+ * Force-kill every running session and empty the process registry.
+ *
+ * Sends SIGKILL to each running session that still has a child handle and is
+ * not marked exited, then clears the running and finished registries and stops
+ * the sweeper. Failures from `kill()` are ignored so that one bad child cannot
+ * keep the remaining sessions from being signalled.
+ *
+ * Called during gateway shutdown to prevent orphaned child processes.
+ */
+export function killAllSessions(): void {
+  for (const session of runningSessions.values()) {
+    if (session.child && !session.exited) {
+      try {
+        session.child.kill("SIGKILL");
+      } catch {
+        // Best effort: the child may already be gone; keep going.
+      }
+    }
+  }
+  runningSessions.clear();
+  finishedSessions.clear();
+  stopSweeper();
+}
+
 export function resetProcessRegistryForTests() {
   runningSessions.clear();
   finishedSessions.clear();
diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts
index da9f5a39e97d..b1981ad14e37 100644
--- a/src/gateway/server-close.ts
+++ b/src/gateway/server-close.ts
@@ -1,10 +1,12 @@
 import type { Server as HttpServer } from "node:http";
 import type { WebSocketServer } from "ws";
+import { killAllSessions } from "../agents/bash-process-registry.js";
 import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
 import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
 import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
 import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
 import type { PluginServicesHandle } from "../plugins/services.js";
+import { getProcessSupervisor } from "../process/supervisor/index.js";
 
 export function createGatewayCloseHandler(params: {
   bonjourStop: (() => Promise) | null;
@@ -68,6 +70,8 @@ export function createGatewayCloseHandler(params: {
       await params.pluginServices.stop().catch(() => {});
     }
     await stopGmailWatcher();
+    getProcessSupervisor().cancelAll("manual-cancel");
+    killAllSessions();
     params.cron.stop();
     params.heartbeatRunner.stop();
     for (const timer of params.nodePresenceTimers.values()) {
diff --git a/src/process/supervisor/supervisor.test.ts b/src/process/supervisor/supervisor.test.ts
index 79d184672a5d..569dd46d8b6b 100644
--- a/src/process/supervisor/supervisor.test.ts
+++ b/src/process/supervisor/supervisor.test.ts
@@ -99,4 +99,31 @@ describe("process supervisor", () => {
     expect(streamed).toBe("streamed");
     expect(exit.stdout).toBe("");
   });
+
+  it("cancelAll terminates all active runs", async () => {
+    const supervisor = createProcessSupervisor();
+    const a = await supervisor.spawn({
+      sessionId: "s-all-1",
+      backendId: "test",
+      scopeKey: "scope:x",
+      mode: "child",
+      argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
+      timeoutMs: 10_000,
+      stdinMode: "pipe-closed",
+    });
+    const b = await supervisor.spawn({
+      sessionId: "s-all-2",
+      backendId: "test",
+      scopeKey: "scope:y",
+      mode: "child",
+      argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
+      timeoutMs: 10_000,
+      stdinMode: "pipe-closed",
+    });
+
+    supervisor.cancelAll();
+    const [exitA, exitB] = await Promise.all([a.wait(), b.wait()]);
+    expect(["manual-cancel", "signal"]).toContain(exitA.reason);
+    expect(["manual-cancel", "signal"]).toContain(exitB.reason);
+  });
 });
diff --git a/src/process/supervisor/supervisor.ts b/src/process/supervisor/supervisor.ts
index 3c6834003f0e..59496345ffe9 100644
--- a/src/process/supervisor/supervisor.ts
+++ b/src/process/supervisor/supervisor.ts
@@ -58,6 +58,14 @@ export function createProcessSupervisor(): ProcessSupervisor {
     }
   };
 
+  /** Cancels every active run; `reason` defaults to "manual-cancel". */
+  const cancelAll = (reason: TerminationReason = "manual-cancel") => {
+    // Iterate over a snapshot of the ids so cancel() is free to mutate `active`.
+    for (const runId of Array.from(active.keys())) {
+      cancel(runId, reason);
+    }
+  };
+
   const spawn = async (input: SpawnInput): Promise => {
     const runId = input.runId?.trim() || crypto.randomUUID();
     if (input.replaceExistingScope && input.scopeKey?.trim()) {
@@ -273,6 +281,7 @@ export function createProcessSupervisor(): ProcessSupervisor {
     spawn,
     cancel,
     cancelScope,
+    cancelAll,
     reconcileOrphans: async () => {
       // Deliberate no-op: this supervisor uses in-memory ownership only.
       // Active runs are not recovered after process restart in the current model.
diff --git a/src/process/supervisor/types.ts b/src/process/supervisor/types.ts
index 04c571b08b2d..c6190fd0488e 100644
--- a/src/process/supervisor/types.ts
+++ b/src/process/supervisor/types.ts
@@ -91,6 +91,8 @@ export interface ProcessSupervisor {
   spawn(input: SpawnInput): Promise;
   cancel(runId: string, reason?: TerminationReason): void;
   cancelScope(scopeKey: string, reason?: TerminationReason): void;
+  /** Cancels every run that is currently active. */
+  cancelAll(reason?: TerminationReason): void;
   reconcileOrphans(): Promise;
   getRecord(runId: string): RunRecord | undefined;
 }