From 2c87708cfd3129b341d7c43cb5a2c83c4982515b Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 13:47:28 +0000 Subject: [PATCH 1/3] feat: add idle timeout for StreamableHTTP sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in #1283. Uses a CancelScope with a deadline inside each session's run_server task. When the deadline passes, app.run() is cancelled and the session cleans up. Incoming requests push the deadline forward. No background tasks needed — each session manages its own lifecycle. - terminate() on the transport is now idempotent - Effective timeout accounts for retry_interval - Default is None (no timeout, fully backwards compatible) Github-Issue: #1283 --- src/mcp/server/streamable_http.py | 6 + src/mcp/server/streamable_http_manager.py | 61 +++- .../issues/test_1283_idle_session_cleanup.py | 286 ++++++++++++++++++ 3 files changed, 346 insertions(+), 7 deletions(-) create mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 2613b530c..bd802a20a 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -180,6 +180,8 @@ def __init__( ] = {} self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} self._terminated = False + # Idle timeout cancel scope; managed by the session manager. + self.idle_scope: anyio.CancelScope | None = None @property def is_terminated(self) -> bool: @@ -773,8 +775,12 @@ async def terminate(self) -> None: """Terminate the current session, closing all streams. Once terminated, all requests with this session ID will receive 404 Not Found. + Calling this method multiple times is safe (idempotent). """ + if self._terminated: + return + self._terminated = True logger.info(f"Terminating session: {self.mcp_session_id}") diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 6a1672417..89e14b356 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -38,6 +38,7 @@ class StreamableHTTPSessionManager: 2. Resumability via an optional event store 3. Connection management and lifecycle 4. Request handling and transport setup + 5. Idle session cleanup via optional timeout Important: Only one StreamableHTTPSessionManager instance should be created per application. The instance cannot be reused after its run() context has @@ -55,6 +56,15 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. + If set, sessions that receive no HTTP requests for this + duration will be automatically terminated and removed. + When retry_interval is also set, the effective idle + threshold is at least ``retry_interval / 1000 * 3`` to + avoid prematurely reaping sessions that are simply + waiting for SSE polling reconnections. Default is None + (no timeout). A value of 1800 (30 minutes) is + recommended for most deployments. """ def __init__( @@ -65,13 +75,20 @@ def __init__( stateless: bool = False, security_settings: TransportSecuritySettings | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, ): + if session_idle_timeout is not None and session_idle_timeout <= 0: + raise ValueError("session_idle_timeout must be a positive number of seconds") + if stateless and session_idle_timeout is not None: + raise ValueError("session_idle_timeout is not supported in stateless mode") + self.app = app self.event_store = event_store self.json_response = json_response self.stateless = stateless self.security_settings = security_settings self.retry_interval = retry_interval + self.session_idle_timeout = session_idle_timeout # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() @@ -114,6 +131,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: # Store the task group for later use self._task_group = tg logger.info("StreamableHTTP session manager started") + try: yield # Let the application run finally: @@ -219,6 +237,9 @@ async def _handle_stateful_request( if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") + # Push back idle deadline on activity + if transport.idle_scope is not None: + transport.idle_scope.deadline = anyio.current_time() + self._effective_idle_timeout() await transport.handle_request(scope, receive, send) return @@ -245,19 +266,36 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE read_stream, write_stream = streams task_status.started() try: - await self.app.run( - read_stream, - write_stream, - self.app.create_initialization_options(), - stateless=False, # Stateful mode - ) + # Use a cancel scope for idle timeout — when the + # deadline passes the scope cancels app.run() and + # execution continues after the ``with`` block. + # Incoming requests push the deadline forward. + idle_scope = anyio.CancelScope() + if self.session_idle_timeout is not None: + timeout = self._effective_idle_timeout() + idle_scope.deadline = anyio.current_time() + timeout + http_transport.idle_scope = idle_scope + + with idle_scope: + await self.app.run( + read_stream, + write_stream, + self.app.create_initialization_options(), + stateless=False, + ) + + if idle_scope.cancelled_caught: + session_id = http_transport.mcp_session_id + logger.info(f"Session {session_id} idle timeout") + if session_id is not None: # pragma: no branch + self._server_instances.pop(session_id, None) + await http_transport.terminate() except Exception as e: logger.error( f"Session {http_transport.mcp_session_id} crashed: {e}", exc_info=True, ) finally: - # Only remove from instances if not terminated if ( # pragma: no branch http_transport.mcp_session_id and http_transport.mcp_session_id in self._server_instances @@ -295,3 +333,12 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE media_type="application/json", ) await response(scope, receive, send) + + def _effective_idle_timeout(self) -> float: + """Compute the effective idle timeout, accounting for retry_interval.""" + assert self.session_idle_timeout is not None + timeout = self.session_idle_timeout + if self.retry_interval is not None: + retry_seconds = self.retry_interval / 1000.0 + timeout = max(timeout, retry_seconds * 3) + return timeout diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py new file mode 100644 index 000000000..42638ee22 --- /dev/null +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -0,0 +1,286 @@ +"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. + +Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager +persist indefinitely in ``_server_instances`` even after the client disconnects. +Over time this leaks memory. + +The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows +the manager to automatically terminate and remove sessions that have been idle for +longer than the configured duration. +""" + +import time +from collections.abc import Callable, Coroutine +from typing import Any + +import anyio +import pytest +from starlette.types import Message, Scope + +from mcp.server.lowlevel import Server +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager + + +def _make_scope() -> Scope: + return { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + +async def _mock_receive() -> Message: # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + +def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: + async def mock_send(message: Message) -> None: + sent.append(message) + + return mock_send + + +def _extract_session_id(sent_messages: list[Message]) -> str: + for msg in sent_messages: + if msg["type"] == "http.response.start": # pragma: no branch + for name, value in msg.get("headers", []): # pragma: no branch + if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch + return value.decode() + raise AssertionError("Session ID not found in response headers") # pragma: no cover + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Session should be removed from _server_instances after idle timeout.""" + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + assert session_id in manager._server_instances + + # Wait for the cancel scope deadline to fire + await anyio.sleep(0.4) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_activity_resets_idle_timer(): + """Requests during the timeout window should prevent the session from being reaped.""" + app = Server("test-idle-reset") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + # Simulate ongoing activity by pushing back the idle scope deadline + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + for _ in range(4): + await anyio.sleep(0.1) + transport.idle_scope.deadline = anyio.current_time() + 0.3 + + # Session should still be alive because we kept it active + assert session_id in manager._server_instances + + # Now stop activity and let the timeout expire + await anyio.sleep(0.6) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_multiple_sessions_reaped_independently(): + """Each session tracks its own idle timeout independently.""" + app = Server("test-multi-idle") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent1: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) + session_id_1 = _extract_session_id(sent1) + + await anyio.sleep(0.05) + sent2: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) + session_id_2 = _extract_session_id(sent2) + + assert session_id_1 in manager._server_instances + assert session_id_2 in manager._server_instances + + # After enough time, both should be reaped + await anyio.sleep(0.4) + + assert session_id_1 not in manager._server_instances + assert session_id_2 not in manager._server_instances + + +def test_session_idle_timeout_rejects_negative(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + + +def test_session_idle_timeout_rejects_zero(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + """session_idle_timeout is not supported in stateless mode.""" + with pytest.raises(ValueError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_terminate_idempotency(): + """Calling terminate() multiple times should be safe.""" + transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") + + async with transport.connect(): + await transport.terminate() + assert transport.is_terminated + + # Second call should be a no-op (no exception) + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.anyio +async def test_idle_timeout_with_retry_interval(): + """When retry_interval is set, effective timeout should account for polling gaps.""" + app = Server("test-retry-interval") + + # retry_interval = 5000ms = 5s -> retry_seconds * 3 = 15s + # session_idle_timeout = 1s -> effective = max(1, 15) = 15 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=1.0, retry_interval=5000) + assert manager._effective_idle_timeout() == 15.0 + + # When retry_interval is small, session_idle_timeout should dominate + manager2 = StreamableHTTPSessionManager(app=app, session_idle_timeout=10.0, retry_interval=100) + assert manager2._effective_idle_timeout() == 10.0 + + # No retry_interval -> raw timeout + manager3 = StreamableHTTPSessionManager(app=app, session_idle_timeout=5.0) + assert manager3._effective_idle_timeout() == 5.0 + + +@pytest.mark.anyio +async def test_no_idle_timeout_sessions_persist(): + """When session_idle_timeout is None (default), sessions persist indefinitely.""" + app = Server("test-no-timeout") + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + await anyio.sleep(0.3) + assert session_id in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_exits_promptly_after_idle_timeout(): + """The run_server task must exit shortly after the idle timeout fires.""" + app = Server("test-lifecycle") + + task_exited = anyio.Event() + exit_timestamp: list[float] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + try: + await original_run(*args, **kwargs) + finally: + exit_timestamp.append(time.monotonic()) + task_exited.set() + + app.run = instrumented_run # type: ignore[assignment] + + idle_timeout = 0.5 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + pre_reap_time = time.monotonic() + + with anyio.fail_after(idle_timeout * 4): + await task_exited.wait() + + assert len(exit_timestamp) == 1 + total_elapsed = exit_timestamp[0] - pre_reap_time + assert total_elapsed < idle_timeout * 3, ( + f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" + ) + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_finally_block_runs_after_terminate(): + """Verify that the finally block in run_server executes after terminate().""" + app = Server("test-finally") + + lifecycle_events: list[str] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + lifecycle_events.append("run_entered") + try: + await original_run(*args, **kwargs) + finally: + lifecycle_events.append("run_exited") + + app.run = instrumented_run # type: ignore[assignment] + + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + transport = manager._server_instances[session_id] + + assert "run_entered" in lifecycle_events + assert "run_exited" not in lifecycle_events + + await transport.terminate() + + with anyio.fail_after(3.0): + while "run_exited" not in lifecycle_events: + await anyio.sleep(0.01) + + assert "run_exited" in lifecycle_events + + +@pytest.mark.anyio +async def test_idle_timeout_end_to_end(): + """End-to-end: idle timeout causes session cleanup with a real Server.""" + app = Server("test-e2e") + idle_timeout = 0.3 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + with anyio.fail_after(idle_timeout + 1.0): + while session_id in manager._server_instances: + await anyio.sleep(0.05) + + assert session_id not in manager._server_instances From 7345bd7161bf9176952a43b7ef6aa69dd03ec4f1 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 14:13:50 +0000 Subject: [PATCH 2/3] Clean up idle timeout implementation - Remove unrelated blank line between logger.info and try block - Only create CancelScope when session_idle_timeout is set, keeping the no-timeout path identical to the original code - Add docstring to _effective_idle_timeout explaining the 3x retry_interval multiplier rationale --- src/mcp/server/streamable_http_manager.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 89e14b356..bbd967bd4 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -131,7 +131,6 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: # Store the task group for later use self._task_group = tg logger.info("StreamableHTTP session manager started") - try: yield # Let the application run finally: @@ -335,7 +334,14 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE await response(scope, receive, send) def _effective_idle_timeout(self) -> float: - """Compute the effective idle timeout, accounting for retry_interval.""" + """Compute the effective idle timeout, accounting for retry_interval. + + When SSE retry_interval is configured, clients periodically reconnect + to resume the event stream. A gap of up to ``retry_interval`` between + connections is normal, not a sign of idleness. We use a 3x multiplier + to tolerate up to two consecutive missed polls (network jitter, slow + client) before considering the session idle. + """ assert self.session_idle_timeout is not None timeout = self.session_idle_timeout if self.retry_interval is not None: From 2c9e60cc9a2105bf8198337c23e1f52a1ca50a5f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 14:37:42 +0000 Subject: [PATCH 3/3] Remove _effective_idle_timeout, use session_idle_timeout directly The helper silently clamped the user's configured timeout to retry_interval * 3, which is surprising. Users should set a timeout that suits their deployment. Updated the docstring to note that the timeout should comfortably exceed retry_interval when both are set. --- src/mcp/server/streamable_http_manager.py | 34 +++++-------------- .../issues/test_1283_idle_session_cleanup.py | 19 ----------- 2 files changed, 8 insertions(+), 45 deletions(-) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index bbd967bd4..12156df59 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -59,12 +59,11 @@ class StreamableHTTPSessionManager: session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that receive no HTTP requests for this duration will be automatically terminated and removed. - When retry_interval is also set, the effective idle - threshold is at least ``retry_interval / 1000 * 3`` to - avoid prematurely reaping sessions that are simply - waiting for SSE polling reconnections. Default is None - (no timeout). A value of 1800 (30 minutes) is - recommended for most deployments. + When retry_interval is also configured, ensure the idle + timeout comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -237,8 +236,8 @@ async def _handle_stateful_request( transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") # Push back idle deadline on activity - if transport.idle_scope is not None: - transport.idle_scope.deadline = anyio.current_time() + self._effective_idle_timeout() + if transport.idle_scope is not None and self.session_idle_timeout is not None: + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout await transport.handle_request(scope, receive, send) return @@ -271,8 +270,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE # Incoming requests push the deadline forward. idle_scope = anyio.CancelScope() if self.session_idle_timeout is not None: - timeout = self._effective_idle_timeout() - idle_scope.deadline = anyio.current_time() + timeout + idle_scope.deadline = anyio.current_time() + self.session_idle_timeout http_transport.idle_scope = idle_scope with idle_scope: @@ -332,19 +330,3 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE media_type="application/json", ) await response(scope, receive, send) - - def _effective_idle_timeout(self) -> float: - """Compute the effective idle timeout, accounting for retry_interval. - - When SSE retry_interval is configured, clients periodically reconnect - to resume the event stream. A gap of up to ``retry_interval`` between - connections is normal, not a sign of idleness. We use a 3x multiplier - to tolerate up to two consecutive missed polls (network jitter, slow - client) before considering the session idle. - """ - assert self.session_idle_timeout is not None - timeout = self.session_idle_timeout - if self.retry_interval is not None: - retry_seconds = self.retry_interval / 1000.0 - timeout = max(timeout, retry_seconds * 3) - return timeout diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py index 42638ee22..9a9193786 100644 --- a/tests/issues/test_1283_idle_session_cleanup.py +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -155,25 +155,6 @@ async def test_terminate_idempotency(): assert transport.is_terminated -@pytest.mark.anyio -async def test_idle_timeout_with_retry_interval(): - """When retry_interval is set, effective timeout should account for polling gaps.""" - app = Server("test-retry-interval") - - # retry_interval = 5000ms = 5s -> retry_seconds * 3 = 15s - # session_idle_timeout = 1s -> effective = max(1, 15) = 15 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=1.0, retry_interval=5000) - assert manager._effective_idle_timeout() == 15.0 - - # When retry_interval is small, session_idle_timeout should dominate - manager2 = StreamableHTTPSessionManager(app=app, session_idle_timeout=10.0, retry_interval=100) - assert manager2._effective_idle_timeout() == 10.0 - - # No retry_interval -> raw timeout - manager3 = StreamableHTTPSessionManager(app=app, session_idle_timeout=5.0) - assert manager3._effective_idle_timeout() == 5.0 - - @pytest.mark.anyio async def test_no_idle_timeout_sessions_persist(): """When session_idle_timeout is None (default), sessions persist indefinitely."""