Skip to content

Commit 8ededb2

Browse files
committed
Fix stdio client shutdown: cancellation deadlock, orphaned children, spurious kills
Four user-facing bugs in the stdio client's shutdown path, all rooted in the same design problem — the shutdown sequence depended on pipe state and was not protected from cancellation: - Cancelling stdio_client (a client timeout, app shutdown) skipped the entire shutdown sequence: the first await in the finally block re-raised the pending cancellation, so stdin was never closed gracefully and the process tree was never terminated. Control then fell through to anyio's Process.aclose, whose shielded wait only returns once every pipe has closed - and a pipe inherited by a grandchild (npx- and uv-style servers always have one) never closes, so the client deadlocked forever. The shutdown now runs inside a shielded cancel scope with every wait time-bounded, and never relies on a pipe-gated wait. - The grace-period check used process.wait(), which on the asyncio backend resolves only when the process has exited AND its pipes have closed. A well-behaved server that exited instantly on stdin closure but left a background child holding stdout was misclassified as hung, burned the full grace period, and got its tree terminated with a spurious warning. The wait now polls returncode, which reflects process death alone. - Process-tree termination derived the group ID with os.getpgid(pid), which fails once the leader has been reaped even while its descendants are alive — and the fallback then "terminated" the dead leader, leaking every descendant. Since the process is spawned with start_new_session=True, the pgid is by definition the leader's pid; use it directly, and treat ProcessLookupError from killpg as "group already gone" rather than a reason to fall back. - Writing to a dead server's stdin surfaced a raw backend exception (ConnectionResetError inside an exception group) to the caller instead of a clean closed-stream signal. stdin_writer now treats BrokenResourceError and ConnectionError like ClosedResourceError. Windows fixes, by analysis (CI must validate): FallbackProcess.wait() ran Popen.wait() in a non-cancellable worker thread, so the timeout around the grace period could never fire and shutdown could hang indefinitely — it now polls cancellably, and returncode reflects death without requiring a wait. terminate_windows_process_tree dropped its timeout_seconds parameter, which was documented but never used (Job Object termination is immediate; the docstring now says so). Cleanup in the same files: the escalation now lives in one function with two named timeouts instead of three hardcoded 2.0s; the Job Object is tracked in a WeakKeyDictionary instead of a monkey-patched private attribute on anyio's Process; the deprecated terminate_windows_process is removed (migration.md updated); the always-true CREATE_NO_WINDOW hasattr dance and a retry path that double-spawned on spawn failure are gone; the client's JSON parse error handler catches ValueError instead of Exception.
1 parent 8a6abc0 commit 8ededb2

4 files changed

Lines changed: 266 additions & 217 deletions

File tree

docs/migration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ The `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters have been re
105105

106106
Note: `sse_client` retains its `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters — only the streamable HTTP transport changed.
107107

108+
### `terminate_windows_process` removed
109+
110+
The deprecated `mcp.os.win32.utilities.terminate_windows_process` function has been
111+
removed. Process termination is handled internally by the `stdio_client` context
112+
manager; there is no replacement API. The Windows tree-termination helper
113+
`terminate_windows_process_tree` no longer accepts a `timeout_seconds` argument —
114+
the value was never used (Job Object termination is immediate).
115+
108116
### Removed type aliases and classes
109117

110118
The following deprecated type aliases and classes have been removed from `mcp.types`:

src/mcp/client/stdio.py

Lines changed: 112 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import anyio
99
import anyio.lowlevel
10-
from anyio.abc import Process
10+
from anyio.abc import AsyncResource, Process
1111
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1212
from anyio.streams.text import TextReceiveStream
1313
from pydantic import BaseModel, Field
@@ -16,6 +16,7 @@
1616
from mcp.os.posix.utilities import terminate_posix_process_tree
1717
from mcp.os.win32.utilities import (
1818
FallbackProcess,
19+
close_process_job,
1920
create_windows_process,
2021
get_windows_executable_command,
2122
terminate_windows_process_tree,
@@ -44,9 +45,18 @@
4445
else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
4546
)
4647

47-
# Timeout for process termination before falling back to force kill
48+
# How long the server gets to exit on its own after its stdin is closed, before its
49+
# process tree is terminated.
4850
PROCESS_TERMINATION_TIMEOUT = 2.0
4951

52+
# How long the process tree gets to die after a graceful termination request
53+
# (SIGTERM on POSIX) before it is force-killed. Windows tree termination is an
54+
# immediate hard kill, so this only stretches the POSIX path.
55+
FORCE_KILL_TIMEOUT = 2.0
56+
57+
# How often to poll for process death while waiting out the grace period.
58+
_EXIT_POLL_INTERVAL = 0.01
59+
5060

5161
def get_default_environment() -> dict[str, str]:
5262
"""Returns a default environment object including only environment variables deemed
@@ -128,13 +138,10 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder
128138
)
129139
except OSError:
130140
# Clean up streams if process creation fails
131-
await read_stream.aclose()
132-
await write_stream.aclose()
133-
await read_stream_writer.aclose()
134-
await write_stream_reader.aclose()
141+
await _aclose_all(read_stream, write_stream, read_stream_writer, write_stream_reader)
135142
raise
136143

137-
async def stdout_reader():
144+
async def stdout_reader() -> None:
138145
assert process.stdout, "Opened process is missing stdout"
139146

140147
try:
@@ -151,17 +158,20 @@ async def stdout_reader():
151158
for line in lines:
152159
try:
153160
message = types.jsonrpc_message_adapter.validate_json(line, by_name=False)
154-
except Exception as exc: # pragma: no cover
161+
except ValueError as exc:
155162
logger.exception("Failed to parse JSONRPC message from server")
156163
await read_stream_writer.send(exc)
157164
continue
158165

159166
session_message = SessionMessage(message)
160167
await read_stream_writer.send(session_message)
161-
except anyio.ClosedResourceError: # pragma: lax no cover
168+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
169+
# ClosedResourceError: our own shutdown closed the writer. BrokenResourceError:
170+
# the caller exited with a message still undelivered, so closing the read
171+
# stream broke this task's blocked send. Both just mean shutdown is underway.
162172
await anyio.lowlevel.checkpoint()
163173

164-
async def stdin_writer():
174+
async def stdin_writer() -> None:
165175
assert process.stdin, "Opened process is missing stdin"
166176

167177
try:
@@ -174,41 +184,96 @@ async def stdin_writer():
174184
errors=server.encoding_error_handler,
175185
)
176186
)
177-
except anyio.ClosedResourceError: # pragma: no cover
187+
except (anyio.ClosedResourceError, anyio.BrokenResourceError, ConnectionError):
188+
# The server stopped reading — its process died or closed stdin. The exact
189+
# exception depends on platform and backend; all of them just mean the pipe
190+
# is gone. The read side reports the closure to the caller.
178191
await anyio.lowlevel.checkpoint()
179192

180-
async with anyio.create_task_group() as tg, process:
193+
async with anyio.create_task_group() as tg:
181194
tg.start_soon(stdout_reader)
182195
tg.start_soon(stdin_writer)
183196
try:
184197
yield read_stream, write_stream
185198
finally:
186-
# MCP spec: stdio shutdown sequence
187-
# 1. Close input stream to server
188-
# 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time
189-
# 3. Send SIGKILL if still not exited
190-
if process.stdin: # pragma: no branch
191-
try:
192-
await process.stdin.aclose()
193-
except Exception: # pragma: no cover
194-
# stdin might already be closed, which is fine
195-
pass
196-
197-
try:
198-
# Give the process time to exit gracefully after stdin closes
199-
with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT):
200-
await process.wait()
201-
except TimeoutError:
202-
# Process didn't exit from stdin closure, use platform-specific termination
203-
# which handles SIGTERM -> SIGKILL escalation
204-
await _terminate_process_tree(process)
205-
except ProcessLookupError: # pragma: no cover
206-
# Process already exited, which is fine
207-
pass
208-
await read_stream.aclose()
209-
await write_stream.aclose()
210-
await read_stream_writer.aclose()
211-
await write_stream_reader.aclose()
199+
# The shutdown sequence must run to completion even when the caller is
200+
# being cancelled — a cancellation that skipped it would leak the server
201+
# process (and its children) and could block forever on the way out.
202+
# Every wait inside the shield is time-bounded. The shield holds against
203+
# anyio-level cancellation and one native task.cancel(); a second native
204+
# cancel delivered mid-cleanup can still abort it — there is no
205+
# backend-neutral way to refuse repeated native cancellation.
206+
with anyio.CancelScope(shield=True):
207+
await _shutdown_process_tree(process)
208+
await _aclose_all(read_stream, write_stream, read_stream_writer, write_stream_reader)
209+
210+
211+
async def _aclose_all(*streams: AsyncResource) -> None:
212+
"""Close every given stream."""
213+
for stream in streams:
214+
await stream.aclose()
215+
216+
217+
async def _shutdown_process_tree(process: Process | FallbackProcess) -> None:
218+
"""Shut the server process down per the MCP spec stdio sequence.
219+
220+
1. Close the server's stdin so it can exit on its own.
221+
2. Give it ``PROCESS_TERMINATION_TIMEOUT`` seconds to exit.
222+
3. Otherwise terminate its whole process tree (SIGTERM then SIGKILL on POSIX,
223+
Job Object hard kill on Windows).
224+
"""
225+
if process.stdin: # pragma: no branch
226+
try:
227+
await process.stdin.aclose()
228+
except (OSError, anyio.BrokenResourceError, anyio.ClosedResourceError):
229+
# stdin is already closed or the pipe is already gone, which is fine
230+
await anyio.lowlevel.checkpoint()
231+
232+
# The await is hoisted out of the `if` so no cancel-unwind from the timeout's
233+
# internal scope traverses the frame at the branching line: Python 3.11's tracer
234+
# loses the branch arc at a throw()-traversed suspension point (python/cpython#106749).
235+
exited = await _wait_for_process_exit(process, PROCESS_TERMINATION_TIMEOUT)
236+
if not exited:
237+
# Process didn't exit from stdin closure; use platform-specific termination,
238+
# which kills the entire process tree, not just the spawned process.
239+
await _terminate_process_tree(process)
240+
# Wait (bounded) for the kill to be observed by the event loop: on asyncio,
241+
# ``returncode`` flips only once the child watcher's callback has been
242+
# delivered, and that delivery is also what lets the subprocess transport
243+
# close instead of leaking a ResourceWarning into whatever runs next.
244+
# Kill-death is prompt, so this wait normally takes one poll interval.
245+
await _wait_for_process_exit(process, FORCE_KILL_TIMEOUT)
246+
247+
# On Windows, drop the process's Job Object handle now: the job is configured to
248+
# kill its remaining members when the handle closes, so closing it here makes
249+
# that reaping deterministic instead of GC-timed. (POSIX deliberately leaves a
250+
# well-behaved server's surviving children alive; no-op there.)
251+
close_process_job(process)
252+
253+
# The process is dead, but its stdout pipe can still be held open by something
254+
# that inherited it (an orphaned grandchild, say), in which case the reader task
255+
# would never see EOF. Closing our wrapper poisons the Python-level reader so the
256+
# reader task finishes either way; the OS-level pipe end itself lives until the
257+
# subprocess transport is closed.
258+
if process.stdout: # pragma: no branch
259+
await process.stdout.aclose()
260+
261+
262+
async def _wait_for_process_exit(process: Process | FallbackProcess, timeout: float) -> bool:
263+
"""Wait for the process itself to die, returning whether it did within ``timeout``.
264+
265+
Deliberately does not use ``process.wait()``: on the asyncio backend that resolves
266+
only once the process has exited *and* every one of its pipes has closed — and
267+
pipes are inherited by the server's own children, so a well-behaved server that
268+
exits instantly but leaves a background child alive would be misclassified as
269+
hung and get its whole tree terminated. ``returncode`` reflects process death
270+
alone.
271+
"""
272+
with anyio.move_on_after(timeout):
273+
while process.returncode is None:
274+
await anyio.sleep(_EXIT_POLL_INTERVAL)
275+
return True
276+
return False
212277

213278

214279
def _get_executable_command(command: str) -> str:
@@ -232,39 +297,33 @@ async def _create_platform_compatible_process(
232297
env: dict[str, str] | None = None,
233298
errlog: TextIO = sys.stderr,
234299
cwd: Path | str | None = None,
235-
):
300+
) -> Process | FallbackProcess:
236301
"""Creates a subprocess in a platform-compatible way.
237302
238303
Unix: Creates process in a new session/process group for killpg support
239304
Windows: Creates process in a Job Object for reliable child termination
240305
"""
241306
if sys.platform == "win32": # pragma: no cover
242-
process = await create_windows_process(command, args, env, errlog, cwd)
307+
return await create_windows_process(command, args, env, errlog, cwd)
243308
else: # pragma: lax no cover
244-
process = await anyio.open_process(
309+
return await anyio.open_process(
245310
[command, *args],
246311
env=env,
247312
stderr=errlog,
248313
cwd=cwd,
249314
start_new_session=True,
250315
)
251316

252-
return process
253-
254317

255-
async def _terminate_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None:
318+
async def _terminate_process_tree(process: Process | FallbackProcess) -> None:
256319
"""Terminate a process and all its children using platform-specific methods.
257320
258-
Unix: Uses os.killpg() for atomic process group termination
259-
Windows: Uses Job Objects via pywin32 for reliable child process cleanup
260-
261-
Args:
262-
process: The process to terminate
263-
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
321+
Unix: SIGTERM to the process group, escalating to SIGKILL after
322+
``FORCE_KILL_TIMEOUT``. Windows: immediate Job Object termination.
264323
"""
265324
if sys.platform == "win32": # pragma: no cover
266-
await terminate_windows_process_tree(process, timeout_seconds)
325+
await terminate_windows_process_tree(process)
267326
else: # pragma: lax no cover
268-
# FallbackProcess should only be used for Windows compatibility
327+
# Windows-only FallbackProcess never reaches the POSIX path.
269328
assert isinstance(process, Process)
270-
await terminate_posix_process_tree(process, timeout_seconds)
329+
await terminate_posix_process_tree(process, FORCE_KILL_TIMEOUT)

src/mcp/os/posix/utilities.py

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,49 +9,72 @@
99

1010
logger = logging.getLogger(__name__)
1111

12+
# How often to probe for surviving process-group members between SIGTERM and SIGKILL.
13+
_GROUP_POLL_INTERVAL = 0.01
14+
1215

1316
async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None:
14-
"""Terminate a process and all its children on POSIX systems.
17+
"""Terminate a process and all its descendants on POSIX systems.
18+
19+
The process was spawned with ``start_new_session=True``, so it leads its own
20+
process group and its pgid equals its pid. ``os.killpg`` on that group reaches
21+
every descendant in one atomic call — including descendants whose parent (even
22+
the group leader itself) has already exited, which a walk of the process tree
23+
would miss.
1524
16-
Uses os.killpg() for atomic process group termination.
25+
Sends SIGTERM to the group, waits up to ``timeout_seconds`` for the group to
26+
disappear, then SIGKILLs whatever remains.
1727
18-
Args:
19-
process: The process to terminate
20-
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
28+
Descendants that move themselves into a new session or process group
29+
(daemonizers) escape a group kill by design.
2130
"""
22-
pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None)
23-
if not pid:
24-
# No PID means there's no process to terminate - it either never started,
25-
# already exited, or we have an invalid process object
26-
return
31+
# start_new_session=True at spawn makes the leader's pid the pgid; do not ask the
32+
# OS via getpgid(), which fails with ProcessLookupError once the leader has been
33+
# reaped even while other group members are still alive.
34+
pgid = process.pid
2735

2836
try:
29-
pgid = os.getpgid(pid)
3037
os.killpg(pgid, signal.SIGTERM)
31-
32-
with anyio.move_on_after(timeout_seconds):
33-
while True:
34-
try:
35-
# Check if process group still exists (signal 0 = check only)
36-
os.killpg(pgid, 0)
37-
await anyio.sleep(0.1)
38-
except ProcessLookupError:
39-
return
40-
38+
except ProcessLookupError:
39+
# The entire group is already gone; nothing to terminate.
40+
return
41+
except PermissionError:
42+
# kill(2): EPERM means permission was denied for *every* remaining member —
43+
# none of the group is ours to signal. Try the leader directly anyway, the one
44+
# process we spawned, in case it is the unsignalable survivor.
45+
logger.warning("No permission to signal process group %d; terminating the spawned process only", pgid)
4146
try:
42-
os.killpg(pgid, signal.SIGKILL)
43-
except ProcessLookupError:
47+
process.terminate()
48+
except OSError:
49+
# The leader itself is unsignalable or already gone; nothing more to do.
4450
pass
51+
return
4552

46-
except (ProcessLookupError, PermissionError, OSError) as e:
47-
logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate")
48-
try:
49-
process.terminate()
50-
with anyio.fail_after(timeout_seconds):
51-
await process.wait()
52-
except Exception:
53-
logger.warning(f"Process termination failed for PID {pid}, attempting force kill")
53+
with anyio.move_on_after(timeout_seconds):
54+
while True:
5455
try:
55-
process.kill()
56-
except Exception:
57-
logger.exception(f"Failed to kill process {pid}")
56+
# Probe for surviving group members (signal 0 checks without signalling).
57+
# The probe keeps succeeding while unreaped zombies remain in the group,
58+
# so this also waits out reaping rather than racing it.
59+
os.killpg(pgid, 0)
60+
except ProcessLookupError:
61+
return
62+
except PermissionError:
63+
# Someone survives but is no longer ours to signal; keep waiting — it
64+
# may still exit on its own within the timeout.
65+
pass
66+
# Touching returncode reaps the leader on trio (the property calls
67+
# Popen.poll()); without it nothing reaps during this loop and the
68+
# leader's zombie keeps the group alive for the full timeout. On
69+
# asyncio it is a cheap attribute read.
70+
_ = process.returncode
71+
await anyio.sleep(_GROUP_POLL_INTERVAL)
72+
73+
try:
74+
os.killpg(pgid, signal.SIGKILL)
75+
except ProcessLookupError:
76+
# The group died between the last probe and the kill.
77+
pass
78+
except PermissionError:
79+
# Whatever survives is not ours to kill.
80+
pass

0 commit comments

Comments
 (0)