From 84ac69740e257ff2c88ce42182d5510335004ca0 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 16:55:38 +0000 Subject: [PATCH] Add idle session timeout to StreamableHTTPSessionManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sessions created via StreamableHTTPSessionManager persist indefinitely in _server_instances even after the client disconnects, leaking memory over time. Add a session_idle_timeout parameter that automatically terminates and removes sessions that receive no HTTP requests for the configured duration. Each session manages its own lifetime via an anyio CancelScope deadline — no background reaper task needed. Incoming requests push the deadline forward to keep active sessions alive. Github-Issue: #1283 --- src/mcp/server/streamable_http.py | 2 + src/mcp/server/streamable_http_manager.py | 49 +++- .../issues/test_1283_idle_session_cleanup.py | 267 ++++++++++++++++++ 3 files changed, 311 insertions(+), 7 deletions(-) create mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index e9156f7ba..0795a4bab 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -169,6 +169,8 @@ def __init__( ] = {} self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} self._terminated = False + # Idle timeout cancel scope; managed by the session manager. + self.idle_scope: anyio.CancelScope | None = None @property def is_terminated(self) -> bool: diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index a954b24a4..cef42a30f 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -39,6 +39,7 @@ class StreamableHTTPSessionManager: 2. Resumability via an optional event store 3. Connection management and lifecycle 4. Request handling and transport setup + 5. Idle session cleanup via optional timeout Important: Only one StreamableHTTPSessionManager instance should be created per application. The instance cannot be reused after its run() context has @@ -56,6 +57,14 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. + If set, sessions that receive no HTTP requests for this + duration will be automatically terminated and removed. + When retry_interval is also configured, ensure the idle + timeout comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -66,13 +75,20 @@ def __init__( stateless: bool = False, security_settings: TransportSecuritySettings | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, ): + if session_idle_timeout is not None and session_idle_timeout <= 0: + raise ValueError("session_idle_timeout must be a positive number of seconds") + if stateless and session_idle_timeout is not None: + raise ValueError("session_idle_timeout is not supported in stateless mode") + self.app = app self.event_store = event_store self.json_response = json_response self.stateless = stateless self.security_settings = security_settings self.retry_interval = retry_interval + self.session_idle_timeout = session_idle_timeout # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() @@ -184,6 +200,9 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") + # Push back idle deadline on activity + if transport.idle_scope is not None and self.session_idle_timeout is not None: + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout await transport.handle_request(scope, receive, send) return @@ -210,16 +229,32 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE read_stream, write_stream = streams task_status.started() try: - await self.app.run( - read_stream, - write_stream, - self.app.create_initialization_options(), - stateless=False, # Stateful mode - ) + # Use a cancel scope for idle timeout — when the + # deadline passes the scope cancels app.run() and + # execution continues after the ``with`` block. + # Incoming requests push the deadline forward. + idle_scope = anyio.CancelScope() + if self.session_idle_timeout is not None: + idle_scope.deadline = anyio.current_time() + self.session_idle_timeout + http_transport.idle_scope = idle_scope + + with idle_scope: + await self.app.run( + read_stream, + write_stream, + self.app.create_initialization_options(), + stateless=False, + ) + + if idle_scope.cancelled_caught: + session_id = http_transport.mcp_session_id + logger.info(f"Session {session_id} idle timeout") + if session_id is not None: # pragma: no branch + self._server_instances.pop(session_id, None) + await http_transport.terminate() except Exception: logger.exception(f"Session {http_transport.mcp_session_id} crashed") finally: - # Only remove from instances if not terminated if ( # pragma: no branch http_transport.mcp_session_id and http_transport.mcp_session_id in self._server_instances diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py new file mode 100644 index 000000000..9a9193786 --- /dev/null +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -0,0 +1,267 @@ +"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. + +Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager +persist indefinitely in ``_server_instances`` even after the client disconnects. +Over time this leaks memory. + +The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows +the manager to automatically terminate and remove sessions that have been idle for +longer than the configured duration. +""" + +import time +from collections.abc import Callable, Coroutine +from typing import Any + +import anyio +import pytest +from starlette.types import Message, Scope + +from mcp.server.lowlevel import Server +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager + + +def _make_scope() -> Scope: + return { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + +async def _mock_receive() -> Message: # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + +def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: + async def mock_send(message: Message) -> None: + sent.append(message) + + return mock_send + + +def _extract_session_id(sent_messages: list[Message]) -> str: + for msg in sent_messages: + if msg["type"] == "http.response.start": # pragma: no branch + for name, value in msg.get("headers", []): # pragma: no branch + if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch + return value.decode() + raise AssertionError("Session ID not found in response headers") # pragma: no cover + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Session should be removed from _server_instances after idle timeout.""" + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + assert session_id in manager._server_instances + + # Wait for the cancel scope deadline to fire + await anyio.sleep(0.4) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_activity_resets_idle_timer(): + """Requests during the timeout window should prevent the session from being reaped.""" + app = Server("test-idle-reset") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + # Simulate ongoing activity by pushing back the idle scope deadline + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + for _ in range(4): + await anyio.sleep(0.1) + transport.idle_scope.deadline = anyio.current_time() + 0.3 + + # Session should still be alive because we kept it active + assert session_id in manager._server_instances + + # Now stop activity and let the timeout expire + await anyio.sleep(0.6) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_multiple_sessions_reaped_independently(): + """Each session tracks its own idle timeout independently.""" + app = Server("test-multi-idle") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent1: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) + session_id_1 = _extract_session_id(sent1) + + await anyio.sleep(0.05) + sent2: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) + session_id_2 = _extract_session_id(sent2) + + assert session_id_1 in manager._server_instances + assert session_id_2 in manager._server_instances + + # After enough time, both should be reaped + await anyio.sleep(0.4) + + assert session_id_1 not in manager._server_instances + assert session_id_2 not in manager._server_instances + + +def test_session_idle_timeout_rejects_negative(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + + +def test_session_idle_timeout_rejects_zero(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + """session_idle_timeout is not supported in stateless mode.""" + with pytest.raises(ValueError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_terminate_idempotency(): + """Calling terminate() multiple times should be safe.""" + transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") + + async with transport.connect(): + await transport.terminate() + assert transport.is_terminated + + # Second call should be a no-op (no exception) + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.anyio +async def test_no_idle_timeout_sessions_persist(): + """When session_idle_timeout is None (default), sessions persist indefinitely.""" + app = Server("test-no-timeout") + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + await anyio.sleep(0.3) + assert session_id in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_exits_promptly_after_idle_timeout(): + """The run_server task must exit shortly after the idle timeout fires.""" + app = Server("test-lifecycle") + + task_exited = anyio.Event() + exit_timestamp: list[float] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + try: + await original_run(*args, **kwargs) + finally: + exit_timestamp.append(time.monotonic()) + task_exited.set() + + app.run = instrumented_run # type: ignore[assignment] + + idle_timeout = 0.5 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + pre_reap_time = time.monotonic() + + with anyio.fail_after(idle_timeout * 4): + await task_exited.wait() + + assert len(exit_timestamp) == 1 + total_elapsed = exit_timestamp[0] - pre_reap_time + assert total_elapsed < idle_timeout * 3, ( + f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" + ) + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_finally_block_runs_after_terminate(): + """Verify that the finally block in run_server executes after terminate().""" + app = Server("test-finally") + + lifecycle_events: list[str] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + lifecycle_events.append("run_entered") + try: + await original_run(*args, **kwargs) + finally: + lifecycle_events.append("run_exited") + + app.run = instrumented_run # type: ignore[assignment] + + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + transport = manager._server_instances[session_id] + + assert "run_entered" in lifecycle_events + assert "run_exited" not in lifecycle_events + + await transport.terminate() + + with anyio.fail_after(3.0): + while "run_exited" not in lifecycle_events: + await anyio.sleep(0.01) + + assert "run_exited" in lifecycle_events + + +@pytest.mark.anyio +async def test_idle_timeout_end_to_end(): + """End-to-end: idle timeout causes session cleanup with a real Server.""" + app = Server("test-e2e") + idle_timeout = 0.3 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + with anyio.fail_after(idle_timeout + 1.0): + while session_id in manager._server_instances: + await anyio.sleep(0.05) + + assert session_id not in manager._server_instances