Skip to content

Commit 480c6d6

Browse files
kxbnbclaude
andcommitted
test: add regression test for EndOfStream in send_request (#1717)
Adds a test that reproduces the race condition where the per-request response stream is closed before send_request reads from it. Without the fix, EndOfStream propagates unhandled. With the fix, it's converted to MCPError with CONNECTION_CLOSED. The test closes the response stream's send side directly while the server connection stays open, bypassing the receive loop's graceful error injection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9a1a3c4 commit 480c6d6

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

tests/shared/test_session.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,59 @@ async def mock_server():
304304
await ev_closed.wait()
305305
with anyio.fail_after(1): # pragma: no branch
306306
await ev_response.wait()
307+
308+
309+
@pytest.mark.anyio
310+
async def test_response_stream_closed_raises_mcp_error():
311+
"""Test that EndOfStream on the per-request response stream raises MCPError.
312+
313+
Reproduces the race from #1717: if the per-request response stream is closed
314+
(e.g. receive loop calls aclose() during shutdown) before send_request reads
315+
from it, receive() raises EndOfStream. Without the fix, this propagates as an
316+
unhandled EndOfStream (or causes UnboundLocalError).
317+
318+
Simulates this by closing the response stream's send side directly while the
319+
server connection stays open (so the receive loop never enters its finally block).
320+
"""
321+
322+
ev_result = anyio.Event()
323+
caught_error: list[MCPError] = []
324+
325+
async with create_client_server_memory_streams() as (client_streams, server_streams):
326+
client_read, client_write = client_streams
327+
server_read, _server_write = server_streams
328+
329+
async def make_request(client_session: ClientSession):
330+
nonlocal caught_error
331+
try:
332+
await client_session.send_ping()
333+
pytest.fail("Expected MCPError") # pragma: no cover
334+
except MCPError as e:
335+
caught_error.append(e)
336+
ev_result.set()
337+
338+
async def close_response_stream(client_session: ClientSession):
339+
# Consume the request so the client's send completes
340+
await server_read.receive()
341+
342+
# Wait for send_request to register its response stream
343+
while not client_session._response_streams: # pragma: no branch
344+
await anyio.sleep(0.01) # pragma: no cover
345+
346+
# Close the send side directly, bypassing the receive loop's
347+
# graceful error injection. This triggers EndOfStream on receive().
348+
for stream in client_session._response_streams.values():
349+
await stream.aclose()
350+
351+
async with (
352+
anyio.create_task_group() as tg,
353+
ClientSession(read_stream=client_read, write_stream=client_write) as client_session,
354+
):
355+
tg.start_soon(make_request, client_session)
356+
tg.start_soon(close_response_stream, client_session)
357+
358+
with anyio.fail_after(2): # pragma: no branch
359+
await ev_result.wait()
360+
361+
assert len(caught_error) == 1
362+
assert "Connection closed" in str(caught_error[0])

0 commit comments

Comments
 (0)