fix(mcp): attempt to fix epoll busy-wait caused by leaked SSE exit stack #8307
lingyun14beta wants to merge 3 commits into
Hey - I've found 2 issues, and left some high level feedback:
- In `connect_to_server`, a new `_connection_task` is created unconditionally without checking for or cancelling any existing task; consider asserting that `_connection_task` is `None` or explicitly cancelling/awaiting the old task to avoid accidental concurrent connections or task leaks if `connect_to_server` is called twice.
- The helper `_cancel_connection_task` is only used in `_reconnect` but similar cancellation/awaiting logic is duplicated in `cleanup`; it would be clearer and less error-prone to centralize connection-task teardown (including tracking in `_old_connection_tasks`) through a single helper so reconnect and cleanup follow the same code path.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `connect_to_server`, a new `_connection_task` is created unconditionally without checking for or cancelling any existing task; consider asserting that `_connection_task` is `None` or explicitly cancelling/awaiting the old task to avoid accidental concurrent connections or task leaks if `connect_to_server` is called twice.
- The helper `_cancel_connection_task` is only used in `_reconnect` but similar cancellation/awaiting logic is duplicated in `cleanup`; it would be clearer and less error-prone to centralize connection-task teardown (including tracking in `_old_connection_tasks`) through a single helper so reconnect and cleanup follow the same code path.
## Individual Comments
### Comment 1
<location path="astrbot/core/agent/mcp_client.py" line_range="471-475" />
<code_context>
+ # We race two futures: the connection task itself (signals failure) and
+ # a waiter for _session_ready (signals success). The waiter must be
+ # cancelled after asyncio.wait returns to avoid a task leak.
+ ready_waiter = asyncio.ensure_future(self._session_ready.wait())
+ try:
+ done, _ = await asyncio.wait(
</code_context>
<issue_to_address>
**suggestion:** Prefer asyncio.create_task over asyncio.ensure_future with coroutines
Because `self._session_ready.wait()` is a coroutine and this is asyncio code, `asyncio.create_task` is the clearer, more idiomatic way to schedule it on the current event loop and avoids the ambiguous "future or coroutine" behavior of `asyncio.ensure_future`.
```suggestion
# Wait until the session is initialised (or the task fails).
# We race two futures: the connection task itself (signals failure) and
# a waiter for _session_ready (signals success). The waiter must be
# cancelled after asyncio.wait returns to avoid a task leak.
ready_waiter = asyncio.create_task(self._session_ready.wait())
```
</issue_to_address>
### Comment 2
<location path="astrbot/core/agent/mcp_client.py" line_range="416" />
<code_context>
self._server_name: str | None = None
self._reconnect_lock = asyncio.Lock() # Lock for thread-safe reconnection
self._reconnecting: bool = False # For logging and debugging
+ self._session_ready = asyncio.Event() # set when session is initialised
+
+ async def _run_connection(self, mcp_server_config: dict, name: str) -> None:
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new connection lifecycle by using a per-call Future to signal readiness and centralizing connection-task cancellation/await logic in a single helper.
The new connection lifecycle does add extra moving parts that can be simplified without changing the “owner task holds cancel scopes” property.
Consider these focused changes:
### 1. Remove global `_session_ready` and the task/event race
Right now you have:
- `_run_connection` + `_do_connect`
- `self._session_ready` as an instance attribute, recreated per connection
- `connect_to_server` racing `_connection_task` vs an `asyncio.Event().wait()` future
You can keep `_run_connection` as the single owner task, but signal “initial connect done or failed” via a per-call `Future` passed into `_run_connection`, instead of a shared `Event` and a race.
For example:
```python
class MCPClient:
    def __init__(self) -> None:
        self.session: mcp.ClientSession | None = None
        self._connection_task: asyncio.Task | None = None
        self._old_connection_tasks: list[asyncio.Task] = []
        self.exit_stack = AsyncExitStack()
        # remove: self._session_ready
```
Change `_run_connection` to accept a ready future:
```python
async def _run_connection(
    self,
    mcp_server_config: dict,
    name: str,
    ready: asyncio.Future[None],
) -> None:
    self.exit_stack = AsyncExitStack()
    try:
        try:
            await self._do_connect(mcp_server_config, name)
        except Exception as exc:
            if not ready.done():
                ready.set_exception(exc)
            raise
        else:
            if not ready.done():
                ready.set_result(None)
        # Hold the connection open until cancelled.
        await asyncio.Event().wait()
    finally:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.debug(f"Error closing exit stack for {name}: {e}")
        # Ensure the waiter is not left pending if we exit very early
        if not ready.done():
            ready.set_exception(RuntimeError("Connection task exited early"))
```
Then `connect_to_server` becomes a single-await on that future, no `asyncio.wait`/`ensure_future`/`Event` race:
```python
async def connect_to_server(self, mcp_server_config: dict, name: str) -> None:
    self._mcp_server_config = mcp_server_config
    self._server_name = name
    ready: asyncio.Future[None] = asyncio.get_running_loop().create_future()
    self._connection_task = asyncio.create_task(
        self._run_connection(mcp_server_config, name, ready),
        name=f"mcp-conn:{name}",
    )
    try:
        await ready  # either completes or raises from _do_connect/_run_connection
    except asyncio.CancelledError:
        # Caller cancelled while waiting; tear down the task and re-raise
        if self._connection_task and not self._connection_task.done():
            self._cancel_connection_task(self._connection_task)
        self._connection_task = None
        raise
    # If we get here, the connection is established and self.session initialized
```
This:
- Eliminates `_session_ready` as mutable shared state.
- Eliminates the extra “ready_waiter” task and `asyncio.wait`/FIRST_COMPLETED logic.
- Keeps the property that `_run_connection` owns the `AsyncExitStack` and cancel scopes.
### 2. Unify cancellation/await logic for `_connection_task`
You already have `_cancel_connection_task` plus similar logic in `cleanup`. To reduce duplication and make task lifetime easier to reason about, reuse the helper in `cleanup`:
```python
def _cancel_connection_task(self, task: asyncio.Task) -> None:
    if task.done():
        return
    task.cancel()
    self._old_connection_tasks.append(task)
```
Then:
```python
async def cleanup(self) -> None:
    """Clean up resources by cancelling the connection owner task."""
    if self._connection_task:
        self._cancel_connection_task(self._connection_task)
        self._connection_task = None
    if self._old_connection_tasks:
        pending = [t for t in self._old_connection_tasks if not t.done()]
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._old_connection_tasks.clear()
    self.running_event.set()
```
This keeps all cancellation + tracking behaviour in one place and makes it clear that every cancelled connection owner task is awaited exactly once.
</issue_to_address>
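The per-call `Future` suggestion in Comment 2 can be tried in isolation. The following is a minimal, self-contained sketch and not the PR's code: `fake_connect`, `run_connection`, and `connect` are hypothetical stand-ins, and the real `_do_connect` would enter transports on the stack instead of returning a string.

```python
import asyncio
from contextlib import AsyncExitStack


async def fake_connect(fail: bool) -> str:
    """Hypothetical stand-in for the real connect logic."""
    if fail:
        raise ConnectionError("server unreachable")
    return "session"


async def run_connection(ready: asyncio.Future, fail: bool = False) -> None:
    """Owner task: opens the stack, reports readiness, closes the stack itself."""
    stack = AsyncExitStack()  # real code would enter transports on this stack
    try:
        try:
            session = await fake_connect(fail)
        except Exception as exc:
            if not ready.done():
                ready.set_exception(exc)
            raise
        if not ready.done():
            ready.set_result(session)
        await asyncio.Future()  # park until the owner task is cancelled
    finally:
        await stack.aclose()
        if not ready.done():
            ready.set_exception(RuntimeError("connection task exited early"))


async def connect(fail: bool = False):
    """Start the owner task and wait for its single readiness signal."""
    ready = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(run_connection(ready, fail))
    try:
        session = await ready
    except BaseException:
        # Failure or caller cancellation: finish the owner task and
        # retrieve its exception before propagating.
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
        raise
    return session, task
```

In this sketch the caller awaits exactly one object, so there is no second waiter task to cancel afterwards.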
Code Review
This pull request refactors the MCP client to manage connections via a dedicated owner task, ensuring that AsyncExitStack and associated cancel scopes are closed within the correct task context to prevent runtime errors. Review feedback identifies a critical race condition where the shared exit_stack instance variable could be overwritten during reconnection, suggesting the use of a local variable to capture the stack instance. Further improvements include initializing exit_stack to None in the constructor to avoid redundant allocation and pruning finished tasks from the tracking list to prevent memory accumulation in long-running processes.
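As a rough illustration of the owner-task idea summarized above, the sketch below records which task enters and which task exits a context held by an `AsyncExitStack`. It is not taken from the PR; `resource`, `owner`, and `demo` are hypothetical names, and the task name merely mirrors the `mcp-conn:{name}` convention seen in the review.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def resource(log: list):
    """Hypothetical resource that records the task entering and exiting it."""
    log.append(("enter", asyncio.current_task().get_name()))
    try:
        yield
    finally:
        log.append(("exit", asyncio.current_task().get_name()))


async def owner(log: list, ready: asyncio.Event) -> None:
    """Owner task: enters the stack, parks, and exits the stack on cancel."""
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource(log))
        ready.set()
        await asyncio.Future()  # never completed; cancellation exits the task


async def demo() -> list:
    log: list = []
    ready = asyncio.Event()
    task = asyncio.create_task(owner(log, ready), name="mcp-conn:demo")
    await ready.wait()
    task.cancel()  # cleanup cancels the owner instead of closing the stack
    await asyncio.gather(task, return_exceptions=True)
    return log
```

Because the stack is unwound by the cancelled owner task itself, both log entries carry the same task name.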
Hey - I've found 4 issues, and left some high level feedback:
- Consider making `exit_stack` a private attribute (e.g. `_exit_stack`) or documenting it as internal-only to avoid accidental external access now that its lifecycle is tightly coupled to `_run_connection`.
- In `_run_connection`, you might want to log or surface more context when the connection task exits early (e.g. before `ready` is resolved) so operators can distinguish intentional shutdown from unexpected failures.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider making `exit_stack` a private attribute (e.g. `_exit_stack`) or documenting it as internal-only to avoid accidental external access now that its lifecycle is tightly coupled to `_run_connection`.
- In `_run_connection`, you might want to log or surface more context when the connection task exits early (e.g. before `ready` is resolved) so operators can distinguish intentional shutdown from unexpected failures.
## Individual Comments
### Comment 1
<location path="astrbot/core/agent/mcp_client.py" line_range="440-441" />
<code_context>
+ # Capture the stack in a local variable so that if self.exit_stack is
+ # overwritten by a concurrent _run_connection (during reconnect), this
+ # task's finally block still closes only the resources it opened.
+ stack = self.exit_stack = AsyncExitStack()
+ try:
+ try:
+ await self._do_connect(mcp_server_config, name)
</code_context>
<issue_to_address>
**suggestion:** Consider not storing the AsyncExitStack on `self`, or reset it to `None` after closure, to prevent accidental reuse
After `stack.aclose()` in the `finally` block, `self.exit_stack` still references a closed `AsyncExitStack`. Even though callers are told not to use `exit_stack`, leaving a closed stack on `self` increases the risk of future misuse. Consider either keeping `stack` local (not storing it on `self`) or explicitly setting `self.exit_stack = None` after `stack.aclose()` completes.
Suggested implementation:
```python
# Capture the stack in a local variable so that if self.exit_stack is
# overwritten by a concurrent _run_connection (during reconnect), this
# task's finally block still closes only the resources it opened.
stack = AsyncExitStack()
self.exit_stack = stack
```
```python
finally:
```
```python
await stack.aclose()
# Ensure we don't keep a reference to a closed AsyncExitStack on self,
# but only clear it if it still refers to this task's stack to avoid
# races with a concurrent reconnect.
if self.exit_stack is stack:
    self.exit_stack = None
```
If the actual `finally` block does not contain `await stack.aclose()` exactly as shown, adjust the second SEARCH/REPLACE block to match the existing close call (e.g. `await self.exit_stack.aclose()` or `stack.aclose()` without `await`) and append the `if self.exit_stack is stack: self.exit_stack = None` logic immediately after the close call in that block.
</issue_to_address>
### Comment 2
<location path="astrbot/core/agent/mcp_client.py" line_range="484-486" />
<code_context>
+ ready: asyncio.Future = asyncio.get_running_loop().create_future()
+
+ self._connection_task = asyncio.create_task(
+ self._run_connection(mcp_server_config, name, ready),
+ name=f"mcp-conn:{name}",
+ )
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Clarify semantics or enforce that only one live `_connection_task` exists at a time
`connect_to_server` assumes a single logical connection, but nothing prevents it from being called again while a previous `_connection_task` is still running. In that case, `self._connection_task` is overwritten, the old task keeps running, and it’s never tracked in `_old_connection_tasks` (since `_cancel_connection_task` isn’t called). If multiple concurrent connections are not desired, consider either cancelling/awaiting any existing `_connection_task` before starting a new one, or clearly documenting that only one call may be active at a time to avoid orphaned tasks.
</issue_to_address>
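One way to enforce the single-live-task rule raised in Comment 2 is to stop any previous owner task before starting a new one. This is a hedged sketch only: `ConnectionOwner`, `start`, and `stop` are hypothetical names, not part of `mcp_client.py`.

```python
import asyncio
import contextlib


class ConnectionOwner:
    """Hypothetical holder that allows at most one live connection task."""

    def __init__(self) -> None:
        self._task: asyncio.Task | None = None

    async def start(self, coro_factory) -> asyncio.Task:
        # Stop any previous owner task first so it is never orphaned.
        await self.stop()
        self._task = asyncio.create_task(coro_factory())
        return self._task

    async def stop(self) -> None:
        task, self._task = self._task, None
        if task is None or task.done():
            return
        task.cancel()
        # Awaiting the cancelled task lets its cleanup run to completion.
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

Calling `start` twice then leaves exactly one running task, at the cost of making a repeated call implicitly tear down the earlier connection.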
### Comment 3
<location path="astrbot/core/agent/mcp_client.py" line_range="740-744" />
<code_context>
+ self._cancel_connection_task(self._connection_task)
+ self._connection_task = None
+
+ if self._old_connection_tasks:
+ pending = [t for t in self._old_connection_tasks if not t.done()]
+ if pending:
+ await asyncio.gather(*pending, return_exceptions=True)
+ self._old_connection_tasks.clear()
+
+ # Set running_event to unblock any waiting tasks
</code_context>
<issue_to_address>
**issue (bug_risk):** Be careful about calling `cleanup` from within a connection task, as it will await on the task it just cancelled
Since `cleanup` cancels `self._connection_task` and then awaits `_old_connection_tasks`, if `cleanup` is ever invoked from inside the currently running `_connection_task` you’ll end up awaiting on the current task, which raises `RuntimeError`. If that call path is possible, consider avoiding adding the current task to `_old_connection_tasks` when `cleanup` is called from it, or skip awaiting any task equal to `asyncio.current_task()`.
</issue_to_address>
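The second option mentioned in Comment 3, skipping any task equal to `asyncio.current_task()`, might look like the sketch below. `cancel_and_await` is a hypothetical helper, not a function from the PR.

```python
import asyncio


async def cancel_and_await(tasks: list) -> list:
    """Cancel and await the given tasks, skipping the calling task itself."""
    current = asyncio.current_task()
    others = [t for t in tasks if t is not current and not t.done()]
    for t in others:
        t.cancel()
    # return_exceptions=True collects each CancelledError instead of raising.
    return await asyncio.gather(*others, return_exceptions=True)
```

The filter means a cleanup path invoked from inside a tracked task never waits on itself.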
### Comment 4
<location path="astrbot/core/agent/mcp_client.py" line_range="417" />
<code_context>
self._reconnect_lock = asyncio.Lock() # Lock for thread-safe reconnection
self._reconnecting: bool = False # For logging and debugging
+ async def _run_connection(
+ self,
+ mcp_server_config: dict,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the connection lifecycle by consolidating task/stack ownership into a single connection task with one readiness signal and straightforward cancel-and-await shutdown semantics.
You can keep the “host task owns the stack” invariant while dropping a lot of the extra machinery:
### 1. Collapse `_run_connection` / `_do_connect` / `ready` into a single task + event
You don’t need a separate `ready` `Future` plus two layers of functions. Let the connection task manage the stack and signal readiness via an internal `Event` or just `self.session`:
```python
class MCPClient:
    def __init__(self) -> None:
        self.session: mcp.ClientSession | None = None
        self._connection_task: asyncio.Task | None = None
        self._connected = asyncio.Event()
        # no exit_stack attribute exposed; it's purely internal to the task

    async def _connection_task_main(self, mcp_server_config: dict, name: str) -> None:
        stack = AsyncExitStack()
        try:
            cfg = _prepare_config(mcp_server_config.copy())
            # ... your existing logging_callback, etc. ...
            # all previous _do_connect logic moves here, using `stack`
            # when fully connected:
            self.session = session
            self._connected.set()
            # Hold the connection open until cancelled.
            await asyncio.Future()  # parked forever; cancellation exits
        finally:
            try:
                await stack.aclose()
            except Exception as e:
                logger.debug(f"Error closing exit stack for {name}: {e}")
            self.session = None
            self._connected.clear()
```
Then `connect_to_server` becomes simpler and doesn’t need to juggle a `ready` future:
```python
async def connect_to_server(self, mcp_server_config: dict, name: str) -> None:
    self._mcp_server_config = mcp_server_config
    self._server_name = name
    loop = asyncio.get_running_loop()
    self._connection_task = loop.create_task(
        self._connection_task_main(mcp_server_config, name),
        name=f"mcp-conn:{name}",
    )
    try:
        await self._connected.wait()
    except asyncio.CancelledError:
        # caller cancelled while waiting: cancel and await the task inline
        task = self._connection_task
        self._connection_task = None
        if task:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
        raise
```
This preserves the key property that the same task that enters the stack also exits it, without an explicit `ready` `Future` threading through multiple functions.
### 2. Replace `_old_connection_tasks` with “cancel + await” of a single task
Instead of maintaining a list with pruning and a shared helper, you can cancel and await the current task directly on reconnect and cleanup, which is easier to reason about:
```python
async def _stop_connection_task(self) -> None:
    task = self._connection_task
    if not task:
        return
    self._connection_task = None
    if task.done():
        return
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
```
Use it in `_reconnect`:
```python
async def _reconnect(self) -> None:
    async with self._reconnect_lock:
        if self._reconnecting:
            logger.debug(f"MCP Client {self._server_name} is already reconnecting, skipping")
            return
        if not self._mcp_server_config or not self._server_name:
            raise Exception("Cannot reconnect: missing connection configuration")
        self._reconnecting = True
        try:
            logger.info(f"Attempting to reconnect to MCP server {self._server_name}...")
            await self._stop_connection_task()
            self.session = None
            await self.connect_to_server(self._mcp_server_config, self._server_name)
            await self.list_tools_and_save()
            logger.info(f"Successfully reconnected to MCP server {self._server_name}")
        except Exception as e:
            logger.error(f"Failed to reconnect to MCP server {self._server_name}: {e}")
        finally:
            self._reconnecting = False
```
And in `cleanup`:
```python
async def cleanup(self) -> None:
    await self._stop_connection_task()
    self.running_event.set()
```
This removes `_old_connection_tasks` and `_cancel_connection_task` entirely while still ensuring the connection task finishes and its `AsyncExitStack` is closed from the correct task context.
### 3. Use a clearer “park forever” idiom
If you keep a parked wait, `await asyncio.Future()` with a comment is more idiomatic than `await asyncio.Event().wait()`:
```python
# Hold the connection open until cancelled by the owner.
await asyncio.Future() # never completed; cancellation exits the task
```
With these changes, the “one owner task owns the stack and lifetime” model remains intact, but the control flow becomes:
- one connection task function,
- one readiness signal (`self._connected`),
- one task reference (`self._connection_task`),
- simple cancel+await on reconnect/cleanup.
</issue_to_address>
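One detail the event-based variant in Comment 4 leaves open is what the caller observes if the connection task fails before the event is set, since a bare `Event.wait()` would then never return. A minimal sketch of one way to cover that case follows; `wait_ready` is a hypothetical helper and the approach is an assumption, not something the review prescribes.

```python
import asyncio


async def wait_ready(task: asyncio.Task, connected: asyncio.Event) -> None:
    """Return once `connected` is set, or raise if `task` finishes first."""
    waiter = asyncio.create_task(connected.wait())
    try:
        await asyncio.wait({task, waiter}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        waiter.cancel()  # avoid leaving the helper task pending
    if not connected.is_set():
        task.result()  # re-raises the task's failure or cancellation
        raise RuntimeError("connection task exited before becoming ready")
```

This reintroduces a small race between two awaitables, which is the trade-off against the single readiness `Future` discussed in the earlier review.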
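attempt to fix #8056
Modifies `astrbot/core/agent/mcp_client.py`. During an MCP reconnect, the old `AsyncExitStack` was not closed. When it was later garbage-collected, asyncio exited the anyio cancel scope in the wrong task, which triggered a self-rescheduling `call_soon` loop in `_deliver_cancellation`. As a result, `epoll_wait` spun indefinitely with `timeout=0` and the main thread sat at 100% CPU.
Modifications
Replaces the previous `_old_exit_stacks` approach with an owner-task pattern: each connection runs in its own `_connection_task`, which holds the `AsyncExitStack`. Cleanup simply cancels that task, so the anyio cancel scope always exits in the correct task context.
Screenshots or Test Results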
Checklist
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.
😮 My changes do not introduce malicious code.
Summary by Sourcery
Ensure MCP client connections are owned and cleaned up by dedicated asyncio tasks to prevent cancel-scope violations and event-loop busy-waiting during reconnects and cleanup.
Bug Fixes:
Enhancements: