@@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
3838 2. Resumability via an optional event store
3939 3. Connection management and lifecycle
4040 4. Request handling and transport setup
41+ 5. Idle session cleanup via optional timeout
4142
4243 Important: Only one StreamableHTTPSessionManager instance should be created
4344 per application. The instance cannot be reused after its run() context has
@@ -55,6 +56,22 @@ class StreamableHTTPSessionManager:
5556 security_settings: Optional transport security settings.
5657 retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5758 retry field. Used for SSE polling behavior.
59+ session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
60+ If set, sessions that receive no HTTP requests for this
61+ duration will be automatically terminated and removed.
62+ When retry_interval is also set, the effective idle
63+ threshold is at least ``retry_interval_seconds * 3`` to
64+ avoid prematurely reaping sessions that are simply
65+ waiting for SSE polling reconnections. Default is None
66+ (no timeout). A value of 1800 (30 minutes) is
67+ recommended for most deployments.
68+
69+ Note: The idle timer is based on incoming HTTP requests
70+ (POST, GET, DELETE), not on whether SSE connections are
71+ open. If clients maintain long-lived GET SSE streams
72+ without sending other requests, set this value higher
73+ than the longest expected SSE connection lifetime to
74+ avoid premature reaping.
5875 """
5976
6077 def __init__ (
@@ -65,17 +82,23 @@ def __init__(
6582 stateless : bool = False ,
6683 security_settings : TransportSecuritySettings | None = None ,
6784 retry_interval : int | None = None ,
85+ session_idle_timeout : float | None = None ,
6886 ):
87+ if session_idle_timeout is not None and session_idle_timeout <= 0 :
88+ raise ValueError ("session_idle_timeout must be a positive number of seconds" )
89+
6990 self .app = app
7091 self .event_store = event_store
7192 self .json_response = json_response
7293 self .stateless = stateless
7394 self .security_settings = security_settings
7495 self .retry_interval = retry_interval
96+ self .session_idle_timeout = session_idle_timeout
7597
7698 # Session tracking (only used if not stateless)
7799 self ._session_creation_lock = anyio .Lock ()
78100 self ._server_instances : dict [str , StreamableHTTPServerTransport ] = {}
101+ self ._last_activity : dict [str , float ] = {}
79102
80103 # The task group will be set during lifespan
81104 self ._task_group = None
@@ -114,6 +137,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114137 # Store the task group for later use
115138 self ._task_group = tg
116139 logger .info ("StreamableHTTP session manager started" )
140+
141+ # Start idle session reaper if timeout is configured
142+ if self .session_idle_timeout is not None :
143+ tg .start_soon (self ._idle_session_reaper )
144+
117145 try :
118146 yield # Let the application run
119147 finally :
@@ -123,6 +151,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
123151 self ._task_group = None
124152 # Clear any remaining server instances
125153 self ._server_instances .clear ()
154+ self ._last_activity .clear ()
126155
127156 async def handle_request (
128157 self ,
@@ -219,6 +248,8 @@ async def _handle_stateful_request(
219248 if request_mcp_session_id is not None and request_mcp_session_id in self ._server_instances : # pragma: no cover
220249 transport = self ._server_instances [request_mcp_session_id ]
221250 logger .debug ("Session already exists, handling request directly" )
251+ # Update activity timestamp for idle timeout tracking
252+ self ._last_activity [request_mcp_session_id ] = anyio .current_time ()
222253 await transport .handle_request (scope , receive , send )
223254 return
224255
@@ -237,6 +268,7 @@ async def _handle_stateful_request(
237268
238269 assert http_transport .mcp_session_id is not None
239270 self ._server_instances [http_transport .mcp_session_id ] = http_transport
271+ self ._last_activity [http_transport .mcp_session_id ] = anyio .current_time ()
240272 logger .info (f"Created new transport with session ID: { new_session_id } " )
241273
242274 # Define the server runner
@@ -269,6 +301,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
269301 "active instances."
270302 )
271303 del self ._server_instances [http_transport .mcp_session_id ]
304+ self ._last_activity .pop (http_transport .mcp_session_id , None )
272305
273306 # Assert task group is not None for type checking
274307 assert self ._task_group is not None
@@ -295,3 +328,43 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295328 media_type = "application/json" ,
296329 )
297330 await response (scope , receive , send )
331+
332+ def _effective_idle_timeout (self ) -> float :
333+ """Compute the effective idle timeout, accounting for retry_interval.
334+
335+ When SSE polling is configured via ``retry_interval`` (milliseconds),
336+ the client may legitimately go quiet between polls. The idle threshold
337+ must be large enough so that normal polling gaps don't cause premature
338+ session reaping.
339+ """
340+ assert self .session_idle_timeout is not None
341+ timeout = self .session_idle_timeout
342+ if self .retry_interval is not None :
343+ retry_seconds = self .retry_interval / 1000.0
344+ timeout = max (timeout , retry_seconds * 3 )
345+ return timeout
346+
347+ async def _idle_session_reaper (self ) -> None :
348+ """Background task that periodically terminates idle sessions."""
349+ timeout = self ._effective_idle_timeout ()
350+ scan_interval = min (timeout / 2 , 30.0 )
351+ logger .info (f"Idle session reaper started (timeout={ timeout } s, scan_interval={ scan_interval } s)" )
352+
353+ while True :
354+ await anyio .sleep (scan_interval )
355+ now = anyio .current_time ()
356+ # Snapshot keys to avoid mutation during iteration
357+ for session_id in list (self ._server_instances .keys ()):
358+ last = self ._last_activity .get (session_id )
359+ if last is None :
360+ continue # pragma: no cover
361+ if now - last > timeout :
362+ transport = self ._server_instances .get (session_id )
363+ if transport is None :
364+ continue # pragma: no cover
365+ logger .info (
366+ f"Terminating idle session { session_id } (idle for { now - last :.1f} s, timeout={ timeout } s)"
367+ )
368+ await transport .terminate ()
369+ self ._server_instances .pop (session_id , None )
370+ self ._last_activity .pop (session_id , None )
0 commit comments