Skip to content

Commit 52cf90f

Browse files
author
skyvanguard
committed
fix: convert all strict pragmas to lax for non-deterministic coverage
All # pragma: no cover annotations in the streamable HTTP transport files are on code paths that have non-deterministic coverage under parallel test execution. Using lax annotations correctly excludes them from coverage without triggering strict-no-cover violations. Github-Issue:#1648
1 parent a0e96ea commit 52cf90f

File tree

2 files changed

+39
-37
lines changed

2 files changed

+39
-37
lines changed

src/mcp/server/streamable_http.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None)
9191
Returns:
9292
The generated event ID for the stored event
9393
"""
94-
pass # pragma: no cover
94+
pass # pragma: lax no cover
9595

9696
@abstractmethod
9797
async def replay_events_after(
@@ -108,7 +108,7 @@ async def replay_events_after(
108108
Returns:
109109
The stream ID of the replayed events
110110
"""
111-
pass # pragma: no cover
111+
pass # pragma: lax no cover
112112

113113

114114
class StreamableHTTPServerTransport:
@@ -175,7 +175,7 @@ def is_terminated(self) -> bool:
175175
"""Check if this transport has been explicitly terminated."""
176176
return self._terminated
177177

178-
def close_sse_stream(self, request_id: RequestId) -> None: # pragma: no cover
178+
def close_sse_stream(self, request_id: RequestId) -> None: # pragma: lax no cover
179179
"""Close SSE connection for a specific request without terminating the stream.
180180
181181
This method closes the HTTP connection for the specified request, triggering
@@ -203,7 +203,7 @@ def close_sse_stream(self, request_id: RequestId) -> None: # pragma: no cover
203203
send_stream.close()
204204
receive_stream.close()
205205

206-
def close_standalone_sse_stream(self) -> None: # pragma: no cover
206+
def close_standalone_sse_stream(self) -> None: # pragma: lax no cover
207207
"""Close the standalone GET SSE stream, triggering client reconnection.
208208
209209
This method closes the HTTP connection for the standalone GET stream used
@@ -238,10 +238,10 @@ def _create_session_message(
238238
# Only provide close callbacks when client supports resumability
239239
if self._event_store and protocol_version >= "2025-11-25":
240240

241-
async def close_stream_callback() -> None: # pragma: no cover
241+
async def close_stream_callback() -> None: # pragma: lax no cover
242242
self.close_sse_stream(request_id)
243243

244-
async def close_standalone_stream_callback() -> None: # pragma: no cover
244+
async def close_standalone_stream_callback() -> None: # pragma: lax no cover
245245
self.close_standalone_sse_stream()
246246

247247
metadata = ServerMessageMetadata(
@@ -289,7 +289,7 @@ def _create_error_response(
289289
) -> Response:
290290
"""Create an error response with a simple string message."""
291291
response_headers = {"Content-Type": CONTENT_TYPE_JSON}
292-
if headers: # pragma: no cover
292+
if headers: # pragma: lax no cover
293293
response_headers.update(headers)
294294

295295
if self.mcp_session_id:
@@ -328,11 +328,11 @@ def _create_json_response(
328328
headers=response_headers,
329329
)
330330

331-
def _get_session_id(self, request: Request) -> str | None: # pragma: no cover
331+
def _get_session_id(self, request: Request) -> str | None: # pragma: lax no cover
332332
"""Extract the session ID from request headers."""
333333
return request.headers.get(MCP_SESSION_ID_HEADER)
334334

335-
def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: # pragma: no cover
335+
def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: # pragma: lax no cover
336336
"""Create event data dictionary from an EventMessage."""
337337
event_data = {
338338
"event": "message",
@@ -352,7 +352,7 @@ async def _clean_up_memory_streams(self, request_id: RequestId) -> None:
352352
# Close the request stream
353353
await self._request_streams[request_id][0].aclose()
354354
await self._request_streams[request_id][1].aclose()
355-
except Exception: # pragma: no cover
355+
except Exception: # pragma: lax no cover
356356
# During cleanup, we catch all exceptions since streams might be in various states
357357
logger.debug("Error closing memory streams - may already be closed")
358358
finally:
@@ -370,7 +370,7 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
370370
await error_response(scope, receive, send)
371371
return
372372

373-
if self._terminated: # pragma: no cover
373+
if self._terminated: # pragma: lax no cover
374374
# If the session has been terminated, return 404 Not Found
375375
response = self._create_error_response(
376376
"Not Found: Session has been terminated",
@@ -382,13 +382,13 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
382382
try:
383383
if request.method == "POST":
384384
await self._handle_post_request(scope, request, receive, send)
385-
elif request.method == "GET": # pragma: no cover
385+
elif request.method == "GET": # pragma: lax no cover
386386
await self._handle_get_request(request, send)
387-
elif request.method == "DELETE": # pragma: no cover
387+
elif request.method == "DELETE": # pragma: lax no cover
388388
await self._handle_delete_request(request, send)
389-
else: # pragma: no cover
389+
else: # pragma: lax no cover
390390
await self._handle_unsupported_request(request, send)
391-
except ClientDisconnect: # pragma: no cover
391+
except ClientDisconnect: # pragma: lax no cover
392392
logger.debug(f"Client disconnected during {request.method} request")
393393

394394
def _check_accept_headers(self, request: Request) -> tuple[bool, bool]:
@@ -433,15 +433,15 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se
433433
async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None:
434434
"""Handle POST requests containing JSON-RPC messages."""
435435
writer = self._read_stream_writer
436-
if writer is None: # pragma: no cover
436+
if writer is None: # pragma: lax no cover
437437
raise ValueError("No read stream writer available. Ensure connect() is called first.")
438438
try:
439439
# Validate Accept header
440440
if not await self._validate_accept_header(request, scope, send):
441441
return
442442

443443
# Validate Content-Type
444-
if not self._check_content_type(request): # pragma: no cover
444+
if not self._check_content_type(request): # pragma: lax no cover
445445
response = self._create_error_response(
446446
"Unsupported Media Type: Content-Type must be application/json",
447447
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
@@ -461,7 +461,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
461461

462462
try:
463463
message = jsonrpc_message_adapter.validate_python(raw_message, by_name=False)
464-
except ValidationError as e: # pragma: no cover
464+
except ValidationError as e: # pragma: lax no cover
465465
response = self._create_error_response(
466466
f"Validation error: {str(e)}",
467467
HTTPStatus.BAD_REQUEST,
@@ -473,7 +473,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
473473
# Check if this is an initialization request
474474
is_initialization_request = isinstance(message, JSONRPCRequest) and message.method == "initialize"
475475

476-
if is_initialization_request: # pragma: no cover
476+
if is_initialization_request: # pragma: lax no cover
477477
# Check if the server already has an established session
478478
if self.mcp_session_id:
479479
# Check if request has a session ID
@@ -487,11 +487,11 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
487487
)
488488
await response(scope, receive, send)
489489
return
490-
elif not await self._validate_request_headers(request, send): # pragma: no cover
490+
elif not await self._validate_request_headers(request, send): # pragma: lax no cover
491491
return
492492

493493
# For notifications and responses only, return 202 Accepted
494-
if not isinstance(message, JSONRPCRequest): # pragma: no cover
494+
if not isinstance(message, JSONRPCRequest): # pragma: lax no cover
495495
# Create response object and send it
496496
response = self._create_json_response(
497497
None,
@@ -538,23 +538,23 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
538538
response_message = event_message.message
539539
break
540540
# For notifications and request, keep waiting
541-
else: # pragma: no cover
541+
else: # pragma: lax no cover
542542
logger.debug(f"received: {event_message.message.method}")
543543

544544
# At this point we should have a response
545545
if response_message:
546546
# Create JSON response
547547
response = self._create_json_response(response_message)
548548
await response(scope, receive, send)
549-
else: # pragma: no cover
549+
else: # pragma: lax no cover
550550
# This shouldn't happen in normal operation
551551
logger.error("No response message received before stream closed")
552552
response = self._create_error_response(
553553
"Error processing request: No response received",
554554
HTTPStatus.INTERNAL_SERVER_ERROR,
555555
)
556556
await response(scope, receive, send)
557-
except Exception: # pragma: no cover
557+
except Exception: # pragma: lax no cover
558558
logger.exception("Error processing JSON response")
559559
response = self._create_error_response(
560560
"Error processing request",
@@ -564,7 +564,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
564564
await response(scope, receive, send)
565565
finally:
566566
await self._clean_up_memory_streams(request_id)
567-
else: # pragma: no cover
567+
else: # pragma: lax no cover
568568
# Create SSE stream
569569
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
570570

@@ -626,7 +626,7 @@ async def sse_writer():
626626
await sse_stream_reader.aclose()
627627
await self._clean_up_memory_streams(request_id)
628628

629-
except Exception as err: # pragma: no cover
629+
except Exception as err: # pragma: lax no cover
630630
logger.exception("Error handling POST request")
631631
response = self._create_error_response(
632632
f"Error handling POST request: {err}",
@@ -638,7 +638,7 @@ async def sse_writer():
638638
await writer.send(Exception(err))
639639
return
640640

641-
async def _handle_get_request(self, request: Request, send: Send) -> None: # pragma: no cover
641+
async def _handle_get_request(self, request: Request, send: Send) -> None: # pragma: lax no cover
642642
"""Handle GET request to establish SSE.
643643
644644
This allows the server to communicate to the client without the client
@@ -736,7 +736,7 @@ async def standalone_sse_writer():
736736
await sse_stream_reader.aclose()
737737
await self._clean_up_memory_streams(GET_STREAM_KEY)
738738

739-
async def _handle_delete_request(self, request: Request, send: Send) -> None: # pragma: no cover
739+
async def _handle_delete_request(self, request: Request, send: Send) -> None: # pragma: lax no cover
740740
"""Handle DELETE requests for explicit session termination."""
741741
# Validate session ID
742742
if not self.mcp_session_id:
@@ -786,11 +786,11 @@ async def terminate(self) -> None:
786786
await self._write_stream_reader.aclose()
787787
if self._write_stream is not None: # pragma: no branch
788788
await self._write_stream.aclose()
789-
except Exception as e: # pragma: no cover
789+
except Exception as e: # pragma: lax no cover
790790
# During cleanup, we catch all exceptions since streams might be in various states
791791
logger.debug(f"Error closing streams: {e}")
792792

793-
async def _handle_unsupported_request(self, request: Request, send: Send) -> None: # pragma: no cover
793+
async def _handle_unsupported_request(self, request: Request, send: Send) -> None: # pragma: lax no cover
794794
"""Handle unsupported HTTP methods."""
795795
headers = {
796796
"Content-Type": CONTENT_TYPE_JSON,
@@ -864,7 +864,7 @@ async def _validate_protocol_version(self, request: Request, send: Send) -> bool
864864

865865
return True
866866

867-
async def _replay_events(self, last_event_id: str, request: Request, send: Send) -> None: # pragma: no cover
867+
async def _replay_events(self, last_event_id: str, request: Request, send: Send) -> None: # pragma: lax no cover
868868
"""Replays events that would have been sent after the specified event ID.
869869
Only used when resumability is enabled.
870870
"""
@@ -996,7 +996,7 @@ async def message_router():
996996
# send it there
997997
target_request_id = response_id
998998
# Extract related_request_id from meta if it exists
999-
elif ( # pragma: no cover
999+
elif ( # pragma: lax no cover
10001000
session_message.metadata is not None
10011001
and isinstance(
10021002
session_message.metadata,
@@ -1020,13 +1020,13 @@ async def message_router():
10201020
try:
10211021
# Send both the message and the event ID
10221022
await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
1023-
except ( # pragma: no cover
1023+
except ( # pragma: lax no cover
10241024
anyio.BrokenResourceError,
10251025
anyio.ClosedResourceError,
10261026
):
10271027
# Stream might be closed, remove from registry
10281028
self._request_streams.pop(request_stream_id, None)
1029-
else: # pragma: no cover
1029+
else: # pragma: lax no cover
10301030
logger.debug(
10311031
f"""Request stream {request_stream_id} not found
10321032
for message. Still processing message as the client
@@ -1057,6 +1057,6 @@ async def message_router():
10571057
await read_stream.aclose()
10581058
await write_stream_reader.aclose()
10591059
await write_stream.aclose()
1060-
except Exception as e: # pragma: no cover
1060+
except Exception as e: # pragma: lax no cover
10611061
# During cleanup, we catch all exceptions since streams might be in various states
10621062
logger.debug(f"Error closing streams: {e}")

src/mcp/server/streamable_http_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ async def _handle_stateful_request(
213213
request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)
214214

215215
# Existing session case
216-
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
216+
if (
217+
request_mcp_session_id is not None and request_mcp_session_id in self._server_instances
218+
): # pragma: lax no cover
217219
transport = self._server_instances[request_mcp_session_id]
218220
logger.debug("Session already exists, handling request directly")
219221
await transport.handle_request(scope, receive, send)
@@ -297,5 +299,5 @@ class StreamableHTTPASGIApp:
297299
def __init__(self, session_manager: StreamableHTTPSessionManager):
298300
self.session_manager = session_manager
299301

300-
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover
302+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: lax no cover
301303
await self.session_manager.handle_request(scope, receive, send)

0 commit comments

Comments
 (0)