Skip to content

Commit 23caf52

Browse files
committed
test(streamable-http): cover disconnect without resumption anchor
1 parent 579ed01 commit 23caf52

2 files changed

Lines changed: 41 additions & 12 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -388,18 +388,15 @@ async def _handle_reconnection(
388388
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
389389
# Bail if max retries exceeded
390390
if attempt >= MAX_RECONNECTION_ATTEMPTS:
391-
original_request_id = None
392-
if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch
393-
original_request_id = ctx.session_message.message.id
394-
395-
if original_request_id is not None:
396-
error_data = ErrorData(
397-
code=CONNECTION_CLOSED,
398-
message="SSE stream disconnected and could not be resumed",
399-
data={"last_event_id": last_event_id},
400-
)
401-
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
402-
await ctx.read_stream_writer.send(error_msg)
391+
assert isinstance(ctx.session_message.message, JSONRPCRequest)
392+
original_request_id = ctx.session_message.message.id
393+
error_data = ErrorData(
394+
code=CONNECTION_CLOSED,
395+
message="SSE stream disconnected and could not be resumed",
396+
data={"last_event_id": last_event_id},
397+
)
398+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
399+
await ctx.read_stream_writer.send(error_msg)
403400
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
404401
return
405402

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import httpx
2+
import pytest
3+
4+
from mcp.client.streamable_http import RequestContext, StreamableHTTPTransport
5+
from mcp.shared._context_streams import create_context_streams
6+
from mcp.shared.message import SessionMessage
7+
from mcp.types import CONNECTION_CLOSED, JSONRPCError, JSONRPCRequest
8+
9+
pytestmark = pytest.mark.anyio
10+
11+
12+
async def test_sse_response_disconnect_before_any_event_id_fails_request() -> None:
13+
transport = StreamableHTTPTransport("http://example.com/mcp")
14+
async with httpx.AsyncClient() as client:
15+
read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](1)
16+
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/call", params={"name": "noop", "arguments": {}})
17+
ctx = RequestContext(
18+
client=client,
19+
session_id=None,
20+
session_message=SessionMessage(request),
21+
metadata=None,
22+
read_stream_writer=read_stream_writer,
23+
)
24+
response = httpx.Response(200, headers={"content-type": "text/event-stream"}, content=b"")
25+
26+
async with read_stream_writer, read_stream:
27+
await transport._handle_sse_response(response, ctx)
28+
message = await read_stream.receive()
29+
30+
assert isinstance(message.message, JSONRPCError)
31+
assert message.message.id == 1
32+
assert message.message.error.code == CONNECTION_CLOSED

0 commit comments

Comments
 (0)