Skip to content

Commit 84ac697

Browse files
Add idle session timeout to StreamableHTTPSessionManager
Sessions created via StreamableHTTPSessionManager persist indefinitely in _server_instances even after the client disconnects, leaking memory over time. Add a session_idle_timeout parameter that automatically terminates and removes sessions that receive no HTTP requests for the configured duration. Each session manages its own lifetime via an anyio CancelScope deadline — no background reaper task needed. Incoming requests push the deadline forward to keep active sessions alive. Github-Issue: #1283
1 parent 239d682 commit 84ac697

File tree

3 files changed

+311
-7
lines changed

3 files changed

+311
-7
lines changed

src/mcp/server/streamable_http.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ def __init__(
169169
] = {}
170170
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
171171
self._terminated = False
172+
# Idle timeout cancel scope; managed by the session manager.
173+
self.idle_scope: anyio.CancelScope | None = None
172174

173175
@property
174176
def is_terminated(self) -> bool:

src/mcp/server/streamable_http_manager.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class StreamableHTTPSessionManager:
3939
2. Resumability via an optional event store
4040
3. Connection management and lifecycle
4141
4. Request handling and transport setup
42+
5. Idle session cleanup via optional timeout
4243
4344
Important: Only one StreamableHTTPSessionManager instance should be created
4445
per application. The instance cannot be reused after its run() context has
@@ -56,6 +57,14 @@ class StreamableHTTPSessionManager:
5657
security_settings: Optional transport security settings.
5758
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5859
retry field. Used for SSE polling behavior.
60+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
61+
If set, sessions that receive no HTTP requests for this
62+
duration will be automatically terminated and removed.
63+
When retry_interval is also configured, ensure the idle
64+
timeout comfortably exceeds the retry interval to avoid
65+
reaping sessions during normal SSE polling gaps.
66+
Default is None (no timeout). A value of 1800
67+
(30 minutes) is recommended for most deployments.
5968
"""
6069

6170
def __init__(
@@ -66,13 +75,20 @@ def __init__(
6675
stateless: bool = False,
6776
security_settings: TransportSecuritySettings | None = None,
6877
retry_interval: int | None = None,
78+
session_idle_timeout: float | None = None,
6979
):
80+
if session_idle_timeout is not None and session_idle_timeout <= 0:
81+
raise ValueError("session_idle_timeout must be a positive number of seconds")
82+
if stateless and session_idle_timeout is not None:
83+
raise ValueError("session_idle_timeout is not supported in stateless mode")
84+
7085
self.app = app
7186
self.event_store = event_store
7287
self.json_response = json_response
7388
self.stateless = stateless
7489
self.security_settings = security_settings
7590
self.retry_interval = retry_interval
91+
self.session_idle_timeout = session_idle_timeout
7692

7793
# Session tracking (only used if not stateless)
7894
self._session_creation_lock = anyio.Lock()
@@ -184,6 +200,9 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
184200
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
185201
transport = self._server_instances[request_mcp_session_id]
186202
logger.debug("Session already exists, handling request directly")
203+
# Push back idle deadline on activity
204+
if transport.idle_scope is not None and self.session_idle_timeout is not None:
205+
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
187206
await transport.handle_request(scope, receive, send)
188207
return
189208

@@ -210,16 +229,32 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
210229
read_stream, write_stream = streams
211230
task_status.started()
212231
try:
213-
await self.app.run(
214-
read_stream,
215-
write_stream,
216-
self.app.create_initialization_options(),
217-
stateless=False, # Stateful mode
218-
)
232+
# Use a cancel scope for idle timeout — when the
233+
# deadline passes the scope cancels app.run() and
234+
# execution continues after the ``with`` block.
235+
# Incoming requests push the deadline forward.
236+
idle_scope = anyio.CancelScope()
237+
if self.session_idle_timeout is not None:
238+
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
239+
http_transport.idle_scope = idle_scope
240+
241+
with idle_scope:
242+
await self.app.run(
243+
read_stream,
244+
write_stream,
245+
self.app.create_initialization_options(),
246+
stateless=False,
247+
)
248+
249+
if idle_scope.cancelled_caught:
250+
session_id = http_transport.mcp_session_id
251+
logger.info(f"Session {session_id} idle timeout")
252+
if session_id is not None: # pragma: no branch
253+
self._server_instances.pop(session_id, None)
254+
await http_transport.terminate()
219255
except Exception:
220256
logger.exception(f"Session {http_transport.mcp_session_id} crashed")
221257
finally:
222-
# Only remove from instances if not terminated
223258
if ( # pragma: no branch
224259
http_transport.mcp_session_id
225260
and http_transport.mcp_session_id in self._server_instances
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up.
2+
3+
Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager
4+
persist indefinitely in ``_server_instances`` even after the client disconnects.
5+
Over time this leaks memory.
6+
7+
The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows
8+
the manager to automatically terminate and remove sessions that have been idle for
9+
longer than the configured duration.
10+
"""
11+
12+
import time
13+
from collections.abc import Callable, Coroutine
14+
from typing import Any
15+
16+
import anyio
17+
import pytest
18+
from starlette.types import Message, Scope
19+
20+
from mcp.server.lowlevel import Server
21+
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
22+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
23+
24+
25+
def _make_scope() -> Scope:
26+
return {
27+
"type": "http",
28+
"method": "POST",
29+
"path": "/mcp",
30+
"headers": [(b"content-type", b"application/json")],
31+
}
32+
33+
34+
async def _mock_receive() -> Message: # pragma: no cover
35+
return {"type": "http.request", "body": b"", "more_body": False}
36+
37+
38+
def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]:
39+
async def mock_send(message: Message) -> None:
40+
sent.append(message)
41+
42+
return mock_send
43+
44+
45+
def _extract_session_id(sent_messages: list[Message]) -> str:
46+
for msg in sent_messages:
47+
if msg["type"] == "http.response.start": # pragma: no branch
48+
for name, value in msg.get("headers", []): # pragma: no branch
49+
if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch
50+
return value.decode()
51+
raise AssertionError("Session ID not found in response headers") # pragma: no cover
52+
53+
54+
@pytest.mark.anyio
55+
async def test_idle_session_is_reaped():
56+
"""Session should be removed from _server_instances after idle timeout."""
57+
app = Server("test-idle-reap")
58+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15)
59+
60+
async with manager.run():
61+
sent: list[Message] = []
62+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
63+
session_id = _extract_session_id(sent)
64+
65+
assert session_id in manager._server_instances
66+
67+
# Wait for the cancel scope deadline to fire
68+
await anyio.sleep(0.4)
69+
70+
assert session_id not in manager._server_instances
71+
72+
73+
@pytest.mark.anyio
74+
async def test_activity_resets_idle_timer():
75+
"""Requests during the timeout window should prevent the session from being reaped."""
76+
app = Server("test-idle-reset")
77+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3)
78+
79+
async with manager.run():
80+
sent: list[Message] = []
81+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
82+
session_id = _extract_session_id(sent)
83+
84+
# Simulate ongoing activity by pushing back the idle scope deadline
85+
transport = manager._server_instances[session_id]
86+
assert transport.idle_scope is not None
87+
for _ in range(4):
88+
await anyio.sleep(0.1)
89+
transport.idle_scope.deadline = anyio.current_time() + 0.3
90+
91+
# Session should still be alive because we kept it active
92+
assert session_id in manager._server_instances
93+
94+
# Now stop activity and let the timeout expire
95+
await anyio.sleep(0.6)
96+
97+
assert session_id not in manager._server_instances
98+
99+
100+
@pytest.mark.anyio
101+
async def test_multiple_sessions_reaped_independently():
102+
"""Each session tracks its own idle timeout independently."""
103+
app = Server("test-multi-idle")
104+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15)
105+
106+
async with manager.run():
107+
sent1: list[Message] = []
108+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1))
109+
session_id_1 = _extract_session_id(sent1)
110+
111+
await anyio.sleep(0.05)
112+
sent2: list[Message] = []
113+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2))
114+
session_id_2 = _extract_session_id(sent2)
115+
116+
assert session_id_1 in manager._server_instances
117+
assert session_id_2 in manager._server_instances
118+
119+
# After enough time, both should be reaped
120+
await anyio.sleep(0.4)
121+
122+
assert session_id_1 not in manager._server_instances
123+
assert session_id_2 not in manager._server_instances
124+
125+
126+
def test_session_idle_timeout_rejects_negative():
127+
"""session_idle_timeout must be a positive number."""
128+
with pytest.raises(ValueError, match="positive number"):
129+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1)
130+
131+
132+
def test_session_idle_timeout_rejects_zero():
133+
"""session_idle_timeout must be a positive number."""
134+
with pytest.raises(ValueError, match="positive number"):
135+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0)
136+
137+
138+
def test_session_idle_timeout_rejects_stateless():
139+
"""session_idle_timeout is not supported in stateless mode."""
140+
with pytest.raises(ValueError, match="not supported in stateless"):
141+
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)
142+
143+
144+
@pytest.mark.anyio
145+
async def test_terminate_idempotency():
146+
"""Calling terminate() multiple times should be safe."""
147+
transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent")
148+
149+
async with transport.connect():
150+
await transport.terminate()
151+
assert transport.is_terminated
152+
153+
# Second call should be a no-op (no exception)
154+
await transport.terminate()
155+
assert transport.is_terminated
156+
157+
158+
@pytest.mark.anyio
159+
async def test_no_idle_timeout_sessions_persist():
160+
"""When session_idle_timeout is None (default), sessions persist indefinitely."""
161+
app = Server("test-no-timeout")
162+
manager = StreamableHTTPSessionManager(app=app)
163+
164+
async with manager.run():
165+
sent: list[Message] = []
166+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
167+
session_id = _extract_session_id(sent)
168+
169+
await anyio.sleep(0.3)
170+
assert session_id in manager._server_instances
171+
172+
173+
@pytest.mark.anyio
174+
async def test_run_server_exits_promptly_after_idle_timeout():
175+
"""The run_server task must exit shortly after the idle timeout fires."""
176+
app = Server("test-lifecycle")
177+
178+
task_exited = anyio.Event()
179+
exit_timestamp: list[float] = []
180+
original_run = app.run
181+
182+
async def instrumented_run(*args: Any, **kwargs: Any) -> None:
183+
try:
184+
await original_run(*args, **kwargs)
185+
finally:
186+
exit_timestamp.append(time.monotonic())
187+
task_exited.set()
188+
189+
app.run = instrumented_run # type: ignore[assignment]
190+
191+
idle_timeout = 0.5
192+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout)
193+
194+
async with manager.run():
195+
sent: list[Message] = []
196+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
197+
session_id = _extract_session_id(sent)
198+
assert session_id in manager._server_instances
199+
200+
pre_reap_time = time.monotonic()
201+
202+
with anyio.fail_after(idle_timeout * 4):
203+
await task_exited.wait()
204+
205+
assert len(exit_timestamp) == 1
206+
total_elapsed = exit_timestamp[0] - pre_reap_time
207+
assert total_elapsed < idle_timeout * 3, (
208+
f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s"
209+
)
210+
assert session_id not in manager._server_instances
211+
212+
213+
@pytest.mark.anyio
214+
async def test_run_server_finally_block_runs_after_terminate():
215+
"""Verify that the finally block in run_server executes after terminate()."""
216+
app = Server("test-finally")
217+
218+
lifecycle_events: list[str] = []
219+
original_run = app.run
220+
221+
async def instrumented_run(*args: Any, **kwargs: Any) -> None:
222+
lifecycle_events.append("run_entered")
223+
try:
224+
await original_run(*args, **kwargs)
225+
finally:
226+
lifecycle_events.append("run_exited")
227+
228+
app.run = instrumented_run # type: ignore[assignment]
229+
230+
manager = StreamableHTTPSessionManager(app=app)
231+
232+
async with manager.run():
233+
sent: list[Message] = []
234+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
235+
session_id = _extract_session_id(sent)
236+
transport = manager._server_instances[session_id]
237+
238+
assert "run_entered" in lifecycle_events
239+
assert "run_exited" not in lifecycle_events
240+
241+
await transport.terminate()
242+
243+
with anyio.fail_after(3.0):
244+
while "run_exited" not in lifecycle_events:
245+
await anyio.sleep(0.01)
246+
247+
assert "run_exited" in lifecycle_events
248+
249+
250+
@pytest.mark.anyio
251+
async def test_idle_timeout_end_to_end():
252+
"""End-to-end: idle timeout causes session cleanup with a real Server."""
253+
app = Server("test-e2e")
254+
idle_timeout = 0.3
255+
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout)
256+
257+
async with manager.run():
258+
sent: list[Message] = []
259+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
260+
session_id = _extract_session_id(sent)
261+
assert session_id in manager._server_instances
262+
263+
with anyio.fail_after(idle_timeout + 1.0):
264+
while session_id in manager._server_instances:
265+
await anyio.sleep(0.05)
266+
267+
assert session_id not in manager._server_instances

0 commit comments

Comments
 (0)