Skip to content

Commit 37891f4

Browse files
authored
Add basic OpenTelemetry tracing for client and server requests (#2381)
1 parent 3ce0f76 commit 37891f4

File tree

6 files changed

+436
-83
lines changed

6 files changed

+436
-83
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ dependencies = [
4040
"pyjwt[crypto]>=2.10.1",
4141
"typing-extensions>=4.13.0",
4242
"typing-inspection>=0.4.1",
43+
"opentelemetry-api>=1.28.0",
4344
]
4445

4546
[project.optional-dependencies]
@@ -71,6 +72,7 @@ dev = [
7172
"coverage[toml]>=7.10.7,<=7.13",
7273
"pillow>=12.0",
7374
"strict-no-cover",
75+
"logfire>=3.0.0",
7476
]
7577
docs = [
7678
"mkdocs>=1.6.1",

src/mcp/server/lowlevel/server.py

Lines changed: 85 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ async def main():
4242
from collections.abc import AsyncIterator, Awaitable, Callable
4343
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
4444
from importlib.metadata import version as importlib_version
45-
from typing import Any, Generic
45+
from typing import Any, Generic, cast
4646

4747
import anyio
48+
from opentelemetry.trace import SpanKind, StatusCode
4849
from starlette.applications import Starlette
4950
from starlette.middleware import Middleware
5051
from starlette.middleware.authentication import AuthenticationMiddleware
@@ -65,6 +66,7 @@ async def main():
6566
from mcp.server.streamable_http import EventStore
6667
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6768
from mcp.server.transport_security import TransportSecuritySettings
69+
from mcp.shared._otel import extract_trace_context, otel_span
6870
from mcp.shared._stream_protocols import ReadStream, WriteStream
6971
from mcp.shared.exceptions import MCPError
7072
from mcp.shared.message import ServerMessageMetadata, SessionMessage
@@ -446,72 +448,90 @@ async def _handle_request(
446448
):
447449
logger.info("Processing request of type %s", type(req).__name__)
448450

449-
if handler := self._request_handlers.get(req.method):
450-
logger.debug("Dispatching request of type %s", type(req).__name__)
451+
target = getattr(req.params, "name", None) if req.params else None
452+
span_name = f"MCP handle {req.method} {target}" if target else f"MCP handle {req.method}"
451453

452-
try:
453-
# Extract request context and close_sse_stream from message metadata
454-
request_data = None
455-
close_sse_stream_cb = None
456-
close_standalone_sse_stream_cb = None
457-
if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata):
458-
request_data = message.message_metadata.request_context
459-
close_sse_stream_cb = message.message_metadata.close_sse_stream
460-
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream
454+
# Extract W3C trace context from _meta (SEP-414).
455+
meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None
456+
parent_context = extract_trace_context(meta) if meta is not None else None
461457

462-
client_capabilities = session.client_params.capabilities if session.client_params else None
463-
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
464-
# Get task metadata from request params if present
465-
task_metadata = None
466-
if hasattr(req, "params") and req.params is not None:
467-
task_metadata = getattr(req.params, "task", None)
468-
ctx = ServerRequestContext(
469-
request_id=message.request_id,
470-
meta=message.request_meta,
471-
session=session,
472-
lifespan_context=lifespan_context,
473-
experimental=Experimental(
474-
task_metadata=task_metadata,
475-
_client_capabilities=client_capabilities,
476-
_session=session,
477-
_task_support=task_support,
478-
),
479-
request=request_data,
480-
close_sse_stream=close_sse_stream_cb,
481-
close_standalone_sse_stream=close_standalone_sse_stream_cb,
482-
)
483-
response = await handler(ctx, req.params)
484-
except MCPError as err:
485-
response = err.error
486-
except anyio.get_cancelled_exc_class():
487-
if message.cancelled:
488-
# Client sent CancelledNotification; responder.cancel() already
489-
# sent an error response, so skip the duplicate.
490-
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
491-
return
492-
# Transport-close cancellation from the TG in run(); re-raise so the
493-
# TG swallows its own cancellation.
494-
raise
495-
except Exception as err:
496-
if raise_exceptions: # pragma: no cover
497-
raise err
498-
response = types.ErrorData(code=0, message=str(err))
499-
else: # pragma: no cover
500-
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
501-
502-
try:
503-
await message.respond(response)
504-
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
505-
# Transport closed between handler unblocking and respond. Happens
506-
# when _receive_loop's finally wakes a handler blocked on
507-
# send_request: the handler runs to respond() before run()'s TG
508-
# cancel fires, but after the write stream closed. Closed if our
509-
# end closed (_receive_loop's async-with exit); Broken if the peer
510-
# end closed first (streamable_http terminate()).
511-
logger.debug("Response for %s dropped - transport closed", message.request_id)
512-
return
513-
514-
logger.debug("Response sent")
458+
with otel_span(
459+
span_name,
460+
kind=SpanKind.SERVER,
461+
attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id},
462+
context=parent_context,
463+
) as span:
464+
if handler := self._request_handlers.get(req.method):
465+
logger.debug("Dispatching request of type %s", type(req).__name__)
466+
467+
try:
468+
# Extract request context and close_sse_stream from message metadata
469+
request_data = None
470+
close_sse_stream_cb = None
471+
close_standalone_sse_stream_cb = None
472+
if message.message_metadata is not None and isinstance(
473+
message.message_metadata, ServerMessageMetadata
474+
):
475+
request_data = message.message_metadata.request_context
476+
close_sse_stream_cb = message.message_metadata.close_sse_stream
477+
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream
478+
479+
client_capabilities = session.client_params.capabilities if session.client_params else None
480+
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
481+
# Get task metadata from request params if present
482+
task_metadata = None
483+
if hasattr(req, "params") and req.params is not None: # pragma: no branch
484+
task_metadata = getattr(req.params, "task", None)
485+
ctx = ServerRequestContext(
486+
request_id=message.request_id,
487+
meta=message.request_meta,
488+
session=session,
489+
lifespan_context=lifespan_context,
490+
experimental=Experimental(
491+
task_metadata=task_metadata,
492+
_client_capabilities=client_capabilities,
493+
_session=session,
494+
_task_support=task_support,
495+
),
496+
request=request_data,
497+
close_sse_stream=close_sse_stream_cb,
498+
close_standalone_sse_stream=close_standalone_sse_stream_cb,
499+
)
500+
response = await handler(ctx, req.params)
501+
except MCPError as err:
502+
response = err.error
503+
except anyio.get_cancelled_exc_class():
504+
if message.cancelled:
505+
# Client sent CancelledNotification; responder.cancel() already
506+
# sent an error response, so skip the duplicate.
507+
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
508+
return
509+
# Transport-close cancellation from the TG in run(); re-raise so the
510+
# TG swallows its own cancellation.
511+
raise
512+
except Exception as err:
513+
if raise_exceptions: # pragma: no cover
514+
raise err
515+
response = types.ErrorData(code=0, message=str(err))
516+
else: # pragma: no cover
517+
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
518+
519+
if isinstance(response, types.ErrorData) and span is not None:
520+
span.set_status(StatusCode.ERROR, response.message)
521+
522+
try:
523+
await message.respond(response)
524+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
525+
# Transport closed between handler unblocking and respond. Happens
526+
# when _receive_loop's finally wakes a handler blocked on
527+
# send_request: the handler runs to respond() before run()'s TG
528+
# cancel fires, but after the write stream closed. Closed if our
529+
# end closed (_receive_loop's async-with exit); Broken if the peer
530+
# end closed first (streamable_http terminate()).
531+
logger.debug("Response for %s dropped - transport closed", message.request_id)
532+
return
533+
534+
logger.debug("Response sent")
515535

516536
async def _handle_notification(
517537
self,

src/mcp/shared/_otel.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""OpenTelemetry helpers for MCP."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import Iterator
6+
from contextlib import contextmanager
7+
from typing import Any
8+
9+
from opentelemetry.context import Context
10+
from opentelemetry.propagate import extract, inject
11+
from opentelemetry.trace import SpanKind, get_tracer
12+
13+
_tracer = get_tracer("mcp-python-sdk")
14+
15+
16+
@contextmanager
def otel_span(
    name: str,
    *,
    kind: SpanKind,
    attributes: dict[str, Any] | None = None,
    context: Context | None = None,
) -> Iterator[Any]:
    """Open a span on the module tracer and yield it for the duration of the ``with`` body.

    Args:
        name: Span name.
        kind: Span kind forwarded to the tracer (e.g. client or server).
        attributes: Optional attributes forwarded to the tracer at span creation.
        context: Optional context forwarded to the tracer as the span's parent
            context; ``None`` leaves the choice to the tracer's default.

    Yields:
        The span object produced by ``start_as_current_span``.
    """
    span_manager = _tracer.start_as_current_span(
        name,
        kind=kind,
        attributes=attributes,
        context=context,
    )
    # Entering the tracer's own context manager is what starts the span;
    # leaving it (normally or via an exception) hands control back to the tracer.
    with span_manager as active_span:
        yield active_span
27+
28+
29+
def inject_trace_context(meta: dict[str, Any]) -> None:
    """Write the current trace context into ``meta`` in place.

    ``meta`` is used as the propagation carrier for a request's `_meta` dict;
    the propagation fields (W3C traceparent/tracestate) are added by
    ``opentelemetry.propagate.inject``. Nothing is returned.
    """
    carrier = meta
    inject(carrier)
32+
33+
34+
def extract_trace_context(meta: dict[str, Any]) -> Context:
    """Read propagated trace context out of a `_meta` dict.

    ``meta`` is used as the propagation carrier; the returned ``Context`` is
    whatever ``opentelemetry.propagate.extract`` builds from it (W3C trace
    context fields).
    """
    propagated_context = extract(meta)
    return propagated_context

src/mcp/shared/session.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
import anyio
1111
from anyio.streams.memory import MemoryObjectSendStream
12+
from opentelemetry.trace import SpanKind
1213
from pydantic import BaseModel, TypeAdapter
1314
from typing_extensions import Self
1415

16+
from mcp.shared._otel import inject_trace_context, otel_span
1517
from mcp.shared._stream_protocols import ReadStream, WriteStream
1618
from mcp.shared.exceptions import MCPError
1719
from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage
@@ -268,24 +270,36 @@ async def send_request(
268270
self._progress_callbacks[request_id] = progress_callback
269271

270272
try:
271-
jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
272-
await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))
273-
274-
# request read timeout takes precedence over session read timeout
275-
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
276-
277-
try:
278-
with anyio.fail_after(timeout):
279-
response_or_error = await response_stream_reader.receive()
280-
except TimeoutError:
281-
class_name = request.__class__.__name__
282-
message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
283-
raise MCPError(code=REQUEST_TIMEOUT, message=message)
284-
285-
if isinstance(response_or_error, JSONRPCError):
286-
raise MCPError.from_jsonrpc_error(response_or_error)
287-
else:
288-
return result_type.model_validate(response_or_error.result, by_name=False)
273+
target = request_data.get("params", {}).get("name")
274+
span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}"
275+
276+
with otel_span(
277+
span_name,
278+
kind=SpanKind.CLIENT,
279+
attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id},
280+
):
281+
# Inject W3C trace context into _meta (SEP-414).
282+
meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {})
283+
inject_trace_context(meta)
284+
285+
jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
286+
await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))
287+
288+
# request read timeout takes precedence over session read timeout
289+
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
290+
291+
try:
292+
with anyio.fail_after(timeout):
293+
response_or_error = await response_stream_reader.receive()
294+
except TimeoutError:
295+
class_name = request.__class__.__name__
296+
message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
297+
raise MCPError(code=REQUEST_TIMEOUT, message=message)
298+
299+
if isinstance(response_or_error, JSONRPCError):
300+
raise MCPError.from_jsonrpc_error(response_or_error)
301+
else:
302+
return result_type.model_validate(response_or_error.result, by_name=False)
289303

290304
finally:
291305
self._response_streams.pop(request_id, None)

tests/shared/test_otel.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from __future__ import annotations
2+
3+
import pytest
4+
from logfire.testing import CaptureLogfire
5+
6+
from mcp import types
7+
from mcp.client.client import Client
8+
from mcp.server.mcpserver import MCPServer
9+
10+
pytestmark = pytest.mark.anyio
11+
12+
13+
# Logfire warns about propagated trace context by default (distributed_tracing=None).
14+
# This is expected here since we're testing cross-boundary context propagation.
15+
@pytest.mark.filterwarnings("ignore::RuntimeWarning")
async def test_client_and_server_spans(capfire: CaptureLogfire):
    """A tool call should emit one client span and one server span in the same trace."""
    send_span_name = "MCP send tools/call greet"
    handle_span_name = "MCP handle tools/call greet"

    server = MCPServer("test")

    @server.tool()
    def greet(name: str) -> str:
        """Greet someone."""
        return f"Hello, {name}!"

    async with Client(server) as client:
        result = await client.call_tool("greet", {"name": "World"})

    # The tool call itself must succeed before the spans are worth inspecting.
    first_item = result.content[0]
    assert isinstance(first_item, types.TextContent)
    assert first_item.text == "Hello, World!"

    exported = capfire.exporter.exported_spans_as_dict()
    exported_names = {entry["name"] for entry in exported}

    assert send_span_name in exported_names
    assert handle_span_name in exported_names

    client_span = next(entry for entry in exported if entry["name"] == send_span_name)
    server_span = next(entry for entry in exported if entry["name"] == handle_span_name)

    assert client_span["attributes"]["mcp.method.name"] == "tools/call"
    assert server_span["attributes"]["mcp.method.name"] == "tools/call"

    # Server span should be in the same trace as the client span (context propagation).
    assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"]

0 commit comments

Comments
 (0)