Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 42 additions & 60 deletions packages/core/src/cross-spawn-spawner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,29 +263,33 @@ export const make = Effect.gen(function* () {
}

const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) =>
Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => {
const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
const proc = launch(command.command, command.args, opts)
let end = false
let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined
proc.on("error", (err) => {
resume(Effect.fail(toPlatformError("spawn", err, command)))
})
proc.on("exit", (...args) => {
exit = args
})
proc.on("close", (...args) => {
if (end) return
end = true
Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args))
})
proc.on("spawn", () => {
resume(Effect.succeed([proc, signal]))
})
return Effect.sync(() => {
proc.kill("SIGTERM")
})
})
Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal, ExitSignal], PlatformError.PlatformError>(
(resume) => {
const exited = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
const closed = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
const proc = launch(command.command, command.args, opts)
let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined
proc.on("error", (err) => {
resume(Effect.fail(toPlatformError("spawn", err, command)))
})
// A detached descendant can hold stdio open after this process exits. Report its
// exit promptly, while keeping "close" for kill/escalation and failed spawns.
proc.on("exit", (...args) => {
exit = args
Deferred.doneUnsafe(exited, Exit.succeed(args))
})
proc.on("close", (...args) => {
Deferred.doneUnsafe(closed, Exit.succeed(exit ?? args))
Deferred.doneUnsafe(exited, Exit.succeed(exit ?? args))
})
proc.on("spawn", () => {
resume(Effect.succeed([proc, exited, closed]))
})
return Effect.sync(() => {
proc.kill("SIGTERM")
})
},
)

const killGroup = (
command: ChildProcess.StandardCommand,
Expand Down Expand Up @@ -319,27 +323,6 @@ export const make = Effect.gen(function* () {
return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command))
})

const timeout =
(
proc: NodeChildProcess.ChildProcess,
command: ChildProcess.StandardCommand,
opts: ChildProcess.KillOptions | undefined,
) =>
<A, E, R>(
f: (
command: ChildProcess.StandardCommand,
proc: NodeChildProcess.ChildProcess,
signal: NodeJS.Signals,
) => Effect.Effect<A, E, R>,
) => {
const signal = opts?.killSignal ?? "SIGTERM"
if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal)
return Effect.timeoutOrElse(f(command, proc, signal), {
duration: opts.forceKillAfter,
orElse: () => f(command, proc, "SIGKILL"),
})
}

const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => {
const opt = from ?? "stdout"
switch (opt) {
Expand Down Expand Up @@ -368,7 +351,7 @@ export const make = Effect.gen(function* () {
const extra = fds(command.options)
const dir = yield* cwd(command.options)

const [proc, signal] = yield* Effect.acquireRelease(
const [proc, exited, closed] = yield* Effect.acquireRelease(
spawn(command, {
cwd: dir,
env: env(command.options),
Expand All @@ -377,25 +360,24 @@ export const make = Effect.gen(function* () {
shell: command.options.shell,
windowsHide: process.platform === "win32",
}),
Effect.fnUntraced(function* ([proc, signal]) {
const done = yield* Deferred.isDone(signal)
const kill = timeout(proc, command, command.options)
if (done) {
const [code] = yield* Deferred.await(signal)
if (process.platform === "win32") return yield* Effect.void
if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup))
return yield* Effect.void
}
Effect.fnUntraced(function* ([proc, exited, closed]) {
const done = yield* Deferred.isDone(exited)
const send = (s: NodeJS.Signals) =>
Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s))
const sig = command.options.killSignal ?? "SIGTERM"
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid)
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid)
const escalated = command.options.forceKillAfter
? Effect.timeoutOrElse(attempt, {
duration: command.options.forceKillAfter,
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid),
})
: attempt
if (done) {
const [code] = yield* Deferred.await(exited)
if (process.platform === "win32") return yield* Effect.void
if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(escalated)
return yield* Effect.void
}
return yield* Effect.ignore(escalated)
}),
)
Expand All @@ -411,8 +393,8 @@ export const make = Effect.gen(function* () {
all: out.all,
getInputFd: fd.getInputFd,
getOutputFd: fd.getOutputFd,
isRunning: Effect.map(Deferred.isDone(signal), (done) => !done),
exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => {
isRunning: Effect.map(Deferred.isDone(exited), (done) => !done),
exitCode: Effect.flatMap(Deferred.await(exited), ([code, signal]) => {
if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code))
return Effect.fail(
toPlatformError(
Expand All @@ -426,11 +408,11 @@ export const make = Effect.gen(function* () {
const sig = opts?.killSignal ?? "SIGTERM"
const send = (s: NodeJS.Signals) =>
Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s))
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid)
const attempt = send(sig).pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid)
if (!opts?.forceKillAfter) return attempt
return Effect.timeoutOrElse(attempt, {
duration: opts.forceKillAfter,
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(closed)), Effect.asVoid),
})
},
unref: Effect.sync(() => {
Expand Down
122 changes: 122 additions & 0 deletions packages/core/test/effect/cross-spawn-spawner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ async function gone(pid: number, timeout = 5_000) {
return !alive(pid)
}

async function readPid(file: string, timeout = 5_000) {
const end = Date.now() + timeout
while (Date.now() < end) {
const value = await fs.readFile(file, "utf-8").catch(() => undefined)
if (value) return Number(value)
await new Promise((resolve) => setTimeout(resolve, 50))
}
throw new Error(`Process did not write its pid to ${file}`)
}

function stubbornDescendant(pidFile: string, parent: string, opts?: ChildProcess.CommandOptions) {
const code = [
'const cp = require("node:child_process")',
'const fs = require("node:fs")',
'cp.spawn(process.execPath, ["-e", "const fs = require(\\"node:fs\\"); process.on(\\"SIGTERM\\", () => {}); fs.writeFileSync(process.argv[1], String(process.pid)); setInterval(() => {}, 1000)", process.argv[1]], { stdio: "inherit" })',
parent,
].join("\n")
return ChildProcess.make(process.execPath, ["-e", code, pidFile], opts)
}

describe("cross-spawn spawner", () => {
describe("basic spawning", () => {
fx.effect(
Expand Down Expand Up @@ -275,6 +295,60 @@ describe("cross-spawn spawner", () => {
}),
)

fx.live(
"forceKillAfter escalates when the parent exits before a stubborn descendant",
Effect.gen(function* () {
if (process.platform === "win32") return

const tmp = yield* Effect.acquireRelease(
Effect.promise(() => tmpdir()),
(tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
)
const pidFile = path.join(tmp.path, "stubborn-child.pid")
const handle = yield* stubbornDescendant(pidFile, "setInterval(() => {}, 1000)")
const pid = yield* Effect.promise(() => readPid(pidFile))

yield* handle.kill({ forceKillAfter: 100 }).pipe(Effect.ignore)
const terminated = yield* Effect.promise(() => gone(pid, 1_000))
if (!terminated) {
yield* Effect.sync(() => process.kill(pid, "SIGKILL"))
}
expect(terminated).toBe(true)
}),
10_000,
)

fx.live(
"scope cleanup escalates after a failed parent leaves a stubborn descendant",
Effect.gen(function* () {
if (process.platform === "win32") return

const tmp = yield* Effect.acquireRelease(
Effect.promise(() => tmpdir()),
(tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
)
const pidFile = path.join(tmp.path, "failed-parent-child.pid")
const pid = yield* Effect.scoped(
Effect.gen(function* () {
const handle = yield* stubbornDescendant(
pidFile,
'const ready = setInterval(() => { if (fs.existsSync(process.argv[1])) { clearInterval(ready); process.exit(42) } }, 10)',
{ forceKillAfter: 100 },
)
expect(yield* handle.exitCode).toBe(ChildProcessSpawner.ExitCode(42))
return yield* Effect.promise(() => readPid(pidFile))
}),
)

const terminated = yield* Effect.promise(() => gone(pid, 1_000))
if (!terminated) {
yield* Effect.sync(() => process.kill(pid, "SIGKILL"))
}
expect(terminated).toBe(true)
}),
10_000,
)

fx.effect(
"isRunning reflects process state",
Effect.gen(function* () {
Expand All @@ -286,6 +360,54 @@ describe("cross-spawn spawner", () => {
)
})

describe("detached children (issue #24731)", () => {
fx.live(
"exitCode resolves when the main process exits even if a detached child keeps stdio open",
Effect.gen(function* () {
const tmp = yield* Effect.acquireRelease(
Effect.promise(() => tmpdir()),
(tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
)
const pidFile = path.join(tmp.path, "daemon.pid")

// Mimics playwright-cli / a backgrounded web server: spawn a long-lived
// detached child that inherits stdio (keeping the parent's stdout pipe's
// write end open), then exit the main process immediately.
const code = [
'const cp = require("node:child_process")',
'const fs = require("node:fs")',
'const daemon = cp.spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], { detached: true, stdio: "inherit" })',
"daemon.unref()",
"fs.writeFileSync(process.argv[1], String(daemon.pid))",
'process.stdout.write("started")',
"process.exit(0)",
].join("\n")

const handle = yield* ChildProcess.make(process.execPath, ["-e", code, pidFile])

const result = yield* Effect.raceAll([
handle.exitCode.pipe(Effect.map((exit) => ({ kind: "exit" as const, code: exit }))),
Effect.sleep("5 seconds").pipe(Effect.as({ kind: "timeout" as const, code: null })),
])

// Reap the detached daemon so it does not leak and so the spawner's
// "close" can fire during scope teardown.
const pid = Number(yield* Effect.promise(() => fs.readFile(pidFile, "utf-8").catch(() => "0")))
if (pid) {
yield* Effect.sync(() => {
try {
process.kill(pid)
} catch {}
})
}

expect(result.kind).toBe("exit")
expect(result.code).toBe(ChildProcessSpawner.ExitCode(0))
}),
15_000,
)
})

describe("error handling", () => {
fx.effect(
"fails for invalid command",
Expand Down
42 changes: 39 additions & 3 deletions packages/opencode/src/tool/shell.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Effect, Stream } from "effect"
import { Effect, Fiber, Stream } from "effect"
import os from "os"
import { createWriteStream } from "node:fs"
import * as Tool from "./tool"
Expand Down Expand Up @@ -438,6 +438,8 @@ export const ShellTool = Tool.define(
let last = ""
const list: Chunk[] = []
let used = 0
let drained = 0
let processing = false
let file = ""
let sink: ReturnType<typeof createWriteStream> | undefined
let cut = false
Expand Down Expand Up @@ -481,8 +483,9 @@ export const ShellTool = Tool.define(
yield* Effect.addFinalizer(closeSink)
const handle = yield* spawner.spawn(cmd(input.shell, input.command, input.cwd, input.env))

yield* Effect.forkScoped(
const output = yield* Effect.forkScoped(
Stream.runForEach(Stream.decodeText(handle.all), (chunk) => {
processing = true
const size = Buffer.byteLength(chunk, "utf-8")
list.push({ text: chunk, size })
used += size
Expand Down Expand Up @@ -517,6 +520,12 @@ export const ShellTool = Tool.define(
},
}),
),
Effect.ensuring(
Effect.sync(() => {
processing = false
drained++
}),
),
)
}
}
Expand All @@ -526,7 +535,14 @@ export const ShellTool = Tool.define(
output: last,
description: input.description,
},
})
}).pipe(
Effect.ensuring(
Effect.sync(() => {
processing = false
drained++
}),
),
)
}),
)

Expand Down Expand Up @@ -554,6 +570,26 @@ export const ShellTool = Tool.define(
yield* handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.orDie)
}

if (exit.kind === "exit") {
// The process has exited, but output it already wrote may still be buffered
// in the OS pipe / Node streams. Join the reader so it drains to EOF: for a
// normal command this returns the instant the streams end (full output, no
// fixed delay). A detached child can hold the pipe open forever with no
// further output, so also stop once the reader has produced nothing for a
// full idle window. The window resets after every completely processed
// chunk (`drained`) and cannot settle while a chunk is still being written
// or reported (`processing`), so only an exited process with an idle pipe
// hits the bound.
const settle = Effect.gen(function* () {
let seen = -1
while (processing || drained !== seen) {
seen = drained
yield* Effect.sleep("500 millis")
}
})
yield* Effect.raceAll([Fiber.join(output).pipe(Effect.ignore), settle])
}

return exit.kind === "exit" ? exit.code : null
}),
).pipe(Effect.orDie)
Expand Down
Loading
Loading