diff --git a/packages/core/src/cross-spawn-spawner.ts b/packages/core/src/cross-spawn-spawner.ts index ad8d4126d454..da3251cea4da 100644 --- a/packages/core/src/cross-spawn-spawner.ts +++ b/packages/core/src/cross-spawn-spawner.ts @@ -263,29 +263,33 @@ export const make = Effect.gen(function* () { } const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) => - Effect.callback((resume) => { - const signal = Deferred.makeUnsafe() - const proc = launch(command.command, command.args, opts) - let end = false - let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined - proc.on("error", (err) => { - resume(Effect.fail(toPlatformError("spawn", err, command))) - }) - proc.on("exit", (...args) => { - exit = args - }) - proc.on("close", (...args) => { - if (end) return - end = true - Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args)) - }) - proc.on("spawn", () => { - resume(Effect.succeed([proc, signal])) - }) - return Effect.sync(() => { - proc.kill("SIGTERM") - }) - }) + Effect.callback( + (resume) => { + const exited = Deferred.makeUnsafe() + const closed = Deferred.makeUnsafe() + const proc = launch(command.command, command.args, opts) + let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined + proc.on("error", (err) => { + resume(Effect.fail(toPlatformError("spawn", err, command))) + }) + // A detached descendant can hold stdio open after this process exits. Report its + // exit promptly, while keeping "close" for kill/escalation and failed spawns. + proc.on("exit", (...args) => { + exit = args + Deferred.doneUnsafe(exited, Exit.succeed(args)) + }) + proc.on("close", (...args) => { + Deferred.doneUnsafe(closed, Exit.succeed(exit ?? args)) + Deferred.doneUnsafe(exited, Exit.succeed(exit ?? args)) + }) + proc.on("spawn", () => { + resume(Effect.succeed([proc, exited, closed])) + }) + return Effect.sync(() => { + proc.kill("SIGTERM") + }) + }, + ) const killGroup = ( command: ChildProcess.StandardCommand, @@ -319,27 +323,6 @@ export const make = Effect.gen(function* () { return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command)) }) - const timeout = - ( - proc: NodeChildProcess.ChildProcess, - command: ChildProcess.StandardCommand, - opts: ChildProcess.KillOptions | undefined, - ) => - ( - f: ( - command: ChildProcess.StandardCommand, - proc: NodeChildProcess.ChildProcess, - signal: NodeJS.Signals, - ) => Effect.Effect, - ) => { - const signal = opts?.killSignal ?? "SIGTERM" - if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal) - return Effect.timeoutOrElse(f(command, proc, signal), { - duration: opts.forceKillAfter, - orElse: () => f(command, proc, "SIGKILL"), - }) - } - const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => { const opt = from ?? "stdout" switch (opt) { @@ -368,7 +351,7 @@ export const make = Effect.gen(function* () { const extra = fds(command.options) const dir = yield* cwd(command.options) - const [proc, signal] = yield* Effect.acquireRelease( + const [proc, exited, closed] = yield* Effect.acquireRelease( spawn(command, { cwd: dir, env: env(command.options), @@ -377,25 +360,24 @@ export const make = Effect.gen(function* () { shell: command.options.shell, windowsHide: process.platform === "win32", }), - Effect.fnUntraced(function* ([proc, signal]) { - const done = yield* Deferred.isDone(signal) - const kill = timeout(proc, command, command.options) - if (done) { - const [code] = yield* Deferred.await(signal) - if (process.platform === "win32") return yield* Effect.void - if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup)) - return yield* Effect.void - } + Effect.fnUntraced(function* ([proc, exited, closed]) { + const done = yield* Deferred.isDone(exited) const send = (s: NodeJS.Signals) => Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) const sig = command.options.killSignal ?? "SIGTERM" - const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) + const attempt = send(sig).pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid) const escalated = command.options.forceKillAfter ? Effect.timeoutOrElse(attempt, { duration: command.options.forceKillAfter, - orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), + orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid), }) : attempt + if (done) { + const [code] = yield* Deferred.await(exited) + if (process.platform === "win32") return yield* Effect.void + if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(escalated) + return yield* Effect.void + } return yield* Effect.ignore(escalated) }), ) @@ -411,8 +393,8 @@ export const make = Effect.gen(function* () { all: out.all, getInputFd: fd.getInputFd, getOutputFd: fd.getOutputFd, - isRunning: Effect.map(Deferred.isDone(signal), (done) => !done), - exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => { + isRunning: Effect.map(Deferred.isDone(exited), (done) => !done), + exitCode: Effect.flatMap(Deferred.await(exited), ([code, signal]) => { if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code)) return Effect.fail( toPlatformError( @@ -426,11 +408,11 @@ export const make = Effect.gen(function* () { const sig = opts?.killSignal ?? "SIGTERM" const send = (s: NodeJS.Signals) => Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) - const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) + const attempt = send(sig).pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid) if (!opts?.forceKillAfter) return attempt return Effect.timeoutOrElse(attempt, { duration: opts.forceKillAfter, - orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), + orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid), }) }, unref: Effect.sync(() => { diff --git a/packages/core/test/effect/cross-spawn-spawner.test.ts b/packages/core/test/effect/cross-spawn-spawner.test.ts index 8a2fab493063..7da34ab16e0d 100644 --- a/packages/core/test/effect/cross-spawn-spawner.test.ts +++ b/packages/core/test/effect/cross-spawn-spawner.test.ts @@ -58,6 +58,26 @@ async function gone(pid: number, timeout = 5_000) { return !alive(pid) } +async function readPid(file: string, timeout = 5_000) { + const end = Date.now() + timeout + while (Date.now() < end) { + const value = await fs.readFile(file, "utf-8").catch(() => undefined) + if (value) return Number(value) + await new Promise((resolve) => setTimeout(resolve, 50)) + } + throw new Error(`Process did not write its pid to ${file}`) +} + +function stubbornDescendant(pidFile: string, parent: string, opts?: ChildProcess.CommandOptions) { + const code = [ + 'const cp = require("node:child_process")', + 'const fs = require("node:fs")', + 'cp.spawn(process.execPath, ["-e", "const fs = require(\\"node:fs\\"); process.on(\\"SIGTERM\\", () => {}); fs.writeFileSync(process.argv[1], String(process.pid)); setInterval(() => {}, 1000)", process.argv[1]], { stdio: "inherit" })', + parent, + ].join("\n") + return ChildProcess.make(process.execPath, ["-e", code, pidFile], opts) +} + describe("cross-spawn spawner", () => { describe("basic spawning", () => { fx.effect( @@ -275,6 +295,60 @@ describe("cross-spawn spawner", () => { }), ) + fx.live( + "forceKillAfter escalates when the parent exits before a stubborn descendant", + Effect.gen(function* () { + if (process.platform === "win32") return + + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const pidFile = path.join(tmp.path, "stubborn-child.pid") + const handle = yield* stubbornDescendant(pidFile, "setInterval(() => {}, 1000)") + const pid = yield* Effect.promise(() => readPid(pidFile)) + + yield* handle.kill({ forceKillAfter: 100 }).pipe(Effect.ignore) + const terminated = yield* Effect.promise(() => gone(pid, 1_000)) + if (!terminated) { + yield* Effect.sync(() => process.kill(pid, "SIGKILL")) + } + expect(terminated).toBe(true) + }), + 10_000, + ) + + fx.live( + "scope cleanup escalates after a failed parent leaves a stubborn descendant", + Effect.gen(function* () { + if (process.platform === "win32") return + + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const pidFile = path.join(tmp.path, "failed-parent-child.pid") + const pid = yield* Effect.scoped( + Effect.gen(function* () { + const handle = yield* stubbornDescendant( + pidFile, + 'const ready = setInterval(() => { if (fs.existsSync(process.argv[1])) { clearInterval(ready); process.exit(42) } }, 10)', + { forceKillAfter: 100 }, + ) + expect(yield* handle.exitCode).toBe(ChildProcessSpawner.ExitCode(42)) + return yield* Effect.promise(() => readPid(pidFile)) + }), + ) + + const terminated = yield* Effect.promise(() => gone(pid, 1_000)) + if (!terminated) { + yield* Effect.sync(() => process.kill(pid, "SIGKILL")) + } + expect(terminated).toBe(true) + }), + 10_000, + ) + fx.effect( "isRunning reflects process state", Effect.gen(function* () { @@ -286,6 +360,54 @@ describe("cross-spawn spawner", () => { ) }) + describe("detached children (issue #24731)", () => { + fx.live( + "exitCode resolves when the main process exits even if a detached child keeps stdio open", + Effect.gen(function* () { + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const pidFile = path.join(tmp.path, "daemon.pid") + + // Mimics playwright-cli / a backgrounded web server: spawn a long-lived + // detached child that inherits stdio (keeping the parent's stdout pipe's + // write end open), then exit the main process immediately. + const code = [ + 'const cp = require("node:child_process")', + 'const fs = require("node:fs")', + 'const daemon = cp.spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], { detached: true, stdio: "inherit" })', + "daemon.unref()", + "fs.writeFileSync(process.argv[1], String(daemon.pid))", + 'process.stdout.write("started")', + "process.exit(0)", + ].join("\n") + + const handle = yield* ChildProcess.make(process.execPath, ["-e", code, pidFile]) + + const result = yield* Effect.raceAll([ + handle.exitCode.pipe(Effect.map((exit) => ({ kind: "exit" as const, code: exit }))), + Effect.sleep("5 seconds").pipe(Effect.as({ kind: "timeout" as const, code: null })), + ]) + + // Reap the detached daemon so it does not leak and so the spawner's + // "close" can fire during scope teardown. + const pid = Number(yield* Effect.promise(() => fs.readFile(pidFile, "utf-8").catch(() => "0"))) + if (pid) { + yield* Effect.sync(() => { + try { + process.kill(pid) + } catch {} + }) + } + + expect(result.kind).toBe("exit") + expect(result.code).toBe(ChildProcessSpawner.ExitCode(0)) + }), + 15_000, + ) + }) + describe("error handling", () => { fx.effect( "fails for invalid command", diff --git a/packages/opencode/src/tool/shell.ts b/packages/opencode/src/tool/shell.ts index b6a95b5c0970..d766722e460a 100644 --- a/packages/opencode/src/tool/shell.ts +++ b/packages/opencode/src/tool/shell.ts @@ -1,4 +1,4 @@ -import { Effect, Stream } from "effect" +import { Effect, Fiber, Stream } from "effect" import os from "os" import { createWriteStream } from "node:fs" import * as Tool from "./tool" @@ -438,6 +438,8 @@ export const ShellTool = Tool.define( let last = "" const list: Chunk[] = [] let used = 0 + let drained = 0 + let processing = false let file = "" let sink: ReturnType | undefined let cut = false @@ -481,8 +483,9 @@ export const ShellTool = Tool.define( yield* Effect.addFinalizer(closeSink) const handle = yield* spawner.spawn(cmd(input.shell, input.command, input.cwd, input.env)) - yield* Effect.forkScoped( + const output = yield* Effect.forkScoped( Stream.runForEach(Stream.decodeText(handle.all), (chunk) => { + processing = true const size = Buffer.byteLength(chunk, "utf-8") list.push({ text: chunk, size }) used += size @@ -517,6 +520,12 @@ export const ShellTool = Tool.define( }, }), ), + Effect.ensuring( + Effect.sync(() => { + processing = false + drained++ + }), + ), ) } } @@ -526,7 +535,14 @@ export const ShellTool = Tool.define( output: last, description: input.description, }, - }) + }).pipe( + Effect.ensuring( + Effect.sync(() => { + processing = false + drained++ + }), + ), + ) }), ) @@ -554,6 +570,26 @@ export const ShellTool = Tool.define( yield* handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.orDie) } + if (exit.kind === "exit") { + // The process has exited, but output it already wrote may still be buffered + // in the OS pipe / Node streams. Join the reader so it drains to EOF: for a + // normal command this returns the instant the streams end (full output, no + // fixed delay). A detached child can hold the pipe open forever with no + // further output, so also stop once the reader has produced nothing for a + // full idle window. The window resets after every completely processed + // chunk (`drained`) and cannot settle while a chunk is still being written + // or reported (`processing`), so only an exited process with an idle pipe + // hits the bound. + const settle = Effect.gen(function* () { + let seen = -1 + while (processing || drained !== seen) { + seen = drained + yield* Effect.sleep("500 millis") + } + }) + yield* Effect.raceAll([Fiber.join(output).pipe(Effect.ignore), settle]) + } + return exit.kind === "exit" ? exit.code : null }), ).pipe(Effect.orDie) diff --git a/packages/opencode/test/tool/shell.test.ts b/packages/opencode/test/tool/shell.test.ts index ddaa5c2ec7b1..4706a54a70ab 100644 --- a/packages/opencode/test/tool/shell.test.ts +++ b/packages/opencode/test/tool/shell.test.ts @@ -1156,6 +1156,48 @@ describe("tool.shell abort", () => { }), ), ) + + it.live( + "returns promptly when a command leaves a detached child holding stdio open", + () => + Effect.gen(function* () { + if (process.platform === "win32") return + const dir = yield* tmpdirScoped() + const pidFile = path.join(dir, "daemon.pid") + // Spawn a long-lived detached child that inherits stdio (keeps the shell's + // stdout pipe open), record its pid, print output, then exit immediately. + const code = + 'const cp=require("node:child_process");const fs=require("node:fs");' + + 'const d=cp.spawn(process.execPath,["-e","setTimeout(()=>{},60000)"],{detached:true,stdio:"inherit"});' + + 'd.unref();fs.writeFileSync(Bun.argv[1],String(d.pid));process.stdout.write("STARTED");process.exit(0)' + const command = `${bin} -e ${squote(code)} ${squote(pidFile)}` + + const start = Date.now() + const result = yield* runIn( + projectRoot, + run({ command, description: "Detached child", timeout: 30_000 }), + ) + const elapsed = Date.now() - start + + // Reap the detached daemon so the pipe closes and it does not leak. + const fsvc = yield* AppFileSystem.Service + const pid = Number(yield* fsvc.readFileString(pidFile).pipe(Effect.catch(() => Effect.succeed("0")))) + if (pid) + yield* Effect.sync(() => { + try { + process.kill(pid) + } catch {} + }) + + expect(result.output).toContain("STARTED") + expect(result.metadata.exit).toBe(0) + expect(result.output).not.toContain("exceeding timeout") + // Returned via the idle drain (~500ms), not by waiting on the daemon's + // inherited pipe (60s) or the command timeout (30s). + expect(elapsed).toBeLessThan(10_000) + }), + 40_000, + ) }) describe("tool.shell truncation", () => { @@ -1227,4 +1269,33 @@ describe("tool.shell truncation", () => { }), ), ) + + it.live("saves all fast output while metadata processing is delayed", () => + runIn( + projectRoot, + Effect.gen(function* () { + const byteCount = 1_000_000 + let delayed = false + const result = yield* run( + { + command: fill("bytes", byteCount), + description: "Generate fast output with delayed metadata", + }, + { + ...ctx, + metadata: (input) => { + const output = (input.metadata as { output?: string })?.output + if (!output || delayed) return Effect.void + delayed = true + return Effect.sleep("1200 millis") + }, + }, + ) + const filepath = (result.metadata as { outputPath?: string }).outputPath + expect(filepath).toBeTruthy() + const saved = yield* (yield* AppFileSystem.Service).readFileString(filepath!) + expect(saved.length).toBe(byteCount) + }), + ), + ) })