Skip to content

Commit f8b58ad

Browse files
feat: add idle timeout for StreamableHTTP sessions
Add a session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in #1283. Each session with idle timeout configured starts a per-session watcher task that sleeps until the deadline and calls terminate() when idle. Incoming requests push back the deadline. No central background reaper is needed — each session manages its own lifecycle. - Per-session idle watcher runs alongside app.run in a task group - Effective timeout accounts for retry_interval to avoid premature reaping of SSE polling sessions - terminate() on the transport is now idempotent - Default is None (no timeout, fully backwards compatible) Github-Issue: #1283
1 parent bac2789 commit f8b58ad

File tree

3 files changed

+509
-6
lines changed

3 files changed

+509
-6
lines changed

src/mcp/server/streamable_http.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ def __init__(
180180
] = {}
181181
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
182182
self._terminated = False
183+
# Idle timeout deadline (monotonic time); managed by the session manager.
184+
self.idle_deadline: float | None = None
183185

184186
@property
185187
def is_terminated(self) -> bool:
@@ -773,8 +775,12 @@ async def terminate(self) -> None:
773775
"""Terminate the current session, closing all streams.
774776
775777
Once terminated, all requests with this session ID will receive 404 Not Found.
778+
Calling this method multiple times is safe (idempotent).
776779
"""
777780

781+
if self._terminated:
782+
return
783+
778784
self._terminated = True
779785
logger.info(f"Terminating session: {self.mcp_session_id}")
780786

src/mcp/server/streamable_http_manager.py

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from uuid import uuid4
1111

1212
import anyio
13+
import anyio.abc
1314
from anyio.abc import TaskStatus
1415
from starlette.requests import Request
1516
from starlette.responses import Response
@@ -38,6 +39,7 @@ class StreamableHTTPSessionManager:
3839
2. Resumability via an optional event store
3940
3. Connection management and lifecycle
4041
4. Request handling and transport setup
42+
5. Idle session cleanup via optional timeout
4143
4244
Important: Only one StreamableHTTPSessionManager instance should be created
4345
per application. The instance cannot be reused after its run() context has
@@ -55,6 +57,22 @@ class StreamableHTTPSessionManager:
5557
security_settings: Optional transport security settings.
5658
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5759
retry field. Used for SSE polling behavior.
60+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
61+
If set, sessions that receive no HTTP requests for this
62+
duration will be automatically terminated and removed.
63+
When retry_interval is also set, the effective idle
64+
threshold is at least ``retry_interval / 1000 * 3`` to
65+
avoid prematurely reaping sessions that are simply
66+
waiting for SSE polling reconnections. Default is None
67+
(no timeout). A value of 1800 (30 minutes) is
68+
recommended for most deployments.
69+
70+
Note: The idle timer is based on incoming HTTP requests
71+
(POST, GET, DELETE), not on whether SSE connections are
72+
open. If clients maintain long-lived GET SSE streams
73+
without sending other requests, set this value higher
74+
than the longest expected SSE connection lifetime to
75+
avoid premature reaping.
5876
"""
5977

6078
def __init__(
@@ -65,13 +83,20 @@ def __init__(
6583
stateless: bool = False,
6684
security_settings: TransportSecuritySettings | None = None,
6785
retry_interval: int | None = None,
86+
session_idle_timeout: float | None = None,
6887
):
88+
if session_idle_timeout is not None and session_idle_timeout <= 0:
89+
raise ValueError("session_idle_timeout must be a positive number of seconds")
90+
if stateless and session_idle_timeout is not None:
91+
raise ValueError("session_idle_timeout is not supported in stateless mode")
92+
6993
self.app = app
7094
self.event_store = event_store
7195
self.json_response = json_response
7296
self.stateless = stateless
7397
self.security_settings = security_settings
7498
self.retry_interval = retry_interval
99+
self.session_idle_timeout = session_idle_timeout
75100

76101
# Session tracking (only used if not stateless)
77102
self._session_creation_lock = anyio.Lock()
@@ -114,6 +139,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114139
# Store the task group for later use
115140
self._task_group = tg
116141
logger.info("StreamableHTTP session manager started")
142+
117143
try:
118144
yield # Let the application run
119145
finally:
@@ -219,6 +245,9 @@ async def _handle_stateful_request(
219245
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220246
transport = self._server_instances[request_mcp_session_id]
221247
logger.debug("Session already exists, handling request directly")
248+
# Push back idle deadline on activity
249+
if transport.idle_deadline is not None:
250+
transport.idle_deadline = anyio.current_time() + self._effective_idle_timeout()
222251
await transport.handle_request(scope, receive, send)
223252
return
224253

@@ -245,12 +274,35 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
245274
read_stream, write_stream = streams
246275
task_status.started()
247276
try:
248-
await self.app.run(
249-
read_stream,
250-
write_stream,
251-
self.app.create_initialization_options(),
252-
stateless=False, # Stateful mode
253-
)
277+
# If idle timeout is configured, run app.run and idle
278+
# watcher concurrently — the watcher terminates the
279+
# transport when it goes idle, which causes app.run
280+
# to return via the closed read stream.
281+
if self.session_idle_timeout is not None:
282+
timeout = self._effective_idle_timeout()
283+
http_transport.idle_deadline = anyio.current_time() + timeout
284+
async with anyio.create_task_group() as session_tg:
285+
session_tg.start_soon(
286+
self._idle_watcher,
287+
http_transport,
288+
session_tg,
289+
)
290+
await self.app.run(
291+
read_stream,
292+
write_stream,
293+
self.app.create_initialization_options(),
294+
stateless=False,
295+
)
296+
# app.run returned normally (e.g. client sent
297+
# a clean shutdown) — cancel the watcher.
298+
session_tg.cancel_scope.cancel() # pragma: no cover
299+
else:
300+
await self.app.run(
301+
read_stream,
302+
write_stream,
303+
self.app.create_initialization_options(),
304+
stateless=False, # Stateful mode
305+
)
254306
except Exception as e:
255307
logger.error(
256308
f"Session {http_transport.mcp_session_id} crashed: {e}",
@@ -295,3 +347,41 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295347
media_type="application/json",
296348
)
297349
await response(scope, receive, send)
350+
351+
def _effective_idle_timeout(self) -> float:
352+
"""Compute the effective idle timeout, accounting for retry_interval.
353+
354+
When SSE polling is configured via ``retry_interval`` (milliseconds),
355+
the client may legitimately go quiet between polls. The idle threshold
356+
must be large enough so that normal polling gaps don't cause premature
357+
session reaping.
358+
"""
359+
assert self.session_idle_timeout is not None
360+
timeout = self.session_idle_timeout
361+
if self.retry_interval is not None:
362+
retry_seconds = self.retry_interval / 1000.0
363+
timeout = max(timeout, retry_seconds * 3)
364+
return timeout
365+
366+
async def _idle_watcher(
367+
self,
368+
transport: StreamableHTTPServerTransport,
369+
task_group: anyio.abc.TaskGroup,
370+
) -> None:
371+
"""Per-session task that terminates the transport when it goes idle."""
372+
while not transport.is_terminated: # pragma: no branch
373+
assert transport.idle_deadline is not None
374+
now = anyio.current_time()
375+
remaining = transport.idle_deadline - now
376+
if remaining > 0:
377+
await anyio.sleep(remaining)
378+
continue
379+
380+
# Deadline passed — terminate
381+
session_id = transport.mcp_session_id
382+
logger.info(f"Terminating idle session {session_id}")
383+
self._server_instances.pop(session_id, None) # type: ignore[arg-type]
384+
await transport.terminate()
385+
# Cancel the session task group so app.run exits
386+
task_group.cancel_scope.cancel()
387+
return

0 commit comments

Comments
 (0)