88
99import anyio
1010from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
11+ from opentelemetry .propagate import inject
1112from pydantic import BaseModel , TypeAdapter
1213from typing_extensions import Self
1314
@@ -248,6 +249,9 @@ async def send_request(
248249 response_stream , response_stream_reader = anyio .create_memory_object_stream [JSONRPCResponse | JSONRPCError ](1 )
249250 self ._response_streams [request_id ] = response_stream
250251
252+ # Propagate opentelemetry trace context
253+ self ._inject_otel_context (request )
254+
251255 # Set up progress token if progress callback is provided
252256 request_data = request .model_dump (by_alias = True , mode = "json" , exclude_none = True )
253257 if progress_callback is not None :
@@ -292,6 +296,10 @@ async def send_notification(
292296 related_request_id : RequestId | None = None ,
293297 ) -> None :
294298 """Emits a notification, which is a one-way message that does not expect a response."""
299+
300+ # Propagate opentelemetry trace context
301+ self ._inject_otel_context (notification )
302+
295303 # Some transport implementations may need to set the related_request_id
296304 # to attribute to the notifications to the request that triggered them.
297305 jsonrpc_notification = JSONRPCNotification (
@@ -304,6 +312,28 @@ async def send_notification(
304312 )
305313 await self ._write_stream .send (session_message )
306314
315+ def _inject_otel_context (self , request : SendRequestT | SendNotificationT ) -> None :
316+ """Propagate OpenTelemetry context in _meta.
317+
318+ See
319+ - SEP414 https://github.com/modelcontextprotocol/modelcontextprotocol/pull/414
320+ - OpenTelemetry semantic conventions
321+ https://github.com/open-telemetry/semantic-conventions/blob/v1.39.0/docs/gen-ai/mcp.md
322+ """
323+
324+ if request .params is None :
325+ return
326+
327+ carrier : RequestParamsMeta = {}
328+ inject (carrier )
329+ if not carrier :
330+ return
331+
332+ if request .params .meta is None :
333+ request .params .meta = {}
334+
335+ request .params .meta .update (carrier )
336+
307337 async def _send_response (self , request_id : RequestId , response : SendResultT | ErrorData ) -> None :
308338 if isinstance (response , ErrorData ):
309339 jsonrpc_error = JSONRPCError (jsonrpc = "2.0" , id = request_id , error = response )
0 commit comments