Skip to content

Commit 64daae5

Browse files
committed
fix: send priming event in GET handler to prevent hang
The GET request handler in streamable_http.py was missing the priming event that the POST handler sends. This caused GET requests to /mcp endpoint to hang indefinitely while waiting for messages. Fix: - Send a priming event immediately after establishing the SSE connection in standalone_sse_writer() - When event_store is configured and protocol >= 2025-11-25, use the proper priming event with resumability support - Otherwise, send a simple "open" event to confirm connection is established - Add anyio.ClosedResourceError handling for clean shutdown - Store SSE writer reference for close_standalone_sse_stream() to work Related: jlowin/fastmcp#532
1 parent 812a46a commit 64daae5

File tree

2 files changed

+114
-0
lines changed

2 files changed

+114
-0
lines changed

src/mcp/server/streamable_http.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,12 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr
706706
# Create SSE stream
707707
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
708708

709+
# Store writer reference so close_standalone_sse_stream() can close it
710+
self._sse_stream_writers[GET_STREAM_KEY] = sse_stream_writer
711+
712+
# Get protocol version from header for priming event decision
713+
protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
714+
709715
async def standalone_sse_writer():
710716
try:
711717
# Create a standalone message stream for server-initiated messages
@@ -714,6 +720,17 @@ async def standalone_sse_writer():
714720
standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1]
715721

716722
async with sse_stream_writer, standalone_stream_reader:
723+
# Send an immediate event to establish connection and prevent hang
724+
# This is crucial for GET requests which have no initial data to send
725+
if self._event_store and protocol_version >= "2025-11-25":
726+
# Send proper priming event with resumability support
727+
await self._maybe_send_priming_event(GET_STREAM_KEY, sse_stream_writer, protocol_version)
728+
else:
729+
# Send a simple "open" event to confirm connection is established
730+
# Without this, GET requests hang waiting for data
731+
open_event: dict[str, str] = {"event": "open", "data": ""}
732+
await sse_stream_writer.send(open_event)
733+
717734
# Process messages from the standalone stream
718735
async for event_message in standalone_stream_reader:
719736
# For the standalone stream, we handle:
@@ -724,10 +741,14 @@ async def standalone_sse_writer():
724741
# Send the message via SSE
725742
event_data = self._create_event_data(event_message)
726743
await sse_stream_writer.send(event_data)
744+
except anyio.ClosedResourceError:
745+
# Expected when close_standalone_sse_stream() is called
746+
logger.debug("Standalone SSE stream closed by close_standalone_sse_stream()")
727747
except Exception:
728748
logger.exception("Error in standalone SSE writer")
729749
finally:
730750
logger.debug("Closing standalone SSE writer")
751+
self._sse_stream_writers.pop(GET_STREAM_KEY, None)
731752
await self._clean_up_memory_streams(GET_STREAM_KEY)
732753

733754
# Create and start EventSourceResponse

tests/shared/test_streamable_http.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2350,3 +2350,96 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(
23502350

23512351
assert "content-type" in headers_data
23522352
assert headers_data["content-type"] == "application/json"
2353+
2354+
2355+
def test_get_request_receives_priming_event_with_event_store(
2356+
event_server: tuple[SimpleEventStore, str],
2357+
) -> None:
2358+
"""
2359+
Test that GET requests to /mcp receive a priming event immediately.
2360+
2361+
This test verifies the fix for the issue where GET requests would hang
2362+
because the standalone_sse_writer didn't send a priming event before
2363+
entering the message loop.
2364+
2365+
With event_store configured and protocol version >= 2025-11-25, the server
2366+
should send a priming event (empty data with event id) immediately after
2367+
establishing the SSE connection, preventing the client from hanging.
2368+
"""
2369+
event_store, server_url = event_server
2370+
mcp_url = f"{server_url}/mcp"
2371+
2372+
# Use latest protocol version (2025-11-25) to enable priming events
2373+
init_request_latest = {
2374+
"jsonrpc": "2.0",
2375+
"method": "initialize",
2376+
"params": {
2377+
"clientInfo": {"name": "test-client", "version": "1.0"},
2378+
"protocolVersion": "2025-11-25", # Must be >= 2025-11-25 for priming events
2379+
"capabilities": {},
2380+
},
2381+
"id": "init-1",
2382+
}
2383+
2384+
# First, initialize a session via POST
2385+
init_response = requests.post(
2386+
mcp_url,
2387+
headers={
2388+
"Accept": "application/json, text/event-stream",
2389+
"Content-Type": "application/json",
2390+
},
2391+
json=init_request_latest,
2392+
)
2393+
assert init_response.status_code == 200
2394+
2395+
# Get session ID
2396+
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
2397+
assert session_id is not None
2398+
2399+
# Extract negotiated protocol version from SSE response
2400+
# Note: With event_store, the POST response includes priming event first (empty data)
2401+
# then the actual initialize response
2402+
negotiated_version = None
2403+
for line in init_response.text.splitlines():
2404+
if line.startswith("data: ") and line[6:].strip(): # Skip empty data (priming event)
2405+
try:
2406+
init_data = json.loads(line[6:])
2407+
if "result" in init_data and "protocolVersion" in init_data["result"]:
2408+
negotiated_version = init_data["result"]["protocolVersion"]
2409+
break
2410+
except json.JSONDecodeError:
2411+
continue
2412+
assert negotiated_version is not None, "Could not extract protocol version from init response"
2413+
2414+
# Now make a GET request to establish SSE stream with a short timeout
2415+
# Before the fix, this would hang indefinitely waiting for messages
2416+
# After the fix, we should get the priming event immediately
2417+
get_response = requests.get(
2418+
mcp_url,
2419+
headers={
2420+
"Accept": "text/event-stream",
2421+
MCP_SESSION_ID_HEADER: session_id,
2422+
MCP_PROTOCOL_VERSION_HEADER: negotiated_version,
2423+
},
2424+
stream=True,
2425+
timeout=3, # 3 second timeout - priming event should arrive immediately
2426+
)
2427+
2428+
assert get_response.status_code == 200
2429+
assert get_response.headers.get("Content-Type") == "text/event-stream"
2430+
2431+
# Try to read the first chunk from the stream - should be the priming event
2432+
# The priming event format is: "id: <event_id>\ndata: \n\n"
2433+
try:
2434+
# Read up to 1KB to get the priming event
2435+
priming_received = False
2436+
for chunk in get_response.iter_content(chunk_size=1024, decode_unicode=True):
2437+
if chunk and ("id:" in chunk or "data:" in chunk):
2438+
priming_received = True
2439+
break
2440+
2441+
assert priming_received, (
2442+
"GET request should receive priming event immediately with event_store configured"
2443+
)
2444+
finally:
2445+
get_response.close()

0 commit comments

Comments
 (0)