Skip to content

Commit 87c28b1

Browse files
feat: add idle timeout for StreamableHTTP sessions
Add session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in issue #1283. Key design decisions: - Idle timeout logic lives in the session manager, not the transport. The manager already owns the session lifecycle, so this is the natural place for reaping. The transport stays unaware of timeout logic. - A background reaper task scans _server_instances periodically and terminates sessions that have been idle longer than the threshold. - When retry_interval (SSE polling) is configured, the effective timeout is at least retry_interval_seconds * 3 to avoid reaping sessions that are simply between polling reconnections. - terminate() on the transport is now idempotent (early return if already terminated), making it safe to call from both the reaper and explicit DELETE requests. Based on the approach from PR #1159 by @hopeful0, reworked to: - Move idle tracking to the session manager instead of the transport - Remove __aenter__/__aexit__ context manager and condition variables - Account for retry_interval in the idle threshold - Add comprehensive unit and integration tests Github-Issue: #1283 Reported-by: hopeful0
1 parent aca6b0d commit 87c28b1

File tree

3 files changed

+246
-0
lines changed

3 files changed

+246
-0
lines changed

src/mcp/server/streamable_http.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,8 +773,12 @@ async def terminate(self) -> None:
773773
"""Terminate the current session, closing all streams.
774774
775775
Once terminated, all requests with this session ID will receive 404 Not Found.
776+
Calling this method multiple times is safe (idempotent).
776777
"""
777778

779+
if self._terminated:
780+
return
781+
778782
self._terminated = True
779783
logger.info(f"Terminating session: {self.mcp_session_id}")
780784

src/mcp/server/streamable_http_manager.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
3838
2. Resumability via an optional event store
3939
3. Connection management and lifecycle
4040
4. Request handling and transport setup
41+
5. Idle session cleanup via optional timeout
4142
4243
Important: Only one StreamableHTTPSessionManager instance should be created
4344
per application. The instance cannot be reused after its run() context has
@@ -55,6 +56,21 @@ class StreamableHTTPSessionManager:
5556
security_settings: Optional transport security settings.
5657
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5758
retry field. Used for SSE polling behavior.
59+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
60+
If set, sessions that receive no HTTP requests for this
61+
duration will be automatically terminated and removed.
62+
When retry_interval is also set, the effective idle
63+
threshold is at least ``retry_interval_seconds * 3`` to
64+
avoid prematurely reaping sessions that are simply
65+
waiting for SSE polling reconnections. Default is None
66+
(no timeout).
67+
68+
Note: The idle timer is based on incoming HTTP requests
69+
(POST, GET, DELETE), not on whether SSE connections are
70+
open. If clients maintain long-lived GET SSE streams
71+
without sending other requests, set this value higher
72+
than the longest expected SSE connection lifetime to
73+
avoid premature reaping.
5874
"""
5975

6076
def __init__(
@@ -65,17 +81,23 @@ def __init__(
6581
stateless: bool = False,
6682
security_settings: TransportSecuritySettings | None = None,
6783
retry_interval: int | None = None,
84+
session_idle_timeout: float | None = None,
6885
):
86+
if session_idle_timeout is not None and session_idle_timeout <= 0:
87+
raise ValueError("session_idle_timeout must be a positive number of seconds")
88+
6989
self.app = app
7090
self.event_store = event_store
7191
self.json_response = json_response
7292
self.stateless = stateless
7393
self.security_settings = security_settings
7494
self.retry_interval = retry_interval
95+
self.session_idle_timeout = session_idle_timeout
7596

7697
# Session tracking (only used if not stateless)
7798
self._session_creation_lock = anyio.Lock()
7899
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
100+
self._last_activity: dict[str, float] = {}
79101

80102
# The task group will be set during lifespan
81103
self._task_group = None
@@ -114,6 +136,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114136
# Store the task group for later use
115137
self._task_group = tg
116138
logger.info("StreamableHTTP session manager started")
139+
140+
# Start idle session reaper if timeout is configured
141+
if self.session_idle_timeout is not None:
142+
tg.start_soon(self._idle_session_reaper)
143+
117144
try:
118145
yield # Let the application run
119146
finally:
@@ -123,6 +150,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
123150
self._task_group = None
124151
# Clear any remaining server instances
125152
self._server_instances.clear()
153+
self._last_activity.clear()
126154

127155
async def handle_request(
128156
self,
@@ -219,6 +247,8 @@ async def _handle_stateful_request(
219247
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220248
transport = self._server_instances[request_mcp_session_id]
221249
logger.debug("Session already exists, handling request directly")
250+
# Update activity timestamp for idle timeout tracking
251+
self._last_activity[request_mcp_session_id] = anyio.current_time()
222252
await transport.handle_request(scope, receive, send)
223253
return
224254

@@ -237,6 +267,7 @@ async def _handle_stateful_request(
237267

238268
assert http_transport.mcp_session_id is not None
239269
self._server_instances[http_transport.mcp_session_id] = http_transport
270+
self._last_activity[http_transport.mcp_session_id] = anyio.current_time()
240271
logger.info(f"Created new transport with session ID: {new_session_id}")
241272

242273
# Define the server runner
@@ -269,6 +300,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
269300
"active instances."
270301
)
271302
del self._server_instances[http_transport.mcp_session_id]
303+
self._last_activity.pop(http_transport.mcp_session_id, None)
272304

273305
# Assert task group is not None for type checking
274306
assert self._task_group is not None
@@ -295,3 +327,43 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295327
media_type="application/json",
296328
)
297329
await response(scope, receive, send)
330+
331+
def _effective_idle_timeout(self) -> float:
332+
"""Compute the effective idle timeout, accounting for retry_interval.
333+
334+
When SSE polling is configured via ``retry_interval`` (milliseconds),
335+
the client may legitimately go quiet between polls. The idle threshold
336+
must be large enough so that normal polling gaps don't cause premature
337+
session reaping.
338+
"""
339+
assert self.session_idle_timeout is not None
340+
timeout = self.session_idle_timeout
341+
if self.retry_interval is not None:
342+
retry_seconds = self.retry_interval / 1000.0
343+
timeout = max(timeout, retry_seconds * 3)
344+
return timeout
345+
346+
async def _idle_session_reaper(self) -> None:
    """Background task that periodically terminates idle sessions.

    Each pass sleeps for the scan interval (half the effective idle timeout,
    capped at 30 seconds), then terminates every session whose last recorded
    activity is older than the effective idle timeout and removes it from
    ``_server_instances`` and ``_last_activity``. The loop runs until the
    task is cancelled.

    A failure while terminating one transport is logged and the session is
    still dropped from the bookkeeping dicts; the reaper keeps running so
    other sessions continue to be cleaned up.
    """
    timeout = self._effective_idle_timeout()
    scan_interval = min(timeout / 2, 30.0)
    logger.info(f"Idle session reaper started (timeout={timeout}s, scan_interval={scan_interval}s)")

    while True:
        await anyio.sleep(scan_interval)
        now = anyio.current_time()
        # Snapshot keys: terminate() awaits, so other tasks may add or remove
        # sessions while this scan is in progress.
        for session_id in list(self._server_instances.keys()):
            last = self._last_activity.get(session_id)
            if last is None:
                continue  # pragma: no cover
            if now - last <= timeout:
                continue
            transport = self._server_instances.get(session_id)
            if transport is None:
                continue  # pragma: no cover
            logger.info(
                f"Terminating idle session {session_id} (idle for {now - last:.1f}s, timeout={timeout}s)"
            )
            try:
                await transport.terminate()
            except Exception:
                # An uncaught error here would end the reaper and propagate
                # to the task group it was started in; log it and move on.
                # Cancellation is not an Exception subclass, so it still
                # propagates normally.
                logger.exception(f"Failed to terminate idle session {session_id}")
            finally:
                # Forget the session even if terminate() failed, so it is not
                # retried on every scan and does not leak.
                self._server_instances.pop(session_id, None)
                self._last_activity.pop(session_id, None)

tests/server/test_streamable_http_manager.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,173 @@ async def mock_receive():
313313
assert error_data["id"] == "server-error"
314314
assert error_data["error"]["code"] == INVALID_REQUEST
315315
assert error_data["error"]["message"] == "Session not found"
316+
317+
318+
# --- Idle timeout tests ---
319+
320+
321+
def _make_scope() -> dict:
322+
return {
323+
"type": "http",
324+
"method": "POST",
325+
"path": "/mcp",
326+
"headers": [(b"content-type", b"application/json")],
327+
}
328+
329+
330+
async def _idle_mock_receive() -> Message:  # pragma: no cover
    """ASGI ``receive`` stub that yields one empty, final HTTP request body."""
    empty_request: Message = {"type": "http.request", "body": b"", "more_body": False}
    return empty_request
332+
333+
334+
def _make_send(sent: list[Message]):
    """Return an ASGI ``send`` callable that appends every message to ``sent``."""

    async def record(message: Message) -> None:
        sent.append(message)

    return record
339+
340+
341+
def _extract_session_id(sent_messages: list[Message]) -> str:
    """Return the MCP session ID header value from recorded ASGI messages.

    Only ``http.response.start`` messages are inspected; header names are
    compared case-insensitively. Raises ``AssertionError`` when no such
    header is present.
    """
    wanted = MCP_SESSION_ID_HEADER.lower()
    for message in sent_messages:
        if message["type"] != "http.response.start":
            continue
        for raw_name, raw_value in message.get("headers", []):
            if raw_name.decode().lower() == wanted:
                return raw_value.decode()
    raise AssertionError("Session ID not found in response headers")
348+
349+
350+
def _make_blocking_run(stop_event: anyio.Event):
    """Build a stand-in for ``app.run`` that waits until ``stop_event`` is set."""

    async def wait_for_stop(*_args, **_kwargs):  # type: ignore[no-untyped-def]
        # Arguments are accepted and ignored so any call signature works.
        await stop_event.wait()

    return wait_for_stop
357+
358+
359+
@pytest.mark.anyio
async def test_stateful_session_cleanup_on_idle_timeout():
    """A session left idle past the configured timeout is terminated and forgotten."""
    server = Server("test-idle-timeout")
    release = anyio.Event()
    server.run = _make_blocking_run(release)  # type: ignore[assignment]

    manager = StreamableHTTPSessionManager(app=server, session_idle_timeout=0.15)

    async with manager.run():
        responses: list[Message] = []
        await manager.handle_request(_make_scope(), _idle_mock_receive, _make_send(responses))
        session_id = _extract_session_id(responses)

        # The new session is tracked in both bookkeeping dicts.
        assert session_id in manager._server_instances
        assert session_id in manager._last_activity

        # Give the reaper time to run at least one scan past the timeout.
        await anyio.sleep(0.4)

        assert session_id not in manager._server_instances
        assert session_id not in manager._last_activity

        # Unblock the stubbed app.run.
        release.set()
386+
387+
388+
@pytest.mark.anyio
async def test_idle_timeout_reset_on_activity():
    """Refreshing the activity timestamp keeps a session alive until activity stops."""
    server = Server("test-idle-reset")
    release = anyio.Event()
    server.run = _make_blocking_run(release)  # type: ignore[assignment]

    manager = StreamableHTTPSessionManager(app=server, session_idle_timeout=0.3)

    async with manager.run():
        responses: list[Message] = []
        await manager.handle_request(_make_scope(), _idle_mock_receive, _make_send(responses))
        session_id = _extract_session_id(responses)

        # Emulate traffic by refreshing the timestamp more often than the timeout.
        bumps_remaining = 4
        while bumps_remaining:
            await anyio.sleep(0.1)
            manager._last_activity[session_id] = anyio.current_time()
            bumps_remaining -= 1

        # Total elapsed time exceeds the timeout, yet the session survives.
        assert session_id in manager._server_instances

        # With no further activity the reaper should remove the session.
        await anyio.sleep(0.6)
        assert session_id not in manager._server_instances

        # Unblock the stubbed app.run.
        release.set()
418+
419+
420+
@pytest.mark.anyio
async def test_terminate_idempotency():
    """Repeated terminate() calls succeed and leave the transport terminated."""
    transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent")

    # connect() sets up the streams, so the first terminate() has something to close.
    async with transport.connect():
        # The first pass terminates; the second must be a harmless no-op.
        for _attempt in range(2):
            await transport.terminate()
            assert transport.is_terminated
435+
436+
437+
@pytest.mark.anyio
async def test_idle_timeout_with_retry_interval():
    """The effective timeout is the larger of the idle timeout and 3x the retry interval."""
    app = Server("test-retry-interval")

    # (constructor kwargs, expected effective timeout in seconds)
    cases = [
        # 5000 ms retry -> 5 s * 3 = 15 s, which exceeds the 1 s idle timeout.
        ({"session_idle_timeout": 1.0, "retry_interval": 5000}, 15.0),
        # 100 ms retry -> 0.3 s, so the 10 s idle timeout dominates.
        ({"session_idle_timeout": 10.0, "retry_interval": 100}, 10.0),
        # No retry interval: the raw idle timeout is used.
        ({"session_idle_timeout": 5.0}, 5.0),
    ]

    for manager_kwargs, expected in cases:
        manager = StreamableHTTPSessionManager(app=app, **manager_kwargs)
        assert manager._effective_idle_timeout() == expected
465+
466+
467+
@pytest.mark.anyio
async def test_no_idle_timeout_no_reaper():
    """Without session_idle_timeout, an inactive session is still present after a wait."""
    server = Server("test-no-timeout")
    release = anyio.Event()
    server.run = _make_blocking_run(release)  # type: ignore[assignment]

    manager = StreamableHTTPSessionManager(app=server)

    async with manager.run():
        responses: list[Message] = []
        await manager.handle_request(_make_scope(), _idle_mock_receive, _make_send(responses))
        session_id = _extract_session_id(responses)

        # Stay idle for a while; nothing should remove the session.
        await anyio.sleep(0.3)
        assert session_id in manager._server_instances

        # Unblock the stubbed app.run.
        release.set()

0 commit comments

Comments
 (0)