88
99import anyio
1010from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
11+ from opentelemetry .propagate import inject
1112from pydantic import BaseModel , TypeAdapter
1213from typing_extensions import Self
1314
@@ -251,6 +252,9 @@ async def send_request(
251252 response_stream , response_stream_reader = anyio .create_memory_object_stream [JSONRPCResponse | JSONRPCError ](1 )
252253 self ._response_streams [request_id ] = response_stream
253254
255+ # Propagate opentelemetry trace context
256+ self ._inject_otel_context (request )
257+
254258 # Set up progress token if progress callback is provided
255259 request_data = request .model_dump (by_alias = True , mode = "json" , exclude_none = True )
256260 if progress_callback is not None :
@@ -295,6 +299,10 @@ async def send_notification(
295299 related_request_id : RequestId | None = None ,
296300 ) -> None :
297301 """Emits a notification, which is a one-way message that does not expect a response."""
302+
303+ # Propagate opentelemetry trace context
304+ self ._inject_otel_context (notification )
305+
298306 # Some transport implementations may need to set the related_request_id
299307 # to attribute to the notifications to the request that triggered them.
300308 jsonrpc_notification = JSONRPCNotification (
@@ -307,6 +315,28 @@ async def send_notification(
307315 )
308316 await self ._write_stream .send (session_message )
309317
318+ def _inject_otel_context (self , request : SendRequestT | SendNotificationT ) -> None :
319+ """Propagate OpenTelemetry context in `_meta`.
320+
321+ See
322+ - SEP414 https://github.com/modelcontextprotocol/modelcontextprotocol/pull/414
323+ - OpenTelemetry semantic conventions
324+ https://github.com/open-telemetry/semantic-conventions/blob/v1.39.0/docs/gen-ai/mcp.md
325+ """
326+
327+ if request .params is None :
328+ return
329+
330+ carrier : RequestParamsMeta = {}
331+ inject (carrier )
332+ if not carrier :
333+ return
334+
335+ if request .params .meta is None :
336+ request .params .meta = {}
337+
338+ request .params .meta .update (carrier )
339+
310340 async def _send_response (self , request_id : RequestId , response : SendResultT | ErrorData ) -> None :
311341 if isinstance (response , ErrorData ):
312342 jsonrpc_error = JSONRPCError (jsonrpc = "2.0" , id = request_id , error = response )
0 commit comments