diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 59de0ace4..81ec05b5e 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -63,10 +63,10 @@ async def main(): from mcp.server.lowlevel.experimental import ExperimentalHandlers from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession -from mcp.server.streamable_http import EventStore +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings -from mcp.shared._otel import extract_trace_context, otel_span +from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -454,28 +454,34 @@ async def _handle_request( # Extract W3C trace context from _meta (SEP-414). meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None parent_context = extract_trace_context(meta) if meta is not None else None + server_message_metadata = ( + message.message_metadata if isinstance(message.message_metadata, ServerMessageMetadata) else None + ) + request_data = server_message_metadata.request_context if server_message_metadata is not None else None + close_sse_stream_cb = None + close_standalone_sse_stream_cb = None + if server_message_metadata is not None: + close_sse_stream_cb = server_message_metadata.close_sse_stream + close_standalone_sse_stream_cb = server_message_metadata.close_standalone_sse_stream + request_headers = getattr(request_data, "headers", None) + session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None with otel_span( span_name, kind=SpanKind.SERVER, - attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id}, + attributes=build_server_span_attributes( + service_name=self.name, + method=req.method, + request_id=message.request_id, + params=req.params, + session_id=session_id, + ), context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): logger.debug("Dispatching request of type %s", type(req).__name__) try: - # Extract request context and close_sse_stream from message metadata - request_data = None - close_sse_stream_cb = None - close_standalone_sse_stream_cb = None - if message.message_metadata is not None and isinstance( - message.message_metadata, ServerMessageMetadata - ): - request_data = message.message_metadata.request_context - close_sse_stream_cb = message.message_metadata.close_sse_stream - close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream - client_capabilities = session.client_params.capabilities if session.client_params else None task_support = self._experimental_handlers.task_support if self._experimental_handlers else None # Get task metadata from request params if present diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 170e873a0..1f2ec8b03 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -4,13 +4,14 @@ from collections.abc import Iterator from contextlib import contextmanager -from typing import Any +from typing import Any, cast from opentelemetry.context import Context from opentelemetry.propagate import extract, inject from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") +MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -34,3 +35,62 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) + + +def build_client_span_attributes( + *, + method: str, + request_id: str | int, + params: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP client request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + resource_uri = None + if params is not None: + resource_uri = params.get("uri") + if resource_uri is None: + ref = params.get("ref") + if isinstance(ref, dict): + typed_ref = cast(dict[str, Any], ref) + resource_uri = typed_ref.get("uri") + + if resource_uri is not None: + attributes["mcp.resource.uri"] = resource_uri + + return attributes + + +def build_server_span_attributes( + *, + service_name: str, + method: str, + request_id: str | int, + params: Any = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP server request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.service": service_name, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + resource_uri = getattr(params, "uri", None) + if resource_uri is None: + ref = getattr(params, "ref", None) + resource_uri = getattr(ref, "uri", None) + if resource_uri is not None: + attributes["mcp.resource.uri"] = str(resource_uri) + + if session_id is not None: + attributes["mcp.session.id"] = session_id + + return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 243eef5ae..84d7a8e34 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import inject_trace_context, otel_span +from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -276,7 +276,11 @@ async def send_request( with otel_span( span_name, kind=SpanKind.CLIENT, - attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id}, + attributes=build_client_span_attributes( + method=request.method, + request_id=request_id, + params=request_data.get("params"), + ), ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) diff --git a/tests/shared/test_otel.py b/tests/shared/test_otel.py index ec7ff78cc..a6ccc6784 100644 --- a/tests/shared/test_otel.py +++ b/tests/shared/test_otel.py @@ -37,8 +37,90 @@ def greet(name: str) -> str: client_span = next(s for s in spans if s["name"] == "MCP send tools/call greet") server_span = next(s for s in spans if s["name"] == "MCP handle tools/call greet") + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "tools/call" assert client_span["attributes"]["mcp.method.name"] == "tools/call" + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "tools/call" assert server_span["attributes"]["mcp.method.name"] == "tools/call" # Server span should be in the same trace as the client span (context propagation). assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_resource_read_spans_include_resource_uri(capfire: CaptureLogfire): + """Verify that resource reads include MCP resource and RPC attributes.""" + server = MCPServer("test") + + @server.resource("test://resource") + def test_resource() -> str: + return "hello" + + async with Client(server) as client: + result = await client.read_resource("test://resource") + + assert result.contents[0].uri == "test://resource" + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send resources/read") + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "resources/read" + assert client_span["attributes"]["mcp.method.name"] == "resources/read" + assert client_span["attributes"]["mcp.resource.uri"] == "test://resource" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "resources/read" + assert server_span["attributes"]["mcp.method.name"] == "resources/read" + assert server_span["attributes"]["mcp.resource.uri"] == "test://resource" + + # Server span should be in the same trace as the client span (context propagation). + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_completion_spans_include_resource_template_uri(capfire: CaptureLogfire): + """Verify completion spans include the referenced resource template URI.""" + server = MCPServer("test") + + @server.completion() + async def handle_completion( + ref: types.ResourceTemplateReference | types.PromptReference, + argument: types.CompletionArgument, + context: types.CompletionContext | None, + ) -> types.Completion: + assert isinstance(ref, types.ResourceTemplateReference) + assert argument.name == "path" + assert argument.value == "rea" + assert context is None + return types.Completion(values=["README.md"]) + + async with Client(server) as client: + result = await client.complete( + ref=types.ResourceTemplateReference(type="ref/resource", uri="repo://files/{path}"), + argument={"name": "path", "value": "rea"}, + ) + + assert result.completion.values == ["README.md"] + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send completion/complete") + server_span = next(s for s in spans if s["name"] == "MCP handle completion/complete") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "completion/complete" + assert client_span["attributes"]["mcp.method.name"] == "completion/complete" + assert client_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "completion/complete" + assert server_span["attributes"]["mcp.method.name"] == "completion/complete" + assert server_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..7023e905c 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,6 +23,7 @@ import requests import uvicorn from httpx_sse import ServerSentEvent +from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1081,6 +1082,32 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" +@pytest.mark.anyio +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_streamable_http_server_span_includes_session_id(capfire: CaptureLogfire): + """Verify streamable HTTP server spans include the negotiated MCP session ID.""" + app = _create_server() + mcp_app = app.streamable_http_app(host="testserver") + + async with ( + mcp_app.router.lifespan_context(mcp_app), + httpx.ASGITransport(mcp_app) as transport, + httpx.AsyncClient(transport=transport, base_url="http://testserver") as http_client, + streamable_http_client("http://testserver/mcp", http_client=http_client) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + response = await session.read_resource(uri="foobar://test-resource") + + assert response.contents[0].uri == "foobar://test-resource" + + spans = capfire.exporter.exported_spans_as_dict() + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert server_span["attributes"]["mcp.session.id"] + assert server_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" + + @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation."""