Skip to content

fix(mcp): attempt to fix epoll busy-wait caused by leaked SSE exit stack#8307

Open
lingyun14beta wants to merge 3 commits into
AstrBotDevs:masterfrom
lingyun14beta:fix/mcp-sse-exit-stack-owner-task
Open

fix(mcp): attempt to fix epoll busy-wait caused by leaked SSE exit stack#8307
lingyun14beta wants to merge 3 commits into
AstrBotDevs:masterfrom
lingyun14beta:fix/mcp-sse-exit-stack-owner-task

Conversation

@lingyun14beta
Copy link
Copy Markdown
Contributor

@lingyun14beta lingyun14beta commented May 23, 2026

attempt to fix #8056

修改 astrbot/core/agent/mcp_client.py

MCP 重连时旧的 AsyncExitStack 未关闭,GC 回收时 asyncio 在错误的 task 里退出 anyio cancel scope,触发 _deliver_cancellationcall_soon 自循环,导致 epoll_waittimeout=0 无限空转,主线程 CPU 100%。

Modifications / 改动点

用 owner task 模式替代原有的 _old_exit_stacks 方案,每次连接在独立的 _connection_task 中运行并持有 AsyncExitStack,清理时直接 cancel 该 task,确保 anyio cancel scope 始终在正确的 task context 里退出。

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

image 验证脚本直接模拟了核心行为,本地astr也能跑,看起来没问题)

Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txtpyproject.toml 文件相应位置。

  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

Summary by Sourcery

Ensure MCP client connections are owned and cleaned up by dedicated asyncio tasks to prevent cancel-scope violations and event-loop busy-waiting during reconnects and cleanup.

Bug Fixes:

  • Fix epoll busy-wait and CPU spin caused by AsyncExitStack being closed from the wrong task or GC finalizer during MCP reconnections.

Enhancements:

  • Refactor MCP client connection management to use per-connection owner tasks that encapsulate the AsyncExitStack lifecycle and reconnection handling.
  • Improve MCP client cleanup to cancel and await all connection tasks, avoiding leaked resources across reconnections.

@auto-assign auto-assign Bot requested review from Fridemn and anka-afk May 23, 2026 15:52
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Warning

Gemini encountered an error creating the review. You can try again by commenting /gemini review.

@dosubot dosubot Bot added size:L This PR changes 100-499 lines, ignoring generated files. area:core The bug / feature is about astrbot's core, backend labels May 23, 2026
@lingyun14beta
Copy link
Copy Markdown
Contributor Author

/gemini review

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 2 issues, and left some high level feedback:

  • In connect_to_server, a new _connection_task is created unconditionally without checking for or cancelling any existing task; consider asserting that _connection_task is None or explicitly cancelling/awaiting the old task to avoid accidental concurrent connections or task leaks if connect_to_server is called twice.
  • The helper _cancel_connection_task is only used in _reconnect but similar cancellation/awaiting logic is duplicated in cleanup; it would be clearer and less error-prone to centralize connection-task teardown (including tracking in _old_connection_tasks) through a single helper so reconnect and cleanup follow the same code path.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `connect_to_server`, a new `_connection_task` is created unconditionally without checking for or cancelling any existing task; consider asserting that `_connection_task` is `None` or explicitly cancelling/awaiting the old task to avoid accidental concurrent connections or task leaks if `connect_to_server` is called twice.
- The helper `_cancel_connection_task` is only used in `_reconnect` but similar cancellation/awaiting logic is duplicated in `cleanup`; it would be clearer and less error-prone to centralize connection-task teardown (including tracking in `_old_connection_tasks`) through a single helper so reconnect and cleanup follow the same code path.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/mcp_client.py" line_range="471-475" />
<code_context>
+        # We race two futures: the connection task itself (signals failure) and
+        # a waiter for _session_ready (signals success).  The waiter must be
+        # cancelled after asyncio.wait returns to avoid a task leak.
+        ready_waiter = asyncio.ensure_future(self._session_ready.wait())
+        try:
+            done, _ = await asyncio.wait(
</code_context>
<issue_to_address>
**suggestion:** Prefer asyncio.create_task over asyncio.ensure_future with coroutines

Because `self._session_ready.wait()` is a coroutine and this is asyncio code, `asyncio.create_task` is the clearer, more idiomatic way to schedule it on the current event loop and avoids the ambiguous "future or coroutine" behavior of `asyncio.ensure_future`.

```suggestion
        # Wait until the session is initialised (or the task fails).
        # We race two futures: the connection task itself (signals failure) and
        # a waiter for _session_ready (signals success).  The waiter must be
        # cancelled after asyncio.wait returns to avoid a task leak.
        ready_waiter = asyncio.create_task(self._session_ready.wait())
```
</issue_to_address>

### Comment 2
<location path="astrbot/core/agent/mcp_client.py" line_range="416" />
<code_context>
         self._server_name: str | None = None
         self._reconnect_lock = asyncio.Lock()  # Lock for thread-safe reconnection
         self._reconnecting: bool = False  # For logging and debugging
+        self._session_ready = asyncio.Event()  # set when session is initialised
+
+    async def _run_connection(self, mcp_server_config: dict, name: str) -> None:
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new connection lifecycle by using a per-call Future to signal readiness and centralizing connection-task cancellation/await logic in a single helper.

The new connection lifecycle does add extra moving parts that can be simplified without changing the “owner task holds cancel scopes” property.

Consider these focused changes:

### 1. Remove global `_session_ready` and the task/event race

Right now you have:

- `_run_connection` + `_do_connect`
- `self._session_ready` as an instance attribute, recreated per connection
- `connect_to_server` racing `_connection_task` vs an `asyncio.Event().wait()` future

You can keep `_run_connection` as the single owner task, but signal “initial connect done or failed” via a per-call `Future` passed into `_run_connection`, instead of a shared `Event` and a race.

For example:

```python
class MCPClient:
    def __init__(self) -> None:
        self.session: mcp.ClientSession | None = None
        self._connection_task: asyncio.Task | None = None
        self._old_connection_tasks: list[asyncio.Task] = []
        self.exit_stack = AsyncExitStack()
        # remove: self._session_ready
```

Change `_run_connection` to accept a ready future:

```python
async def _run_connection(
    self,
    mcp_server_config: dict,
    name: str,
    ready: asyncio.Future[None],
) -> None:
    self.exit_stack = AsyncExitStack()
    try:
        try:
            await self._do_connect(mcp_server_config, name)
        except Exception as exc:
            if not ready.done():
                ready.set_exception(exc)
            raise
        else:
            if not ready.done():
                ready.set_result(None)

        # Hold the connection open until cancelled.
        await asyncio.Event().wait()
    finally:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.debug(f"Error closing exit stack for {name}: {e}")
        # Ensure the waiter is not left pending if we exit very early
        if not ready.done():
            ready.set_exception(RuntimeError("Connection task exited early"))
```

Then `connect_to_server` becomes a single-await on that future, no `asyncio.wait`/`ensure_future`/`Event` race:

```python
async def connect_to_server(self, mcp_server_config: dict, name: str) -> None:
    self._mcp_server_config = mcp_server_config
    self._server_name = name

    ready: asyncio.Future[None] = asyncio.get_running_loop().create_future()

    self._connection_task = asyncio.create_task(
        self._run_connection(mcp_server_config, name, ready),
        name=f"mcp-conn:{name}",
    )

    try:
        await ready  # either completes or raises from _do_connect/_run_connection
    except asyncio.CancelledError:
        # Caller cancelled while waiting; tear down the task and re-raise
        if self._connection_task and not self._connection_task.done():
            self._cancel_connection_task(self._connection_task)
            self._connection_task = None
        raise
    # If we get here, the connection is established and self.session initialized
```

This:

- Eliminates `_session_ready` as mutable shared state.
- Eliminates the extra “ready_waiter” task and `asyncio.wait`/FIRST_COMPLETED logic.
- Keeps the property that `_run_connection` owns the `AsyncExitStack` and cancel scopes.

### 2. Unify cancellation/await logic for `_connection_task`

You already have `_cancel_connection_task` plus similar logic in `cleanup`. To reduce duplication and make task lifetime easier to reason about, reuse the helper in `cleanup`:

```python
def _cancel_connection_task(self, task: asyncio.Task) -> None:
    if task.done():
        return
    task.cancel()
    self._old_connection_tasks.append(task)
```

Then:

```python
async def cleanup(self) -> None:
    """Clean up resources by cancelling the connection owner task."""
    if self._connection_task:
        self._cancel_connection_task(self._connection_task)
        self._connection_task = None

    if self._old_connection_tasks:
        pending = [t for t in self._old_connection_tasks if not t.done()]
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._old_connection_tasks.clear()

    self.running_event.set()
```

This keeps all cancellation + tracking behaviour in one place and makes it clear that every cancelled connection owner task is awaited exactly once.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/core/agent/mcp_client.py Outdated
Comment thread astrbot/core/agent/mcp_client.py Outdated
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the MCP client to manage connections via a dedicated owner task, ensuring that AsyncExitStack and associated cancel scopes are closed within the correct task context to prevent runtime errors. Review feedback identifies a critical race condition where the shared exit_stack instance variable could be overwritten during reconnection, suggesting the use of a local variable to capture the stack instance. Further improvements include initializing exit_stack to None in the constructor to avoid redundant allocation and pruning finished tasks from the tracking list to prevent memory accumulation in long-running processes.

Comment thread astrbot/core/agent/mcp_client.py Outdated
Comment thread astrbot/core/agent/mcp_client.py Outdated
Comment thread astrbot/core/agent/mcp_client.py
@lingyun14beta
Copy link
Copy Markdown
Contributor Author

@sourcery-ai review

@lingyun14beta
Copy link
Copy Markdown
Contributor Author

/gemini review

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 4 issues, and left some high level feedback:

  • Consider making exit_stack a private attribute (e.g. _exit_stack) or documenting it as internal-only to avoid accidental external access now that its lifecycle is tightly coupled to _run_connection.
  • In _run_connection, you might want to log or surface more context when the connection task exits early (e.g. before ready is resolved) so operators can distinguish intentional shutdown from unexpected failures.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider making `exit_stack` a private attribute (e.g. `_exit_stack`) or documenting it as internal-only to avoid accidental external access now that its lifecycle is tightly coupled to `_run_connection`.
- In `_run_connection`, you might want to log or surface more context when the connection task exits early (e.g. before `ready` is resolved) so operators can distinguish intentional shutdown from unexpected failures.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/mcp_client.py" line_range="440-441" />
<code_context>
+        # Capture the stack in a local variable so that if self.exit_stack is
+        # overwritten by a concurrent _run_connection (during reconnect), this
+        # task's finally block still closes only the resources it opened.
+        stack = self.exit_stack = AsyncExitStack()
+        try:
+            try:
+                await self._do_connect(mcp_server_config, name)
</code_context>
<issue_to_address>
**suggestion:** Consider avoiding storing the AsyncExitStack on `self` or resetting it to `None` after closure to prevent accidental reuse

After `stack.aclose()` in the `finally` block, `self.exit_stack` still references a closed `AsyncExitStack`. Even though callers are told not to use `exit_stack`, leaving a closed stack on `self` increases the risk of future misuse. Consider either keeping `stack` local (not storing it on `self`) or explicitly setting `self.exit_stack = None` after `stack.aclose()` completes.

Suggested implementation:

```python
        # Capture the stack in a local variable so that if self.exit_stack is
        # overwritten by a concurrent _run_connection (during reconnect), this
        # task's finally block still closes only the resources it opened.
        stack = AsyncExitStack()
        self.exit_stack = stack

```

```python
        finally:

```

```python
            await stack.aclose()
            # Ensure we don't keep a reference to a closed AsyncExitStack on self,
            # but only clear it if it still refers to this task's stack to avoid
            # races with a concurrent reconnect.
            if self.exit_stack is stack:
                self.exit_stack = None

```

If the actual `finally` block does not contain `await stack.aclose()` exactly as shown, adjust the second SEARCH/REPLACE block to match the existing close call (e.g. `await self.exit_stack.aclose()` or `stack.aclose()` without `await`) and append the `if self.exit_stack is stack: self.exit_stack = None` logic immediately after the close call in that block.
</issue_to_address>

### Comment 2
<location path="astrbot/core/agent/mcp_client.py" line_range="484-486" />
<code_context>

+        ready: asyncio.Future = asyncio.get_running_loop().create_future()
+
+        self._connection_task = asyncio.create_task(
+            self._run_connection(mcp_server_config, name, ready),
+            name=f"mcp-conn:{name}",
+        )
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Clarify semantics or enforce that only one live `_connection_task` exists at a time

`connect_to_server` assumes a single logical connection, but nothing prevents it from being called again while a previous `_connection_task` is still running. In that case, `self._connection_task` is overwritten, the old task keeps running, and it’s never tracked in `_old_connection_tasks` (since `_cancel_connection_task` isn’t called). If multiple concurrent connections are not desired, consider either cancelling/awaiting any existing `_connection_task` before starting a new one, or clearly documenting that only one call may be active at a time to avoid orphaned tasks.
</issue_to_address>

### Comment 3
<location path="astrbot/core/agent/mcp_client.py" line_range="740-744" />
<code_context>
+            self._cancel_connection_task(self._connection_task)
+            self._connection_task = None
+
+        if self._old_connection_tasks:
+            pending = [t for t in self._old_connection_tasks if not t.done()]
+            if pending:
+                await asyncio.gather(*pending, return_exceptions=True)
+            self._old_connection_tasks.clear()
+
+        # Set running_event to unblock any waiting tasks
</code_context>
<issue_to_address>
**issue (bug_risk):** Be careful about calling `cleanup` from within a connection task, as it will await on the task it just cancelled

Since `cleanup` cancels `self._connection_task` and then awaits `_old_connection_tasks`, if `cleanup` is ever invoked from inside the currently running `_connection_task` you’ll end up awaiting on the current task, which raises `RuntimeError`. If that call path is possible, consider avoiding adding the current task to `_old_connection_tasks` when `cleanup` is called from it, or skip awaiting any task equal to `asyncio.current_task()`.
</issue_to_address>

### Comment 4
<location path="astrbot/core/agent/mcp_client.py" line_range="417" />
<code_context>
         self._reconnect_lock = asyncio.Lock()  # Lock for thread-safe reconnection
         self._reconnecting: bool = False  # For logging and debugging

+    async def _run_connection(
+        self,
+        mcp_server_config: dict,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the connection lifecycle by consolidating task/stack ownership into a single connection task with one readiness signal and straightforward cancel-and-await shutdown semantics.

You can keep the “host task owns the stack” invariant while dropping a lot of the extra machinery:

### 1. Collapse `_run_connection` / `_do_connect` / `ready` into a single task + event

You don’t need a separate `ready` `Future` plus two layers of functions. Let the connection task manage the stack and signal readiness via an internal `Event` or just `self.session`:

```python
class MCPClient:
    def __init__(self) -> None:
        self.session: mcp.ClientSession | None = None
        self._connection_task: asyncio.Task | None = None
        self._connected = asyncio.Event()
        # no exit_stack attribute exposed; it's purely internal to the task

    async def _connection_task_main(self, mcp_server_config: dict, name: str) -> None:
        stack = AsyncExitStack()
        try:
            cfg = _prepare_config(mcp_server_config.copy())
            # ... your existing logging_callback, etc. ...
            # all previous _do_connect logic moves here, using `stack`

            # when fully connected:
            self.session = session
            self._connected.set()

            # Hold the connection open until cancelled.
            await asyncio.Future()  # parked forever; cancellation exits
        finally:
            try:
                await stack.aclose()
            except Exception as e:
                logger.debug(f"Error closing exit stack for {name}: {e}")
            self.session = None
            self._connected.clear()
```

Then `connect_to_server` becomes simpler and doesn’t need to juggle a `ready` future:

```python
async def connect_to_server(self, mcp_server_config: dict, name: str) -> None:
    self._mcp_server_config = mcp_server_config
    self._server_name = name

    loop = asyncio.get_running_loop()
    self._connection_task = loop.create_task(
        self._connection_task_main(mcp_server_config, name),
        name=f"mcp-conn:{name}",
    )

    try:
        await self._connected.wait()
    except asyncio.CancelledError:
        # caller cancelled while waiting: cancel and await the task inline
        task = self._connection_task
        self._connection_task = None
        if task:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
        raise
```

This preserves the key property that the same task that enters the stack also exits it, without an explicit `ready` `Future` threading through multiple functions.

### 2. Replace `_old_connection_tasks` with “cancel + await” of a single task

Instead of maintaining a list with pruning and a shared helper, you can cancel and await the current task directly on reconnect and cleanup, which is easier to reason about:

```python
async def _stop_connection_task(self) -> None:
    task = self._connection_task
    if not task:
        return
    self._connection_task = None
    if task.done():
        return
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
```

Use it in `_reconnect`:

```python
async def _reconnect(self) -> None:
    async with self._reconnect_lock:
        if self._reconnecting:
            logger.debug(f"MCP Client {self._server_name} is already reconnecting, skipping")
            return

        if not self._mcp_server_config or not self._server_name:
            raise Exception("Cannot reconnect: missing connection configuration")

        self._reconnecting = True
        try:
            logger.info(f"Attempting to reconnect to MCP server {self._server_name}...")

            await self._stop_connection_task()
            self.session = None

            await self.connect_to_server(self._mcp_server_config, self._server_name)
            await self.list_tools_and_save()
            logger.info(f"Successfully reconnected to MCP server {self._server_name}")
        except Exception as e:
            logger.error(f"Failed to reconnect to MCP server {self._server_name}: {e}")
        finally:
            self._reconnecting = False
```

And in `cleanup`:

```python
async def cleanup(self) -> None:
    await self._stop_connection_task()
    self.running_event.set()
```

This removes `_old_connection_tasks` and `_cancel_connection_task` entirely while still ensuring the connection task finishes and its `AsyncExitStack` is closed from the correct task context.

### 3. Use a clearer “park forever” idiom

If you keep a parked wait, `await asyncio.Future()` with a comment is more idiomatic than `await asyncio.Event().wait()`:

```python
# Hold the connection open until cancelled by the owner.
await asyncio.Future()  # never completed; cancellation exits the task
```

With these changes, the “one owner task owns the stack and lifetime” model remains intact, but the control flow becomes:

- one connection task function,
- one readiness signal (`self._connected`),
- one task reference (`self._connection_task`),
- simple cancel+await on reconnect/cleanup.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/core/agent/mcp_client.py
Comment thread astrbot/core/agent/mcp_client.py
Comment on lines +740 to +744
if self._old_connection_tasks:
pending = [t for t in self._old_connection_tasks if not t.done()]
if pending:
await asyncio.gather(*pending, return_exceptions=True)
self._old_connection_tasks.clear()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Be careful about calling cleanup from within a connection task, as it will await on the task it just cancelled

Since cleanup cancels self._connection_task and then awaits _old_connection_tasks, if cleanup is ever invoked from inside the currently running _connection_task you’ll end up awaiting on the current task, which raises RuntimeError. If that call path is possible, consider avoiding adding the current task to _old_connection_tasks when cleanup is called from it, or skip awaiting any task equal to asyncio.current_task().

Comment thread astrbot/core/agent/mcp_client.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]启动后过一段时间,CPU占用极高,epoll_wait非阻塞忙等

1 participant