Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/agents/bash-process-registry.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
addSession,
appendOutput,
drainSession,
getSession,
killAllSessions,
listFinishedSessions,
listRunningSessions,
markBackgrounded,
markExited,
resetProcessRegistryForTests,
Expand Down Expand Up @@ -114,4 +117,43 @@ describe("bash process registry", () => {
markExited(session, 0, null, "completed");
expect(listFinishedSessions()).toHaveLength(1);
});

it("killAllSessions kills running children and clears registries", () => {
const killFnA = vi.fn();
const killFnB = vi.fn();
const childA = {
pid: 100,
kill: killFnA,
removeAllListeners: vi.fn(),
} as unknown as ChildProcessWithoutNullStreams;
const session = createProcessSessionFixture({
id: "kill-test",
command: "sleep 60",
child: childA,
maxOutputChars: 1000,
});
addSession(session);
markBackgrounded(session);

const bgSession = createProcessSessionFixture({
id: "kill-test-bg",
command: "sleep 120",
child: {
pid: 200,
kill: killFnB,
removeAllListeners: vi.fn(),
} as unknown as ChildProcessWithoutNullStreams,
maxOutputChars: 1000,
});
addSession(bgSession);

killAllSessions();

expect(killFnA).toHaveBeenCalledWith("SIGKILL");
expect(killFnB).toHaveBeenCalledWith("SIGKILL");
expect(getSession("kill-test")).toBeUndefined();
expect(getSession("kill-test-bg")).toBeUndefined();
expect(listRunningSessions()).toHaveLength(0);
expect(listFinishedSessions()).toHaveLength(0);
});
});
19 changes: 19 additions & 0 deletions src/agents/bash-process-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,25 @@ export function clearFinished() {
finishedSessions.clear();
}

/**
* Kill all running sessions and clean up resources.
* Called during gateway shutdown to prevent orphaned child processes.
*/
export function killAllSessions(): void {
for (const session of runningSessions.values()) {
if (session.child && !session.exited) {
try {
session.child.kill("SIGKILL");
} catch {
// Process already exited
}
}
}
runningSessions.clear();
finishedSessions.clear();
stopSweeper();
}

export function resetProcessRegistryForTests() {
runningSessions.clear();
finishedSessions.clear();
Expand Down
4 changes: 4 additions & 0 deletions src/gateway/server-close.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { Server as HttpServer } from "node:http";
import type { WebSocketServer } from "ws";
import { killAllSessions } from "../agents/bash-process-registry.js";
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
import type { PluginServicesHandle } from "../plugins/services.js";
import { getProcessSupervisor } from "../process/supervisor/index.js";

export function createGatewayCloseHandler(params: {
bonjourStop: (() => Promise<void>) | null;
Expand Down Expand Up @@ -68,6 +70,8 @@ export function createGatewayCloseHandler(params: {
await params.pluginServices.stop().catch(() => {});
}
await stopGmailWatcher();
getProcessSupervisor().cancelAll("manual-cancel");
killAllSessions();
params.cron.stop();
params.heartbeatRunner.stop();
for (const timer of params.nodePresenceTimers.values()) {
Expand Down
27 changes: 27 additions & 0 deletions src/process/supervisor/supervisor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,31 @@ describe("process supervisor", () => {
expect(streamed).toBe("streamed");
expect(exit.stdout).toBe("");
});

it("cancelAll terminates all active runs", async () => {
const supervisor = createProcessSupervisor();
const a = await supervisor.spawn({
sessionId: "s-all-1",
backendId: "test",
scopeKey: "scope:x",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 10_000,
stdinMode: "pipe-closed",
});
const b = await supervisor.spawn({
sessionId: "s-all-2",
backendId: "test",
scopeKey: "scope:y",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 10_000,
stdinMode: "pipe-closed",
});

supervisor.cancelAll();
const [exitA, exitB] = await Promise.all([a.wait(), b.wait()]);
expect(exitA.reason === "manual-cancel" || exitA.reason === "signal").toBe(true);
expect(exitB.reason === "manual-cancel" || exitB.reason === "signal").toBe(true);
});
});
7 changes: 7 additions & 0 deletions src/process/supervisor/supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ export function createProcessSupervisor(): ProcessSupervisor {
}
};

const cancelAll = (reason: TerminationReason = "manual-cancel") => {
for (const runId of Array.from(active.keys())) {
cancel(runId, reason);
}
};

const spawn = async (input: SpawnInput): Promise<ManagedRun> => {
const runId = input.runId?.trim() || crypto.randomUUID();
if (input.replaceExistingScope && input.scopeKey?.trim()) {
Expand Down Expand Up @@ -273,6 +279,7 @@ export function createProcessSupervisor(): ProcessSupervisor {
spawn,
cancel,
cancelScope,
cancelAll,
reconcileOrphans: async () => {
// Deliberate no-op: this supervisor uses in-memory ownership only.
// Active runs are not recovered after process restart in the current model.
Expand Down
1 change: 1 addition & 0 deletions src/process/supervisor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export interface ProcessSupervisor {
spawn(input: SpawnInput): Promise<ManagedRun>;
cancel(runId: string, reason?: TerminationReason): void;
cancelScope(scopeKey: string, reason?: TerminationReason): void;
cancelAll(reason?: TerminationReason): void;
reconcileOrphans(): Promise<void>;
getRecord(runId: string): RunRecord | undefined;
}