Skip to content

Commit 23a6157

Browse files
feat: add idle timeout for StreamableHTTP sessions (#1994)
1 parent 6745894 commit 23a6157

File tree

4 files changed

+135
-20
lines changed

4 files changed

+135
-20
lines changed

CLAUDE.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ This document contains critical information about working with this codebase. Fo
2424
- Coverage: test edge cases and errors
2525
- New features require tests
2626
- Bug fixes require regression tests
27+
- Avoid `anyio.sleep()` with a fixed duration to wait for async operations. Instead:
28+
- Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test
29+
- For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()`
30+
- Exception: `sleep()` is appropriate when testing time-based features (e.g., timeouts)
31+
- Wrap indefinite waits (`event.wait()`, `stream.receive()`) in `anyio.fail_after(5)` to prevent hangs
2732

2833
- For commits fixing bugs or adding features based on user reports add:
2934

src/mcp/server/streamable_http.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ def __init__(
180180
] = {}
181181
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
182182
self._terminated = False
183+
# Idle timeout cancel scope; managed by the session manager.
184+
self.idle_scope: anyio.CancelScope | None = None
183185

184186
@property
185187
def is_terminated(self) -> bool:
@@ -773,8 +775,12 @@ async def terminate(self) -> None:
773775
"""Terminate the current session, closing all streams.
774776
775777
Once terminated, all requests with this session ID will receive 404 Not Found.
778+
Calling this method multiple times is safe (idempotent).
776779
"""
777780

781+
if self._terminated: # pragma: no cover
782+
return
783+
778784
self._terminated = True
779785
logger.info(f"Terminating session: {self.mcp_session_id}")
780786

src/mcp/server/streamable_http_manager.py

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,28 @@ class StreamableHTTPSessionManager:
3838
2. Resumability via an optional event store
3939
3. Connection management and lifecycle
4040
4. Request handling and transport setup
41+
5. Idle session cleanup via optional timeout
4142
4243
Important: Only one StreamableHTTPSessionManager instance should be created
4344
per application. The instance cannot be reused after its run() context has
4445
completed. If you need to restart the manager, create a new instance.
4546
4647
Args:
4748
app: The MCP server instance
48-
event_store: Optional event store for resumability support.
49-
If provided, enables resumable connections where clients
50-
can reconnect and receive missed events.
51-
If None, sessions are still tracked but not resumable.
49+
event_store: Optional event store for resumability support. If provided, enables resumable connections
50+
where clients can reconnect and receive missed events. If None, sessions are still tracked but not
51+
resumable.
5252
json_response: Whether to use JSON responses instead of SSE streams
53-
stateless: If True, creates a completely fresh transport for each request
54-
with no session tracking or state persistence between requests.
53+
stateless: If True, creates a completely fresh transport for each request with no session tracking or
54+
state persistence between requests.
5555
security_settings: Optional transport security settings.
56-
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
57-
retry field. Used for SSE polling behavior.
56+
retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE
57+
polling behavior.
58+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that
59+
receive no HTTP requests for this duration will be automatically terminated and removed. When
60+
retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to
61+
avoid reaping sessions during normal SSE polling gaps. Default is None (no timeout). A value of 1800
62+
(30 minutes) is recommended for most deployments.
5863
"""
5964

6065
def __init__(
@@ -65,13 +70,20 @@ def __init__(
6570
stateless: bool = False,
6671
security_settings: TransportSecuritySettings | None = None,
6772
retry_interval: int | None = None,
73+
session_idle_timeout: float | None = None,
6874
):
75+
if session_idle_timeout is not None and session_idle_timeout <= 0:
76+
raise ValueError("session_idle_timeout must be a positive number of seconds")
77+
if stateless and session_idle_timeout is not None:
78+
raise RuntimeError("session_idle_timeout is not supported in stateless mode")
79+
6980
self.app = app
7081
self.event_store = event_store
7182
self.json_response = json_response
7283
self.stateless = stateless
7384
self.security_settings = security_settings
7485
self.retry_interval = retry_interval
86+
self.session_idle_timeout = session_idle_timeout
7587

7688
# Session tracking (only used if not stateless)
7789
self._session_creation_lock = anyio.Lock()
@@ -219,6 +231,9 @@ async def _handle_stateful_request(
219231
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220232
transport = self._server_instances[request_mcp_session_id]
221233
logger.debug("Session already exists, handling request directly")
234+
# Push back idle deadline on activity
235+
if transport.idle_scope is not None and self.session_idle_timeout is not None:
236+
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
222237
await transport.handle_request(scope, receive, send)
223238
return
224239

@@ -245,19 +260,31 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
245260
read_stream, write_stream = streams
246261
task_status.started()
247262
try:
248-
await self.app.run(
249-
read_stream,
250-
write_stream,
251-
self.app.create_initialization_options(),
252-
stateless=False, # Stateful mode
253-
)
254-
except Exception as e:
255-
logger.error(
256-
f"Session {http_transport.mcp_session_id} crashed: {e}",
257-
exc_info=True,
258-
)
263+
# Use a cancel scope for idle timeout — when the
264+
# deadline passes the scope cancels app.run() and
265+
# execution continues after the ``with`` block.
266+
# Incoming requests push the deadline forward.
267+
idle_scope = anyio.CancelScope()
268+
if self.session_idle_timeout is not None:
269+
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
270+
http_transport.idle_scope = idle_scope
271+
272+
with idle_scope:
273+
await self.app.run(
274+
read_stream,
275+
write_stream,
276+
self.app.create_initialization_options(),
277+
stateless=False,
278+
)
279+
280+
if idle_scope.cancelled_caught:
281+
assert http_transport.mcp_session_id is not None
282+
logger.info(f"Session {http_transport.mcp_session_id} idle timeout")
283+
self._server_instances.pop(http_transport.mcp_session_id, None)
284+
await http_transport.terminate()
285+
except Exception:
286+
logger.exception(f"Session {http_transport.mcp_session_id} crashed")
259287
finally:
260-
# Only remove from instances if not terminated
261288
if ( # pragma: no branch
262289
http_transport.mcp_session_id
263290
and http_transport.mcp_session_id in self._server_instances

tests/server/test_streamable_http_manager.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,80 @@ async def mock_receive():
313313
assert error_data["id"] == "server-error"
314314
assert error_data["error"]["code"] == INVALID_REQUEST
315315
assert error_data["error"]["message"] == "Session not found"
316+
317+
318+
@pytest.mark.anyio
319+
async def test_idle_session_is_reaped():
320+
"""After idle timeout fires, the session returns 404."""
321+
app = Server("test-idle-reap")
322+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.05)
323+
324+
async with manager.run():
325+
sent_messages: list[Message] = []
326+
327+
async def mock_send(message: Message):
328+
sent_messages.append(message)
329+
330+
scope = {
331+
"type": "http",
332+
"method": "POST",
333+
"path": "/mcp",
334+
"headers": [(b"content-type", b"application/json")],
335+
}
336+
337+
async def mock_receive(): # pragma: no cover
338+
return {"type": "http.request", "body": b"", "more_body": False}
339+
340+
await manager.handle_request(scope, mock_receive, mock_send)
341+
342+
session_id = None
343+
for msg in sent_messages: # pragma: no branch
344+
if msg["type"] == "http.response.start": # pragma: no branch
345+
for header_name, header_value in msg.get("headers", []): # pragma: no branch
346+
if header_name.decode().lower() == MCP_SESSION_ID_HEADER.lower():
347+
session_id = header_value.decode()
348+
break
349+
if session_id: # pragma: no branch
350+
break
351+
352+
assert session_id is not None, "Session ID not found in response headers"
353+
354+
# Wait for the 50ms idle timeout to fire and cleanup to complete
355+
await anyio.sleep(0.1)
356+
357+
# Verify via public API: old session ID now returns 404
358+
response_messages: list[Message] = []
359+
360+
async def capture_send(message: Message):
361+
response_messages.append(message)
362+
363+
scope_with_session = {
364+
"type": "http",
365+
"method": "POST",
366+
"path": "/mcp",
367+
"headers": [
368+
(b"content-type", b"application/json"),
369+
(b"mcp-session-id", session_id.encode()),
370+
],
371+
}
372+
373+
await manager.handle_request(scope_with_session, mock_receive, capture_send)
374+
375+
response_start = next(
376+
(msg for msg in response_messages if msg["type"] == "http.response.start"),
377+
None,
378+
)
379+
assert response_start is not None
380+
assert response_start["status"] == 404
381+
382+
383+
def test_session_idle_timeout_rejects_non_positive():
384+
with pytest.raises(ValueError, match="positive number"):
385+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1)
386+
with pytest.raises(ValueError, match="positive number"):
387+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0)
388+
389+
390+
def test_session_idle_timeout_rejects_stateless():
391+
with pytest.raises(RuntimeError, match="not supported in stateless"):
392+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)

0 commit comments

Comments
 (0)