From 456437676bc304ebe367737cb02d8090d93d6a4e Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 19:23:30 -0400 Subject: [PATCH 1/5] Add richer OTel MCP span attributes --- src/mcp/client/streamable_http.py | 33 +++++++++++++++++- src/mcp/server/lowlevel/server.py | 31 ++++++++++------- src/mcp/shared/_otel.py | 52 ++++++++++++++++++++++++++++ src/mcp/shared/session.py | 18 ++++++++-- tests/shared/test_otel.py | 36 +++++++++++++++++++ tests/shared/test_streamable_http.py | 21 +++++++++++ 6 files changed, 174 insertions(+), 17 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c633..b4378802e 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -7,6 +7,7 @@ from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager from dataclasses import dataclass +from types import TracebackType import anyio import httpx @@ -17,6 +18,7 @@ from mcp.client._transport import TransportStreams from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams from mcp.shared._httpx_utils import create_mcp_http_client +from mcp.shared._stream_protocols import WriteStream from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( INTERNAL_ERROR, @@ -512,6 +514,35 @@ def get_session_id(self) -> str | None: return self.session_id # pragma: no cover +class _SessionAwareWriteStream: + """Write-stream wrapper that exposes the transport session ID.""" + + def __init__(self, inner: WriteStream[SessionMessage], transport: StreamableHTTPTransport) -> None: + self._inner = inner + self._transport = transport + + async def send(self, item: SessionMessage) -> None: + await self._inner.send(item) + + async def aclose(self) -> None: + await self._inner.aclose() + + def get_session_id(self) -> str | None: + return self._transport.session_id + + async def __aenter__(self) -> _SessionAwareWriteStream: + await self._inner.__aenter__() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: + return await self._inner.__aexit__(exc_type, exc_val, exc_tb) + + # TODO(Marcelo): I've dropped the `get_session_id` callback because it breaks the Transport protocol. Is that needed? # It's a completely wrong abstraction, so removal is a good idea. But if we need the client to find the session ID, # we should think about a better way to do it. I believe we can achieve it with other means. @@ -581,7 +612,7 @@ def start_get_stream() -> None: ) try: - yield read_stream, write_stream + yield read_stream, _SessionAwareWriteStream(write_stream, transport) finally: if transport.session_id and terminate_on_close: await transport.terminate_session(client) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 59de0ace4..69074129e 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -66,13 +66,14 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings -from mcp.shared._otel import extract_trace_context, otel_span +from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage from mcp.shared.session import RequestResponder logger = logging.getLogger(__name__) +MCP_SESSION_ID_HEADER = "mcp-session-id" LifespanResultT = TypeVar("LifespanResultT", default=Any) @@ -454,28 +455,32 @@ async def _handle_request( # Extract W3C trace context from _meta (SEP-414). meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None parent_context = extract_trace_context(meta) if meta is not None else None + request_data = None + close_sse_stream_cb = None + close_standalone_sse_stream_cb = None + if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata): + request_data = message.message_metadata.request_context + close_sse_stream_cb = message.message_metadata.close_sse_stream + close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream + request_headers = getattr(request_data, "headers", None) + session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None with otel_span( span_name, kind=SpanKind.SERVER, - attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id}, + attributes=build_server_span_attributes( + service_name=self.name, + method=req.method, + request_id=message.request_id, + params=req.params, + session_id=session_id, + ), context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): logger.debug("Dispatching request of type %s", type(req).__name__) try: - # Extract request context and close_sse_stream from message metadata - request_data = None - close_sse_stream_cb = None - close_standalone_sse_stream_cb = None - if message.message_metadata is not None and isinstance( - message.message_metadata, ServerMessageMetadata - ): - request_data = message.message_metadata.request_context - close_sse_stream_cb = message.message_metadata.close_sse_stream - close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream - client_capabilities = session.client_params.capabilities if session.client_params else None task_support = self._experimental_handlers.task_support if self._experimental_handlers else None # Get task metadata from request params if present diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 170e873a0..b5fa0d718 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -11,6 +11,7 @@ from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") +MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -34,3 +35,54 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) + + +def build_client_span_attributes( + *, + method: str, + request_id: str | int, + params: dict[str, Any] | None = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP client request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + if params is not None and (resource_uri := params.get("uri")) is not None: + attributes["mcp.resource.uri"] = resource_uri + + if session_id is not None: + attributes["mcp.session.id"] = session_id + + return attributes + + +def build_server_span_attributes( + *, + service_name: str, + method: str, + request_id: str | int, + params: Any = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP server request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.service": service_name, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + resource_uri = getattr(params, "uri", None) + if resource_uri is not None: + attributes["mcp.resource.uri"] = str(resource_uri) + + if session_id is not None: + attributes["mcp.session.id"] = session_id + + return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 243eef5ae..8a21736a1 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -5,7 +5,7 @@ from collections.abc import Callable from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, Generic, Protocol, TypeVar +from typing import Any, Generic, Protocol, TypeVar, cast import anyio from anyio.streams.memory import MemoryObjectSendStream @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import inject_trace_context, otel_span +from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -236,6 +236,13 @@ async def __aexit__( self._task_group.cancel_scope.cancel() return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) + def _get_transport_session_id(self) -> str | None: + """Return the transport session ID when the write stream exposes it.""" + get_session_id = getattr(self._write_stream, "get_session_id", None) + if callable(get_session_id): + return cast("str | None", get_session_id()) + return None + async def send_request( self, request: SendRequestT, @@ -276,7 +283,12 @@ async def send_request( with otel_span( span_name, kind=SpanKind.CLIENT, - attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id}, + attributes=build_client_span_attributes( + method=request.method, + request_id=request_id, + params=request_data.get("params"), + session_id=self._get_transport_session_id(), + ), ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) diff --git a/tests/shared/test_otel.py b/tests/shared/test_otel.py index ec7ff78cc..b70e397d9 100644 --- a/tests/shared/test_otel.py +++ b/tests/shared/test_otel.py @@ -37,8 +37,44 @@ def greet(name: str) -> str: client_span = next(s for s in spans if s["name"] == "MCP send tools/call greet") server_span = next(s for s in spans if s["name"] == "MCP handle tools/call greet") + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "tools/call" assert client_span["attributes"]["mcp.method.name"] == "tools/call" + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "tools/call" assert server_span["attributes"]["mcp.method.name"] == "tools/call" # Server span should be in the same trace as the client span (context propagation). assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_resource_read_spans_include_resource_uri(capfire: CaptureLogfire): + """Verify that resource reads include MCP resource and RPC attributes.""" + server = MCPServer("test") + + @server.resource("test://resource") + def test_resource() -> str: + return "hello" + + async with Client(server) as client: + result = await client.read_resource("test://resource") + + assert result.contents[0].uri == "test://resource" + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send resources/read") + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "resources/read" + assert client_span["attributes"]["mcp.method.name"] == "resources/read" + assert client_span["attributes"]["mcp.resource.uri"] == "test://resource" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "resources/read" + assert server_span["attributes"]["mcp.method.name"] == "resources/read" + assert server_span["attributes"]["mcp.resource.uri"] == "test://resource" diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..78521e178 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,6 +23,7 @@ import requests import uvicorn from httpx_sse import ServerSentEvent +from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1081,6 +1082,26 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" +@pytest.mark.anyio +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_streamable_http_resource_read_spans_include_session_id( + capfire: CaptureLogfire, basic_server: None, basic_server_url: str +): + """Verify streamable HTTP spans include the negotiated MCP session ID.""" + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + response = await session.read_resource(uri="foobar://test-resource") + + assert response.contents[0].uri == "foobar://test-resource" + + spans = capfire.exporter.exported_spans_as_dict() + client_span = next(s for s in spans if s["name"] == "MCP send resources/read") + + assert client_span["attributes"]["mcp.session.id"] + assert client_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" + + @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" From 67ca7fea72d11dedd068c060aa9c09df4085ace1 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 19:46:46 -0400 Subject: [PATCH 2/5] Remove client OTel session-id wrapper --- src/mcp/client/streamable_http.py | 33 +--------------------------- src/mcp/shared/_otel.py | 4 ---- src/mcp/shared/session.py | 10 +-------- tests/shared/test_streamable_http.py | 21 ------------------ 4 files changed, 2 insertions(+), 66 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index b4378802e..9a119c633 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -7,7 +7,6 @@ from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager from dataclasses import dataclass -from types import TracebackType import anyio import httpx @@ -18,7 +17,6 @@ from mcp.client._transport import TransportStreams from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams from mcp.shared._httpx_utils import create_mcp_http_client -from mcp.shared._stream_protocols import WriteStream from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( INTERNAL_ERROR, @@ -514,35 +512,6 @@ def get_session_id(self) -> str | None: return self.session_id # pragma: no cover -class _SessionAwareWriteStream: - """Write-stream wrapper that exposes the transport session ID.""" - - def __init__(self, inner: WriteStream[SessionMessage], transport: StreamableHTTPTransport) -> None: - self._inner = inner - self._transport = transport - - async def send(self, item: SessionMessage) -> None: - await self._inner.send(item) - - async def aclose(self) -> None: - await self._inner.aclose() - - def get_session_id(self) -> str | None: - return self._transport.session_id - - async def __aenter__(self) -> _SessionAwareWriteStream: - await self._inner.__aenter__() - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> bool | None: - return await self._inner.__aexit__(exc_type, exc_val, exc_tb) - - # TODO(Marcelo): I've dropped the `get_session_id` callback because it breaks the Transport protocol. Is that needed? # It's a completely wrong abstraction, so removal is a good idea. But if we need the client to find the session ID, # we should think about a better way to do it. I believe we can achieve it with other means. @@ -612,7 +581,7 @@ def start_get_stream() -> None: ) try: - yield read_stream, _SessionAwareWriteStream(write_stream, transport) + yield read_stream, write_stream finally: if transport.session_id and terminate_on_close: await transport.terminate_session(client) diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index b5fa0d718..164a345e5 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -42,7 +42,6 @@ def build_client_span_attributes( method: str, request_id: str | int, params: dict[str, Any] | None = None, - session_id: str | None = None, ) -> dict[str, Any]: """Build OTel attributes for an MCP client request span.""" attributes: dict[str, Any] = { @@ -55,9 +54,6 @@ def build_client_span_attributes( if params is not None and (resource_uri := params.get("uri")) is not None: attributes["mcp.resource.uri"] = resource_uri - if session_id is not None: - attributes["mcp.session.id"] = session_id - return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 8a21736a1..84d7a8e34 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -5,7 +5,7 @@ from collections.abc import Callable from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, Generic, Protocol, TypeVar, cast +from typing import Any, Generic, Protocol, TypeVar import anyio from anyio.streams.memory import MemoryObjectSendStream @@ -236,13 +236,6 @@ async def __aexit__( self._task_group.cancel_scope.cancel() return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) - def _get_transport_session_id(self) -> str | None: - """Return the transport session ID when the write stream exposes it.""" - get_session_id = getattr(self._write_stream, "get_session_id", None) - if callable(get_session_id): - return cast("str | None", get_session_id()) - return None - async def send_request( self, request: SendRequestT, @@ -287,7 +280,6 @@ async def send_request( method=request.method, request_id=request_id, params=request_data.get("params"), - session_id=self._get_transport_session_id(), ), ): # Inject W3C trace context into _meta (SEP-414). diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 78521e178..3d5770fb6 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,7 +23,6 @@ import requests import uvicorn from httpx_sse import ServerSentEvent -from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1082,26 +1081,6 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" -@pytest.mark.anyio -@pytest.mark.filterwarnings("ignore::RuntimeWarning") -async def test_streamable_http_resource_read_spans_include_session_id( - capfire: CaptureLogfire, basic_server: None, basic_server_url: str -): - """Verify streamable HTTP spans include the negotiated MCP session ID.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() - response = await session.read_resource(uri="foobar://test-resource") - - assert response.contents[0].uri == "foobar://test-resource" - - spans = capfire.exporter.exported_spans_as_dict() - client_span = next(s for s in spans if s["name"] == "MCP send resources/read") - - assert client_span["attributes"]["mcp.session.id"] - assert client_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" - - @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" From b5b5389689bfa64a5cfa65d803687bc16fb0a773 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 19:51:37 -0400 Subject: [PATCH 3/5] Inline OTel span attributes --- src/mcp/server/lowlevel/server.py | 22 ++++++++------ src/mcp/shared/_otel.py | 48 ------------------------------- src/mcp/shared/session.py | 16 +++++++---- 3 files changed, 24 insertions(+), 62 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 69074129e..2b1697834 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -66,7 +66,7 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings -from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span +from mcp.shared._otel import extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -464,17 +464,23 @@ async def _handle_request( close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream request_headers = getattr(request_data, "headers", None) session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None + span_attributes: dict[str, Any] = { + "rpc.system": "mcp", + "rpc.service": self.name, + "rpc.method": req.method, + "mcp.method.name": req.method, + "jsonrpc.request.id": message.request_id, + } + resource_uri = getattr(req.params, "uri", None) + if resource_uri is not None: + span_attributes["mcp.resource.uri"] = str(resource_uri) + if session_id is not None: + span_attributes["mcp.session.id"] = session_id with otel_span( span_name, kind=SpanKind.SERVER, - attributes=build_server_span_attributes( - service_name=self.name, - method=req.method, - request_id=message.request_id, - params=req.params, - session_id=session_id, - ), + attributes=span_attributes, context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 164a345e5..170e873a0 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -11,7 +11,6 @@ from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") -MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -35,50 +34,3 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) - - -def build_client_span_attributes( - *, - method: str, - request_id: str | int, - params: dict[str, Any] | None = None, -) -> dict[str, Any]: - """Build OTel attributes for an MCP client request span.""" - attributes: dict[str, Any] = { - "rpc.system": MCP_RPC_SYSTEM, - "rpc.method": method, - "mcp.method.name": method, - "jsonrpc.request.id": request_id, - } - - if params is not None and (resource_uri := params.get("uri")) is not None: - attributes["mcp.resource.uri"] = resource_uri - - return attributes - - -def build_server_span_attributes( - *, - service_name: str, - method: str, - request_id: str | int, - params: Any = None, - session_id: str | None = None, -) -> dict[str, Any]: - """Build OTel attributes for an MCP server request span.""" - attributes: dict[str, Any] = { - "rpc.system": MCP_RPC_SYSTEM, - "rpc.service": service_name, - "rpc.method": method, - "mcp.method.name": method, - "jsonrpc.request.id": request_id, - } - - resource_uri = getattr(params, "uri", None) - if resource_uri is not None: - attributes["mcp.resource.uri"] = str(resource_uri) - - if session_id is not None: - attributes["mcp.session.id"] = session_id - - return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 84d7a8e34..2ccb09c5d 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span +from mcp.shared._otel import inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -272,15 +272,19 @@ async def send_request( try: target = request_data.get("params", {}).get("name") span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" + span_attributes: dict[str, Any] = { + "rpc.system": "mcp", + "rpc.method": request.method, + "mcp.method.name": request.method, + "jsonrpc.request.id": request_id, + } + if (resource_uri := request_data.get("params", {}).get("uri")) is not None: + span_attributes["mcp.resource.uri"] = resource_uri with otel_span( span_name, kind=SpanKind.CLIENT, - attributes=build_client_span_attributes( - method=request.method, - request_id=request_id, - params=request_data.get("params"), - ), + attributes=span_attributes, ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) From 2c4b51a1fa46835f4e2f704dd15fed4c3e93106b Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 20:10:05 -0400 Subject: [PATCH 4/5] Restore OTel span attribute helpers --- src/mcp/server/lowlevel/server.py | 34 ++++++++++------------ src/mcp/shared/_otel.py | 48 +++++++++++++++++++++++++++++++ src/mcp/shared/session.py | 16 ++++------- 3 files changed, 69 insertions(+), 29 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 2b1697834..4ff123531 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -66,7 +66,7 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings -from mcp.shared._otel import extract_trace_context, otel_span +from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -455,32 +455,28 @@ async def _handle_request( # Extract W3C trace context from _meta (SEP-414). meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None parent_context = extract_trace_context(meta) if meta is not None else None - request_data = None + server_message_metadata = ( + message.message_metadata if isinstance(message.message_metadata, ServerMessageMetadata) else None + ) + request_data = server_message_metadata.request_context if server_message_metadata is not None else None close_sse_stream_cb = None close_standalone_sse_stream_cb = None - if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata): - request_data = message.message_metadata.request_context - close_sse_stream_cb = message.message_metadata.close_sse_stream - close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream + if server_message_metadata is not None: + close_sse_stream_cb = server_message_metadata.close_sse_stream + close_standalone_sse_stream_cb = server_message_metadata.close_standalone_sse_stream request_headers = getattr(request_data, "headers", None) session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None - span_attributes: dict[str, Any] = { - "rpc.system": "mcp", - "rpc.service": self.name, - "rpc.method": req.method, - "mcp.method.name": req.method, - "jsonrpc.request.id": message.request_id, - } - resource_uri = getattr(req.params, "uri", None) - if resource_uri is not None: - span_attributes["mcp.resource.uri"] = str(resource_uri) - if session_id is not None: - span_attributes["mcp.session.id"] = session_id with otel_span( span_name, kind=SpanKind.SERVER, - attributes=span_attributes, + attributes=build_server_span_attributes( + service_name=self.name, + method=req.method, + request_id=message.request_id, + params=req.params, + session_id=session_id, + ), context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 170e873a0..164a345e5 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -11,6 +11,7 @@ from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") +MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -34,3 +35,50 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) + + +def build_client_span_attributes( + *, + method: str, + request_id: str | int, + params: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP client request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + if params is not None and (resource_uri := params.get("uri")) is not None: + attributes["mcp.resource.uri"] = resource_uri + + return attributes + + +def build_server_span_attributes( + *, + service_name: str, + method: str, + request_id: str | int, + params: Any = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP server request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.service": service_name, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + resource_uri = getattr(params, "uri", None) + if resource_uri is not None: + attributes["mcp.resource.uri"] = str(resource_uri) + + if session_id is not None: + attributes["mcp.session.id"] = session_id + + return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 2ccb09c5d..84d7a8e34 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import inject_trace_context, otel_span +from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -272,19 +272,15 @@ async def send_request( try: target = request_data.get("params", {}).get("name") span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" - span_attributes: dict[str, Any] = { - "rpc.system": "mcp", - "rpc.method": request.method, - "mcp.method.name": request.method, - "jsonrpc.request.id": request_id, - } - if (resource_uri := request_data.get("params", {}).get("uri")) is not None: - span_attributes["mcp.resource.uri"] = resource_uri with otel_span( span_name, kind=SpanKind.CLIENT, - attributes=span_attributes, + attributes=build_client_span_attributes( + method=request.method, + request_id=request_id, + params=request_data.get("params"), + ), ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) From f5b9f396716a5d145768d959332962cd7effb563 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 20:23:33 -0400 Subject: [PATCH 5/5] Address PR review findings --- src/mcp/server/lowlevel/server.py | 3 +- src/mcp/shared/_otel.py | 16 ++++++++-- tests/shared/test_otel.py | 46 ++++++++++++++++++++++++++++ tests/shared/test_streamable_http.py | 27 ++++++++++++++++ 4 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 4ff123531..81ec05b5e 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -63,7 +63,7 @@ async def main(): from mcp.server.lowlevel.experimental import ExperimentalHandlers from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession -from mcp.server.streamable_http import EventStore +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span @@ -73,7 +73,6 @@ async def main(): from mcp.shared.session import RequestResponder logger = logging.getLogger(__name__) -MCP_SESSION_ID_HEADER = "mcp-session-id" LifespanResultT = TypeVar("LifespanResultT", default=Any) diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 164a345e5..1f2ec8b03 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -4,7 +4,7 @@ from collections.abc import Iterator from contextlib import contextmanager -from typing import Any +from typing import Any, cast from opentelemetry.context import Context from opentelemetry.propagate import extract, inject @@ -51,7 +51,16 @@ def build_client_span_attributes( "jsonrpc.request.id": request_id, } - if params is not None and (resource_uri := params.get("uri")) is not None: + resource_uri = None + if params is not None: + resource_uri = params.get("uri") + if resource_uri is None: + ref = params.get("ref") + if isinstance(ref, dict): + typed_ref = cast(dict[str, Any], ref) + resource_uri = typed_ref.get("uri") + + if resource_uri is not None: attributes["mcp.resource.uri"] = resource_uri return attributes @@ -75,6 +84,9 @@ def build_server_span_attributes( } resource_uri = getattr(params, "uri", None) + if resource_uri is None: + ref = getattr(params, "ref", None) + resource_uri = getattr(ref, "uri", None) if resource_uri is not None: attributes["mcp.resource.uri"] = str(resource_uri) diff --git a/tests/shared/test_otel.py b/tests/shared/test_otel.py index b70e397d9..a6ccc6784 100644 --- a/tests/shared/test_otel.py +++ b/tests/shared/test_otel.py @@ -78,3 +78,49 @@ def test_resource() -> str: assert server_span["attributes"]["rpc.method"] == "resources/read" assert server_span["attributes"]["mcp.method.name"] == "resources/read" assert server_span["attributes"]["mcp.resource.uri"] == "test://resource" + + # Server span should be in the same trace as the client span (context propagation). + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_completion_spans_include_resource_template_uri(capfire: CaptureLogfire): + """Verify completion spans include the referenced resource template URI.""" + server = MCPServer("test") + + @server.completion() + async def handle_completion( + ref: types.ResourceTemplateReference | types.PromptReference, + argument: types.CompletionArgument, + context: types.CompletionContext | None, + ) -> types.Completion: + assert isinstance(ref, types.ResourceTemplateReference) + assert argument.name == "path" + assert argument.value == "rea" + assert context is None + return types.Completion(values=["README.md"]) + + async with Client(server) as client: + result = await client.complete( + ref=types.ResourceTemplateReference(type="ref/resource", uri="repo://files/{path}"), + argument={"name": "path", "value": "rea"}, + ) + + assert result.completion.values == ["README.md"] + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send completion/complete") + server_span = next(s for s in spans if s["name"] == "MCP handle completion/complete") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "completion/complete" + assert client_span["attributes"]["mcp.method.name"] == "completion/complete" + assert client_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "completion/complete" + assert server_span["attributes"]["mcp.method.name"] == "completion/complete" + assert server_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..7023e905c 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,6 +23,7 @@ import requests import uvicorn from httpx_sse import ServerSentEvent +from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1081,6 +1082,32 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" +@pytest.mark.anyio +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_streamable_http_server_span_includes_session_id(capfire: CaptureLogfire): + """Verify streamable HTTP server spans include the negotiated MCP session ID.""" + app = _create_server() + mcp_app = app.streamable_http_app(host="testserver") + + async with ( + mcp_app.router.lifespan_context(mcp_app), + httpx.ASGITransport(mcp_app) as transport, + httpx.AsyncClient(transport=transport, base_url="http://testserver") as http_client, + streamable_http_client("http://testserver/mcp", http_client=http_client) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + response = await session.read_resource(uri="foobar://test-resource") + + assert response.contents[0].uri == "foobar://test-resource" + + spans = capfire.exporter.exported_spans_as_dict() + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert server_span["attributes"]["mcp.session.id"] + assert server_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" + + @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation."""