Skip to content

Commit 49fe538

Browse files
committed
fix: propagate SSE stream errors to waiting requests
Fixes #1401. Also fixes #1789 (closed as duplicate). When an SSE read timeout occurs during a StreamableHTTP POST request, the pending send_request call hangs indefinitely. The transport catches the exception but never sends an error back through the read stream, leaving the caller blocked on response_stream_reader.receive() with nothing to receive. This fix propagates SSE stream failures as JSONRPCError to the waiting request, keeping failures isolated to the affected request rather than tearing down the entire session. Changes: - _handle_sse_response now sends JSONRPCError when SSE stream ends without a complete response - _handle_reconnection returns bool to indicate success/failure - handle_get_stream tracks received_events to properly count reconnection attempts - _default_message_handler logs warnings for unhandled exceptions
1 parent 161834d commit 49fe538

2 files changed

Lines changed: 31 additions & 15 deletions

File tree

src/mcp/client/session.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ async def __call__(
5757
async def _default_message_handler(
5858
message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
5959
) -> None:
60+
if isinstance(message, Exception):
61+
logger.warning("Unhandled exception in message handler: %s", message)
6062
await anyio.lowlevel.checkpoint()
6163

6264

src/mcp/client/streamable_http.py

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747

4848
# Reconnection defaults
4949
DEFAULT_RECONNECTION_DELAY_MS = 1000 # 1 second fallback when server doesn't provide retry
50-
MAX_RECONNECTION_ATTEMPTS = 2 # Max retry attempts before giving up
50+
MAX_RECONNECTION_ATTEMPTS = 5 # Max retry attempts before giving up
5151

5252

5353
class StreamableHTTPError(Exception):
@@ -197,6 +197,7 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer:
197197
event_source.response.raise_for_status()
198198
logger.debug("GET SSE connection established")
199199

200+
received_events = False
200201
async for sse in event_source.aiter_sse():
201202
# Track last event ID for reconnection
202203
if sse.id:
@@ -205,10 +206,12 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer:
205206
if sse.retry is not None:
206207
retry_interval_ms = sse.retry
207208

209+
received_events = True
208210
await self._handle_sse_event(sse, read_stream_writer)
209211

210-
# Stream ended normally (server closed) - reset attempt counter
211-
attempt = 0
212+
# Only reset attempts if we actually received events;
213+
# empty connections count toward MAX_RECONNECTION_ATTEMPTS
214+
attempt = 0 if received_events else attempt + 1
212215

213216
except Exception: # pragma: lax no cover
214217
logger.debug("GET stream error", exc_info=True)
@@ -364,25 +367,36 @@ async def _handle_sse_response(
364367
await response.aclose()
365368
return # Normal completion, no reconnect needed
366369
except Exception:
367-
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
370+
logger.debug("SSE stream error", exc_info=True)
368371

369-
# Stream ended without response - reconnect if we received an event with ID
370-
if last_event_id is not None: # pragma: no branch
372+
# Stream ended without a complete response — attempt reconnection if possible
373+
if last_event_id is not None:
371374
logger.info("SSE stream disconnected, reconnecting...")
372-
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
375+
if await self._handle_reconnection(ctx, last_event_id, retry_interval_ms):
376+
return # Reconnection delivered the response
377+
378+
# No response delivered — unblock the waiting request with an error
379+
error_data = ErrorData(code=INTERNAL_ERROR, message="SSE stream ended without a response")
380+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
381+
await ctx.read_stream_writer.send(error_msg)
373382

374383
async def _handle_reconnection(
375384
self,
376385
ctx: RequestContext,
377386
last_event_id: str,
378387
retry_interval_ms: int | None = None,
379388
attempt: int = 0,
380-
) -> None:
381-
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
389+
) -> bool:
390+
"""Reconnect with Last-Event-ID to resume stream after server disconnect.
391+
392+
Returns:
393+
True if the response was successfully delivered, False if max
394+
reconnection attempts were exceeded without delivering a response.
395+
"""
382396
# Bail if max retries exceeded
383-
if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
397+
if attempt >= MAX_RECONNECTION_ATTEMPTS:
384398
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
385-
return
399+
return False
386400

387401
# Always wait - use server value or default
388402
delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS
@@ -419,15 +433,15 @@ async def _handle_reconnection(
419433
)
420434
if is_complete:
421435
await event_source.response.aclose()
422-
return
436+
return True
423437

424-
# Stream ended again without response - reconnect again (reset attempt counter)
438+
# Stream ended again without response - reconnect again
425439
logger.info("SSE stream disconnected, reconnecting...")
426-
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
440+
return await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, attempt + 1)
427441
except Exception as e: # pragma: no cover
428442
logger.debug(f"Reconnection failed: {e}")
429443
# Try to reconnect again if we still have an event ID
430-
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
444+
return await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
431445

432446
async def post_writer(
433447
self,

0 commit comments

Comments
 (0)