From e611c2fd7c8002a3e1e0fe8316ea8f92618aa00b Mon Sep 17 00:00:00 2001 From: lingyun14 Date: Sat, 23 May 2026 23:32:05 +0800 Subject: [PATCH 1/3] fix(mcp): prevent epoll busy-wait caused by leaked SSE exit stack --- astrbot/core/agent/mcp_client.py | 152 +++++++++++++++++++++++++------ 1 file changed, 126 insertions(+), 26 deletions(-) diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py index b75999ea65..a5a1598027 100644 --- a/astrbot/core/agent/mcp_client.py +++ b/astrbot/core/agent/mcp_client.py @@ -385,8 +385,22 @@ class MCPClient: def __init__(self) -> None: # Initialize session and client objects self.session: mcp.ClientSession | None = None + + # _connection_task owns the AsyncExitStack and the sse_client/stdio_client + # context manager for the *current* connection. Cleaning up is done by + # cancelling this task: because the task itself entered the anyio + # create_task_group cancel scope, anyio's _host_task check passes and the + # scope exits cleanly in its own task context, avoiding the + # RuntimeError: Attempted to exit cancel scope in a different task + # that occurred when aclose() was called from a different task (or from the + # GC finalizer). Old connection tasks are kept in _old_connection_tasks + # until they finish after being cancelled. + self._connection_task: asyncio.Task | None = None + self._old_connection_tasks: list[asyncio.Task] = [] + + # Kept for internal use inside _run_connection; external code must not + # touch the exit_stack directly. self.exit_stack = AsyncExitStack() - self._old_exit_stacks: list[AsyncExitStack] = [] # Track old stacks for cleanup self.name: str | None = None self.active: bool = True @@ -399,9 +413,42 @@ def __init__(self) -> None: self._server_name: str | None = None self._reconnect_lock = asyncio.Lock() # Lock for thread-safe reconnection self._reconnecting: bool = False # For logging and debugging + self._session_ready = asyncio.Event() # set when session is initialised + + async def _run_connection(self, mcp_server_config: dict, name: str) -> None: + """Own the full lifetime of one MCP connection. + + This coroutine is always run inside a dedicated asyncio.Task + (_connection_task). Because *this task* is the one that enters every + anyio cancel scope (via sse_client / streamablehttp_client), anyio's + _host_task check is always satisfied when the stack is later closed — + either in the task's own finally block (normal path) or when the task + is cancelled from outside (cleanup / reconnect path). + + This avoids the + RuntimeError: Attempted to exit cancel scope in a different task + that previously occurred when aclose() was called from a different task + or from the asyncio async-generator GC finalizer. + """ + self.exit_stack = AsyncExitStack() + try: + await self._do_connect(mcp_server_config, name) + self._session_ready.set() + # Hold the connection open until cancelled. + await asyncio.Event().wait() + finally: + try: + await self.exit_stack.aclose() + except Exception as e: + logger.debug(f"Error closing exit stack for {name}: {e}") async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: - """Connect to MCP server + """Connect to MCP server by spawning a dedicated owner task. + + The owner task (_connection_task) holds the AsyncExitStack and all + anyio cancel scopes for the lifetime of this connection. To disconnect, + cancel _connection_task — the finally block in _run_connection will call + aclose() from within the correct task context. If `url` parameter exists: 1. When transport is specified as `streamable_http`, use Streamable HTTP connection. @@ -412,10 +459,48 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: mcp_server_config (dict): Configuration for the MCP server. See https://modelcontextprotocol.io/quickstart/server """ - # Store config for reconnection self._mcp_server_config = mcp_server_config self._server_name = name + self._session_ready = asyncio.Event() + self._connection_task = asyncio.create_task( + self._run_connection(mcp_server_config, name), + name=f"mcp-conn:{name}", + ) + + # Wait until the session is initialised (or the task fails). + # We race two futures: the connection task itself (signals failure) and + # a waiter for _session_ready (signals success). The waiter must be + # cancelled after asyncio.wait returns to avoid a task leak. + ready_waiter = asyncio.ensure_future(self._session_ready.wait()) + try: + done, _ = await asyncio.wait( + {self._connection_task, ready_waiter}, + return_when=asyncio.FIRST_COMPLETED, + ) + except asyncio.CancelledError: + # Caller was cancelled while waiting — tear down the connection task. + # cancel() is asynchronous; the task will not finish until the next + # event-loop iteration, so we track it in _old_connection_tasks so + # that cleanup() can await it later. + if not self._connection_task.done(): + self._cancel_connection_task(self._connection_task) + self._connection_task = None + raise + finally: + # Always clean up ready_waiter regardless of how we exit. + if not ready_waiter.done(): + ready_waiter.cancel() + + if self._connection_task in done: + # Task finished before session was ready — it raised an exception. + exc = self._connection_task.exception() + if exc: + raise exc + raise Exception(f"MCP connection task for {name} exited unexpectedly") + + async def _do_connect(self, mcp_server_config: dict, name: str) -> None: + """Internal: perform the actual connection inside _run_connection's task.""" cfg = _prepare_config(mcp_server_config.copy()) def logging_callback( @@ -533,16 +618,27 @@ async def list_tools_and_save(self) -> mcp.ListToolsResult: self.tools = response.tools return response + def _cancel_connection_task(self, task: asyncio.Task) -> None: + """Cancel a connection owner task and track it until it finishes.""" + if task.done(): + return + task.cancel() + self._old_connection_tasks.append(task) + async def _reconnect(self) -> None: """Reconnect to the MCP server using the stored configuration. + Cancels the current _connection_task (which owns the exit_stack and all + anyio cancel scopes) and starts a fresh one. Because each connection + task enters and exits its own anyio cancel scope, there is no + cross-task cancel-scope violation and no GC finalizer surprise. + Uses asyncio.Lock to ensure thread-safe reconnection in concurrent environments. Raises: Exception: raised when reconnection fails """ async with self._reconnect_lock: - # Check if already reconnecting (useful for logging) if self._reconnecting: logger.debug( f"MCP Client {self._server_name} is already reconnecting, skipping" @@ -558,17 +654,16 @@ async def _reconnect(self) -> None: f"Attempting to reconnect to MCP server {self._server_name}..." ) - # Save old exit_stack for later cleanup (don't close it now to avoid cancel scope issues) - if self.exit_stack: - self._old_exit_stacks.append(self.exit_stack) - - # Mark old session as invalid + # Cancel the old connection task. Its finally block will call + # exit_stack.aclose() from within the correct task context, so + # anyio cancel scopes are exited cleanly without triggering the + # GC-finalizer busy-spin bug. + if self._connection_task and not self._connection_task.done(): + self._cancel_connection_task(self._connection_task) + self._connection_task = None self.session = None - # Create new exit stack for new connection - self.exit_stack = AsyncExitStack() - - # Reconnect using stored config + # Reconnect — this creates a new _connection_task. await self.connect_to_server(self._mcp_server_config, self._server_name) await self.list_tools_and_save() @@ -633,19 +728,24 @@ async def _call_with_retry(): return await _call_with_retry() async def cleanup(self) -> None: - """Clean up resources including old exit stacks from reconnections""" - # Close current exit stack - try: - await self.exit_stack.aclose() - except Exception as e: - logger.debug(f"Error closing current exit stack: {e}") - - # Don't close old exit stacks as they may be in different task contexts - # They will be garbage collected naturally - # Just clear the list to release references - self._old_exit_stacks.clear() - - # Set running_event first to unblock any waiting tasks + """Clean up resources by cancelling the connection owner task.""" + # Cancel the current connection task. Its finally block calls aclose() + # from the correct task context, so anyio cancel scopes exit cleanly. + if self._connection_task and not self._connection_task.done(): + self._connection_task.cancel() + try: + await self._connection_task + except (asyncio.CancelledError, Exception) as e: + logger.debug(f"Connection task for {self._server_name} finished: {e}") + + # Wait for any old connection tasks from previous reconnects to finish. + if self._old_connection_tasks: + pending = [t for t in self._old_connection_tasks if not t.done()] + if pending: + await asyncio.gather(*pending, return_exceptions=True) + self._old_connection_tasks.clear() + + # Set running_event to unblock any waiting tasks self.running_event.set() From 1bdce3abd620e8658eeded1f01c29ece56e44efa Mon Sep 17 00:00:00 2001 From: lingyun14 Date: Sun, 24 May 2026 00:09:10 +0800 Subject: [PATCH 2/3] fix(mcp): address review feedback on owner task connection lifecycle --- astrbot/core/agent/mcp_client.py | 76 ++++++++++++++++---------------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py index a5a1598027..2eb87765da 100644 --- a/astrbot/core/agent/mcp_client.py +++ b/astrbot/core/agent/mcp_client.py @@ -400,7 +400,7 @@ def __init__(self) -> None: # Kept for internal use inside _run_connection; external code must not # touch the exit_stack directly. - self.exit_stack = AsyncExitStack() + self.exit_stack: AsyncExitStack | None = None self.name: str | None = None self.active: bool = True @@ -413,9 +413,13 @@ def __init__(self) -> None: self._server_name: str | None = None self._reconnect_lock = asyncio.Lock() # Lock for thread-safe reconnection self._reconnecting: bool = False # For logging and debugging - self._session_ready = asyncio.Event() # set when session is initialised - async def _run_connection(self, mcp_server_config: dict, name: str) -> None: + async def _run_connection( + self, + mcp_server_config: dict, + name: str, + ready: asyncio.Future, + ) -> None: """Own the full lifetime of one MCP connection. This coroutine is always run inside a dedicated asyncio.Task @@ -430,17 +434,30 @@ async def _run_connection(self, mcp_server_config: dict, name: str) -> None: that previously occurred when aclose() was called from a different task or from the asyncio async-generator GC finalizer. """ - self.exit_stack = AsyncExitStack() + # Capture the stack in a local variable so that if self.exit_stack is + # overwritten by a concurrent _run_connection (during reconnect), this + # task's finally block still closes only the resources it opened. + stack = self.exit_stack = AsyncExitStack() try: - await self._do_connect(mcp_server_config, name) - self._session_ready.set() + try: + await self._do_connect(mcp_server_config, name) + except Exception as exc: + if not ready.done(): + ready.set_exception(exc) + raise + else: + if not ready.done(): + ready.set_result(None) # Hold the connection open until cancelled. await asyncio.Event().wait() finally: try: - await self.exit_stack.aclose() + await stack.aclose() except Exception as e: logger.debug(f"Error closing exit stack for {name}: {e}") + # Ensure the waiter is not left pending if we exit very early. + if not ready.done(): + ready.set_exception(RuntimeError("Connection task exited early")) async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: """Connect to MCP server by spawning a dedicated owner task. @@ -461,43 +478,25 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: """ self._mcp_server_config = mcp_server_config self._server_name = name - self._session_ready = asyncio.Event() + + ready: asyncio.Future = asyncio.get_running_loop().create_future() self._connection_task = asyncio.create_task( - self._run_connection(mcp_server_config, name), + self._run_connection(mcp_server_config, name, ready), name=f"mcp-conn:{name}", ) - # Wait until the session is initialised (or the task fails). - # We race two futures: the connection task itself (signals failure) and - # a waiter for _session_ready (signals success). The waiter must be - # cancelled after asyncio.wait returns to avoid a task leak. - ready_waiter = asyncio.ensure_future(self._session_ready.wait()) try: - done, _ = await asyncio.wait( - {self._connection_task, ready_waiter}, - return_when=asyncio.FIRST_COMPLETED, - ) + await ready except asyncio.CancelledError: # Caller was cancelled while waiting — tear down the connection task. # cancel() is asynchronous; the task will not finish until the next # event-loop iteration, so we track it in _old_connection_tasks so # that cleanup() can await it later. - if not self._connection_task.done(): + if self._connection_task and not self._connection_task.done(): self._cancel_connection_task(self._connection_task) self._connection_task = None raise - finally: - # Always clean up ready_waiter regardless of how we exit. - if not ready_waiter.done(): - ready_waiter.cancel() - - if self._connection_task in done: - # Task finished before session was ready — it raised an exception. - exc = self._connection_task.exception() - if exc: - raise exc - raise Exception(f"MCP connection task for {name} exited unexpectedly") async def _do_connect(self, mcp_server_config: dict, name: str) -> None: """Internal: perform the actual connection inside _run_connection's task.""" @@ -620,6 +619,9 @@ async def list_tools_and_save(self) -> mcp.ListToolsResult: def _cancel_connection_task(self, task: asyncio.Task) -> None: """Cancel a connection owner task and track it until it finishes.""" + # Prune already-finished tasks to avoid accumulating references over + # many reconnections in a long-running process. + self._old_connection_tasks = [t for t in self._old_connection_tasks if not t.done()] if task.done(): return task.cancel() @@ -729,16 +731,12 @@ async def _call_with_retry(): async def cleanup(self) -> None: """Clean up resources by cancelling the connection owner task.""" - # Cancel the current connection task. Its finally block calls aclose() - # from the correct task context, so anyio cancel scopes exit cleanly. - if self._connection_task and not self._connection_task.done(): - self._connection_task.cancel() - try: - await self._connection_task - except (asyncio.CancelledError, Exception) as e: - logger.debug(f"Connection task for {self._server_name} finished: {e}") + # Cancel current and any old connection tasks via the shared helper so + # all cancellation + tracking behaviour goes through one code path. + if self._connection_task: + self._cancel_connection_task(self._connection_task) + self._connection_task = None - # Wait for any old connection tasks from previous reconnects to finish. if self._old_connection_tasks: pending = [t for t in self._old_connection_tasks if not t.done()] if pending: From 409eb143d5aa4a58973cfbe322be5a7beef5ef8f Mon Sep 17 00:00:00 2001 From: lingyun14 Date: Sun, 24 May 2026 00:30:22 +0800 Subject: [PATCH 3/3] Update mcp_client.py --- astrbot/core/agent/mcp_client.py | 46 +++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py index 2eb87765da..cb68814983 100644 --- a/astrbot/core/agent/mcp_client.py +++ b/astrbot/core/agent/mcp_client.py @@ -383,23 +383,15 @@ def _normalize(node: Any) -> Any: class MCPClient: def __init__(self) -> None: - # Initialize session and client objects self.session: mcp.ClientSession | None = None - # _connection_task owns the AsyncExitStack and the sse_client/stdio_client - # context manager for the *current* connection. Cleaning up is done by - # cancelling this task: because the task itself entered the anyio - # create_task_group cancel scope, anyio's _host_task check passes and the - # scope exits cleanly in its own task context, avoiding the + # Each connection runs in its own task so that anyio cancel scopes + # are always exited from the task that entered them, preventing # RuntimeError: Attempted to exit cancel scope in a different task - # that occurred when aclose() was called from a different task (or from the - # GC finalizer). Old connection tasks are kept in _old_connection_tasks - # until they finish after being cancelled. self._connection_task: asyncio.Task | None = None self._old_connection_tasks: list[asyncio.Task] = [] - # Kept for internal use inside _run_connection; external code must not - # touch the exit_stack directly. + # Internal; managed exclusively by _run_connection. self.exit_stack: AsyncExitStack | None = None self.name: str | None = None @@ -408,11 +400,10 @@ def __init__(self) -> None: self.server_errlogs: list[str] = [] self.running_event = asyncio.Event() - # Store connection config for reconnection self._mcp_server_config: dict | None = None self._server_name: str | None = None self._reconnect_lock = asyncio.Lock() # Lock for thread-safe reconnection - self._reconnecting: bool = False # For logging and debugging + self._reconnecting: bool = False async def _run_connection( self, @@ -455,7 +446,11 @@ async def _run_connection( await stack.aclose() except Exception as e: logger.debug(f"Error closing exit stack for {name}: {e}") - # Ensure the waiter is not left pending if we exit very early. + # Clear the instance reference only if it still points to this task's + # stack; a concurrent reconnect may have already replaced it. + if self.exit_stack is stack: + self.exit_stack = None + # Guard against the task exiting before ready was resolved. if not ready.done(): ready.set_exception(RuntimeError("Connection task exited early")) @@ -481,6 +476,12 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: ready: asyncio.Future = asyncio.get_running_loop().create_future() + # Defensively cancel any existing connection task that was not cleaned + # up before this call (e.g. if connect_to_server is called twice). + if self._connection_task and not self._connection_task.done(): + self._cancel_connection_task(self._connection_task) + self._connection_task = None + self._connection_task = asyncio.create_task( self._run_connection(mcp_server_config, name, ready), name=f"mcp-conn:{name}", @@ -495,11 +496,22 @@ async def connect_to_server(self, mcp_server_config: dict, name: str) -> None: # that cleanup() can await it later. if self._connection_task and not self._connection_task.done(): self._cancel_connection_task(self._connection_task) - self._connection_task = None + self._connection_task = None + raise + except Exception: + # _do_connect raised; the connection task's finally block may still + # be running (e.g. awaiting stack.aclose()). Track it so that + # cleanup() can await it, but do NOT cancel it — we want the + # finally block to finish cleaning up resources naturally. + if self._connection_task and not self._connection_task.done(): + self._old_connection_tasks.append(self._connection_task) + self._connection_task = None raise async def _do_connect(self, mcp_server_config: dict, name: str) -> None: """Internal: perform the actual connection inside _run_connection's task.""" + # exit_stack is always set by _run_connection before _do_connect is called. + assert self.exit_stack is not None cfg = _prepare_config(mcp_server_config.copy()) def logging_callback( @@ -621,7 +633,9 @@ def _cancel_connection_task(self, task: asyncio.Task) -> None: """Cancel a connection owner task and track it until it finishes.""" # Prune already-finished tasks to avoid accumulating references over # many reconnections in a long-running process. - self._old_connection_tasks = [t for t in self._old_connection_tasks if not t.done()] + self._old_connection_tasks = [ + t for t in self._old_connection_tasks if not t.done() + ] if task.done(): return task.cancel()