Skip to content

Commit 57d2578

Browse files
committed
fix(streamable-http): fail request when resumption can't complete
1 parent 616476f commit 57d2578

2 files changed

Lines changed: 96 additions & 5 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
from mcp.shared._httpx_utils import create_mcp_http_client
2020
from mcp.shared.message import ClientMessageMetadata, SessionMessage
2121
from mcp.types import (
22+
CONNECTION_CLOSED,
2223
INTERNAL_ERROR,
2324
INVALID_REQUEST,
2425
PARSE_ERROR,
@@ -366,10 +367,16 @@ async def _handle_sse_response(
366367
except Exception:
367368
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
368369

369-
# Stream ended without response - reconnect if we received an event with ID
370-
if last_event_id is not None: # pragma: no branch
371-
logger.info("SSE stream disconnected, reconnecting...")
372-
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
370+
# Stream ended without a terminal response/error. If the server provided an event id,
371+
# try resuming; otherwise fail the request instead of hanging forever.
372+
if last_event_id is None:
373+
error_data = ErrorData(code=CONNECTION_CLOSED, message="SSE stream disconnected before response completed")
374+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
375+
await ctx.read_stream_writer.send(error_msg)
376+
return
377+
378+
logger.info("SSE stream disconnected, reconnecting...")
379+
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
373380

374381
async def _handle_reconnection(
375382
self,
@@ -380,7 +387,19 @@ async def _handle_reconnection(
380387
) -> None:
381388
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
382389
# Bail if max retries exceeded
383-
if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
390+
if attempt >= MAX_RECONNECTION_ATTEMPTS:
391+
original_request_id = None
392+
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
393+
original_request_id = ctx.session_message.message.id
394+
395+
if original_request_id is not None:
396+
error_data = ErrorData(
397+
code=CONNECTION_CLOSED,
398+
message="SSE stream disconnected and could not be resumed",
399+
data={"last_event_id": last_event_id},
400+
)
401+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
402+
await ctx.read_stream_writer.send(error_msg)
384403
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
385404
return
386405

tests/interaction/transports/test_hosting_resume.py

Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -296,6 +296,78 @@ async def call() -> None:
296296
assert received == snapshot(["before close", "after close"])
297297

298298

299+
async def test_a_call_whose_stream_closes_and_cannot_be_resumed_fails_instead_of_hanging() -> None:
    """If a resumable response stream disconnects and the server session is gone, the client fails
    the request instead of hanging forever.

    The server closes the call's SSE stream after emitting one related notification. The test then
    deletes the active server-side session to force the client's reconnect GET to return 404.
    Without a terminal response/error on the read stream, ClientSession.send_request waits forever
    (read timeout defaults to None). The transport must surface a request-scoped error when it
    gives up reconnecting.
    """
    reconnect_attempted = anyio.Event()
    allow_exit = anyio.Event()
    done = anyio.Event()
    # Only ordinary exceptions are recorded; cancellation must keep propagating (see call()).
    raised: list[Exception] = []
    manager_ref = None
    deleted_session = False

    mcp = MCPServer("resumable")

    @mcp.tool()
    async def interrupt(ctx: Context) -> str:
        await ctx.info("before close")
        await ctx.close_sse_stream()
        # Park the handler so the only way the client call can finish is via the transport error.
        await allow_exit.wait()
        return "unreachable"

    async def record_request(request: httpx.Request) -> None:
        """Flag the first resumption GET and drop the server session so resumption cannot succeed."""
        nonlocal deleted_session
        if request.method != "GET":
            return
        if request.headers.get("last-event-id") is None:
            return
        reconnect_attempted.set()
        if deleted_session or manager_ref is None:
            return
        # Reaches into the manager's private session table; there is no public hook visible here.
        session_id = next(iter(manager_ref._server_instances), None)
        if session_id is not None:  # pragma: no branch
            del manager_ref._server_instances[session_id]
            deleted_session = True

    async with mounted_app(mcp, event_store=SequencedEventStore(), retry_interval=0, on_request=record_request) as (
        http,
        manager,
    ):
        manager_ref = manager
        with anyio.fail_after(5):  # pragma: no branch
            async with (
                streamable_http_client(f"{BASE_URL}/mcp", http_client=http, terminate_on_close=False) as (r, w),
                ClientSession(r, w) as session,
                anyio.create_task_group() as tg,
            ):
                await session.initialize()

                async def call() -> None:
                    try:
                        await session.call_tool("interrupt", {})
                    except Exception as exc:
                        # Deliberately not BaseException: swallowing the cancellation exception
                        # would break AnyIO cancel scopes and hide a fail_after() timeout.
                        raised.append(exc)
                    finally:
                        done.set()

                tg.start_soon(call)
                await reconnect_attempted.wait()
                await done.wait()
                allow_exit.set()
                tg.cancel_scope.cancel()

    assert len(raised) == 1
    assert "disconnected" in str(raised[0]).lower()
370+
299371
@requirement("client-transport:http:resume-stream-api")
300372
async def test_a_captured_resumption_token_replays_missed_messages_on_a_new_connection() -> None:
301373
"""A resumption token captured via on_resumption_token_update on one connection lets a fresh

0 commit comments

Comments
 (0)