Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ async def main():
from mcp.server.lowlevel.experimental import ExperimentalHandlers
from mcp.server.models import InitializationOptions
from mcp.server.session import ServerSession
from mcp.server.streamable_http import EventStore
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, EventStore
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
from mcp.server.transport_security import TransportSecuritySettings
from mcp.shared._otel import extract_trace_context, otel_span
from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span
from mcp.shared._stream_protocols import ReadStream, WriteStream
from mcp.shared.exceptions import MCPError
from mcp.shared.message import ServerMessageMetadata, SessionMessage
Expand Down Expand Up @@ -454,28 +454,34 @@ async def _handle_request(
# Extract W3C trace context from _meta (SEP-414).
meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None
parent_context = extract_trace_context(meta) if meta is not None else None
server_message_metadata = (
message.message_metadata if isinstance(message.message_metadata, ServerMessageMetadata) else None
)
request_data = server_message_metadata.request_context if server_message_metadata is not None else None
close_sse_stream_cb = None
close_standalone_sse_stream_cb = None
if server_message_metadata is not None:
close_sse_stream_cb = server_message_metadata.close_sse_stream
close_standalone_sse_stream_cb = server_message_metadata.close_standalone_sse_stream
request_headers = getattr(request_data, "headers", None)
session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None

with otel_span(
span_name,
kind=SpanKind.SERVER,
attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id},
attributes=build_server_span_attributes(
service_name=self.name,
method=req.method,
request_id=message.request_id,
params=req.params,
session_id=session_id,
),
context=parent_context,
) as span:
if handler := self._request_handlers.get(req.method):
logger.debug("Dispatching request of type %s", type(req).__name__)

try:
# Extract request context and close_sse_stream from message metadata
request_data = None
close_sse_stream_cb = None
close_standalone_sse_stream_cb = None
if message.message_metadata is not None and isinstance(
message.message_metadata, ServerMessageMetadata
):
request_data = message.message_metadata.request_context
close_sse_stream_cb = message.message_metadata.close_sse_stream
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream

client_capabilities = session.client_params.capabilities if session.client_params else None
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
# Get task metadata from request params if present
Expand Down
62 changes: 61 additions & 1 deletion src/mcp/shared/_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
from typing import Any, cast

from opentelemetry.context import Context
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import SpanKind, get_tracer

_tracer = get_tracer("mcp-python-sdk")
MCP_RPC_SYSTEM = "mcp"


@contextmanager
Expand All @@ -34,3 +35,62 @@ def inject_trace_context(meta: dict[str, Any]) -> None:
def extract_trace_context(meta: dict[str, Any]) -> Context:
"""Extract W3C trace context from a `_meta` dict."""
return extract(meta)


def build_client_span_attributes(
*,
method: str,
request_id: str | int,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build OTel attributes for an MCP client request span."""
attributes: dict[str, Any] = {
"rpc.system": MCP_RPC_SYSTEM,
"rpc.method": method,
"mcp.method.name": method,
"jsonrpc.request.id": request_id,
}

resource_uri = None
if params is not None:
resource_uri = params.get("uri")
if resource_uri is None:
Comment on lines +40 to +57
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 build_client_span_attributes has no session_id parameter, so client spans can never carry the mcp.session.id attribute while server spans for the same session can — creating an observability asymmetry. The PR description's claim that 'mcp.session.id is surfaced on the client side through the streamable HTTP write stream wrapper' is factually incorrect: commit 67ca7fe in this same branch explicitly removed that wrapper without adding a replacement. To fix this, add a session_id parameter to build_client_span_attributes and thread the session ID from ClientStreamableHTTPTransport.session_id through BaseSession.send_request.

Extended reasoning...

What the bug is and how it manifests

build_client_span_attributes (src/mcp/shared/_otel.py lines 40-57) accepts only method, request_id, and params — there is no session_id parameter. This means that no matter what the calling code does, client OTel spans can never carry the mcp.session.id attribute. By contrast, build_server_span_attributes (lines 60-84) accepts an explicit session_id parameter and conditionally sets attributes["mcp.session.id"] when it is non-None. The two functions are structurally asymmetric for session ID support.

The specific code path that triggers it

The streamable HTTP client transport (src/mcp/client/streamable_http.py, ClientStreamableHTTPTransport) maintains self.session_id and includes it in every outgoing request via the mcp-session-id HTTP header. However, this value is never passed to BaseSession.send_request (src/mcp/shared/session.py:276), which calls build_client_span_attributes with only method, request_id, and the request params dict. There is no pathway to supply a session_id to the client span builder.

Why existing code does not prevent it

Commit 67ca7fe ('Remove client OTel session-id wrapper') in this same PR branch deliberately removed the _SessionAwareWriteStream wrapper class and the _get_transport_session_id() mechanism that previously solved this problem. Commit 2c4b51a ('Restore OTel span attribute helpers') then re-added build_client_span_attributes without the session_id parameter. The TODO comment left by the author in client/streamable_http.py acknowledges the removal was intentional ('I have dropped the get_session_id callback because it breaks the Transport protocol') but offers no replacement. The PR description was not updated to reflect this change, and instead continues to claim the old mechanism still works.

What the impact would be

In distributed tracing across real deployments, client and server processes are separate. An operator querying their tracing backend (e.g. Jaeger, Honeycomb) by mcp.session.id to correlate all activity for a given session would find server spans but miss every client span, making the trace incomplete and the session timeline unreconstructable. This is the core use case for session ID on spans.

How to fix it

Add a session_id: str | None = None parameter to build_client_span_attributes with the same conditional-set logic as build_server_span_attributes. Then thread the session ID from the transport into BaseSession.send_request — either by passing it as a parameter to send_request, or by making BaseSession aware of the transport's session ID via a callback or property on the write stream (the original approach, now removed).

Step-by-step proof

  1. src/mcp/shared/_otel.py lines 40-57: build_client_span_attributes signature is (*, method: str, request_id: str | int, params: dict[str, Any] | None = None) — no session_id parameter.
  2. src/mcp/shared/_otel.py lines 60-84: build_server_span_attributes has session_id: str | None = None and sets attributes["mcp.session.id"] = session_id when non-None.
  3. src/mcp/shared/session.py line 276: send_request calls build_client_span_attributes(method=..., request_id=..., params=...) with no session_id.
  4. Commit 67ca7fe removed _SessionAwareWriteStream (the previous client-side session ID mechanism). Commit 2c4b51a restored build_client_span_attributes without restoring the session_id parameter.
  5. The PR description's Testing section references 'resource_read_spans_include_session_id or streamable_http_client_resource_read', but no such test exists in tests/shared/test_streamable_http.py — confirming the client-side session ID surfacing was never validated after the wrapper removal.

ref = params.get("ref")
if isinstance(ref, dict):
typed_ref = cast(dict[str, Any], ref)
resource_uri = typed_ref.get("uri")

if resource_uri is not None:
attributes["mcp.resource.uri"] = resource_uri

return attributes


def build_server_span_attributes(
*,
service_name: str,
method: str,
request_id: str | int,
params: Any = None,
session_id: str | None = None,
) -> dict[str, Any]:
"""Build OTel attributes for an MCP server request span."""
attributes: dict[str, Any] = {
"rpc.system": MCP_RPC_SYSTEM,
"rpc.service": service_name,
"rpc.method": method,
"mcp.method.name": method,
"jsonrpc.request.id": request_id,
}

resource_uri = getattr(params, "uri", None)
if resource_uri is None:
ref = getattr(params, "ref", None)
resource_uri = getattr(ref, "uri", None)
if resource_uri is not None:
attributes["mcp.resource.uri"] = str(resource_uri)

if session_id is not None:
attributes["mcp.session.id"] = session_id

return attributes
8 changes: 6 additions & 2 deletions src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydantic import BaseModel, TypeAdapter
from typing_extensions import Self

from mcp.shared._otel import inject_trace_context, otel_span
from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span
from mcp.shared._stream_protocols import ReadStream, WriteStream
from mcp.shared.exceptions import MCPError
from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage
Expand Down Expand Up @@ -276,7 +276,11 @@ async def send_request(
with otel_span(
span_name,
kind=SpanKind.CLIENT,
attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id},
attributes=build_client_span_attributes(
method=request.method,
request_id=request_id,
params=request_data.get("params"),
),
):
# Inject W3C trace context into _meta (SEP-414).
meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {})
Expand Down
82 changes: 82 additions & 0 deletions tests/shared/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,90 @@ def greet(name: str) -> str:
client_span = next(s for s in spans if s["name"] == "MCP send tools/call greet")
server_span = next(s for s in spans if s["name"] == "MCP handle tools/call greet")

assert client_span["attributes"]["rpc.system"] == "mcp"
assert client_span["attributes"]["rpc.method"] == "tools/call"
assert client_span["attributes"]["mcp.method.name"] == "tools/call"
assert server_span["attributes"]["rpc.system"] == "mcp"
assert server_span["attributes"]["rpc.service"] == "test"
assert server_span["attributes"]["rpc.method"] == "tools/call"
assert server_span["attributes"]["mcp.method.name"] == "tools/call"

# Server span should be in the same trace as the client span (context propagation).
assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"]


@pytest.mark.filterwarnings("ignore::RuntimeWarning")
async def test_resource_read_spans_include_resource_uri(capfire: CaptureLogfire):
"""Verify that resource reads include MCP resource and RPC attributes."""
server = MCPServer("test")

@server.resource("test://resource")
def test_resource() -> str:
return "hello"

async with Client(server) as client:
result = await client.read_resource("test://resource")

assert result.contents[0].uri == "test://resource"

spans = capfire.exporter.exported_spans_as_dict()

client_span = next(s for s in spans if s["name"] == "MCP send resources/read")
server_span = next(s for s in spans if s["name"] == "MCP handle resources/read")

assert client_span["attributes"]["rpc.system"] == "mcp"
assert client_span["attributes"]["rpc.method"] == "resources/read"
assert client_span["attributes"]["mcp.method.name"] == "resources/read"
assert client_span["attributes"]["mcp.resource.uri"] == "test://resource"

assert server_span["attributes"]["rpc.system"] == "mcp"
assert server_span["attributes"]["rpc.service"] == "test"
assert server_span["attributes"]["rpc.method"] == "resources/read"
assert server_span["attributes"]["mcp.method.name"] == "resources/read"
assert server_span["attributes"]["mcp.resource.uri"] == "test://resource"

# Server span should be in the same trace as the client span (context propagation).
assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"]


@pytest.mark.filterwarnings("ignore::RuntimeWarning")
async def test_completion_spans_include_resource_template_uri(capfire: CaptureLogfire):
"""Verify completion spans include the referenced resource template URI."""
server = MCPServer("test")

@server.completion()
async def handle_completion(
ref: types.ResourceTemplateReference | types.PromptReference,
argument: types.CompletionArgument,
context: types.CompletionContext | None,
) -> types.Completion:
assert isinstance(ref, types.ResourceTemplateReference)
assert argument.name == "path"
assert argument.value == "rea"
assert context is None
return types.Completion(values=["README.md"])

async with Client(server) as client:
result = await client.complete(
ref=types.ResourceTemplateReference(type="ref/resource", uri="repo://files/{path}"),
argument={"name": "path", "value": "rea"},
)

assert result.completion.values == ["README.md"]

spans = capfire.exporter.exported_spans_as_dict()

client_span = next(s for s in spans if s["name"] == "MCP send completion/complete")
server_span = next(s for s in spans if s["name"] == "MCP handle completion/complete")

assert client_span["attributes"]["rpc.system"] == "mcp"
assert client_span["attributes"]["rpc.method"] == "completion/complete"
assert client_span["attributes"]["mcp.method.name"] == "completion/complete"
assert client_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}"

assert server_span["attributes"]["rpc.system"] == "mcp"
assert server_span["attributes"]["rpc.service"] == "test"
assert server_span["attributes"]["rpc.method"] == "completion/complete"
assert server_span["attributes"]["mcp.method.name"] == "completion/complete"
assert server_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}"
assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"]
27 changes: 27 additions & 0 deletions tests/shared/test_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import requests
import uvicorn
from httpx_sse import ServerSentEvent
from logfire.testing import CaptureLogfire
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Mount
Expand Down Expand Up @@ -1081,6 +1082,32 @@ async def test_streamable_http_client_resource_read(initialized_client_session:
assert response.contents[0].text == "Read test-resource"


@pytest.mark.anyio
@pytest.mark.filterwarnings("ignore::RuntimeWarning")
async def test_streamable_http_server_span_includes_session_id(capfire: CaptureLogfire):
"""Verify streamable HTTP server spans include the negotiated MCP session ID."""
app = _create_server()
mcp_app = app.streamable_http_app(host="testserver")

async with (
mcp_app.router.lifespan_context(mcp_app),
httpx.ASGITransport(mcp_app) as transport,
httpx.AsyncClient(transport=transport, base_url="http://testserver") as http_client,
streamable_http_client("http://testserver/mcp", http_client=http_client) as (read_stream, write_stream),
ClientSession(read_stream, write_stream) as session,
):
await session.initialize()
response = await session.read_resource(uri="foobar://test-resource")

assert response.contents[0].uri == "foobar://test-resource"

spans = capfire.exporter.exported_spans_as_dict()
server_span = next(s for s in spans if s["name"] == "MCP handle resources/read")

assert server_span["attributes"]["mcp.session.id"]
assert server_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource"


@pytest.mark.anyio
async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession):
"""Test client tool invocation."""
Expand Down
Loading