diff --git a/src/langtrace_python_sdk/constants/instrumentation/anthropic.py b/src/langtrace_python_sdk/constants/instrumentation/anthropic.py index 8afadb4e..35d46941 100644 --- a/src/langtrace_python_sdk/constants/instrumentation/anthropic.py +++ b/src/langtrace_python_sdk/constants/instrumentation/anthropic.py @@ -3,4 +3,8 @@ "METHOD": "anthropic.messages.create", "ENDPOINT": "/v1/messages", }, + "MESSAGES_STREAM": { + "METHOD": "anthropic.messages.stream", + "ENDPOINT": "/v1/messages", + }, } diff --git a/src/langtrace_python_sdk/instrumentation/anthropic/instrumentation.py b/src/langtrace_python_sdk/instrumentation/anthropic/instrumentation.py index 8fad6554..eb0edc2f 100644 --- a/src/langtrace_python_sdk/instrumentation/anthropic/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/anthropic/instrumentation.py @@ -23,7 +23,7 @@ from opentelemetry.trace import get_tracer from wrapt import wrap_function_wrapper from typing import Any -from langtrace_python_sdk.instrumentation.anthropic.patch import messages_create +from langtrace_python_sdk.instrumentation.anthropic.patch import messages_create, messages_stream logging.basicConfig(level=logging.FATAL) @@ -46,6 +46,12 @@ def _instrument(self, **kwargs: dict[str, Any]) -> None: "Messages.create", messages_create(version, tracer), ) + + wrap_function_wrapper( + "anthropic.resources.messages", + "Messages.stream", + messages_stream(version, tracer), + ) def _instrument_module(self, module_name: str) -> None: pass diff --git a/src/langtrace_python_sdk/instrumentation/anthropic/patch.py b/src/langtrace_python_sdk/instrumentation/anthropic/patch.py index 183c2dd0..df86b66e 100644 --- a/src/langtrace_python_sdk/instrumentation/anthropic/patch.py +++ b/src/langtrace_python_sdk/instrumentation/anthropic/patch.py @@ -56,9 +56,7 @@ def traced_method( prompts = kwargs.get("messages", []) system = kwargs.get("system") if system: - prompts = [{"role": "system", "content": system}] + kwargs.get( - "messages", [] - ) + prompts.append({"role": "system", "content": system}) span_attributes = { **get_langtrace_attributes(version, service_provider), **get_llm_request_attributes(kwargs, prompts=prompts), @@ -72,7 +70,14 @@ def traced_method( span = tracer.start_span( name=get_span_name(APIS["MESSAGES_CREATE"]["METHOD"]), kind=SpanKind.CLIENT ) + set_span_attributes(span, attributes) + + tools = [] + if kwargs.get("tools") is not None and kwargs.get("tools"): + tools.append(json.dumps(kwargs.get("tools"))) + set_span_attribute(span, SpanAttributes.LLM_TOOLS, json.dumps(tools)) + try: # Attempt to call the original method result = wrapped(*args, **kwargs) @@ -127,7 +132,149 @@ def set_response_attributes( span.end() return result else: - return StreamWrapper(result, span) + return StreamWrapper(result, span, tool_calls=True) # return the wrapped method return traced_method + + +def messages_stream(version: str, tracer: Tracer) -> Callable[..., Any]: + + def traced_method( + wrapped: Callable[..., Any], + instance: Any, + args: List[Any], + kwargs: MessagesCreateKwargs, + ) -> Any: + service_provider = SERVICE_PROVIDERS["ANTHROPIC"] + + prompts = kwargs.get("messages", []) + system = kwargs.get("system") + if system: + prompts.append({"role": "assistant", "content": system}) + span_attributes = { + **get_langtrace_attributes(version, service_provider), + **get_llm_request_attributes(kwargs, prompts=prompts), + **get_llm_url(instance), + SpanAttributes.LLM_PATH: APIS["MESSAGES_STREAM"]["ENDPOINT"], + **get_extra_attributes(), + } + + attributes = LLMSpanAttributes(**span_attributes) + + span = tracer.start_span( + name=get_span_name(APIS["MESSAGES_STREAM"]["METHOD"]), kind=SpanKind.CLIENT + ) + + set_span_attributes(span, attributes) + + tools = [] + if kwargs.get("tools") is not None: + tools.append(json.dumps(kwargs.get("tools"))) + set_span_attribute(span, SpanAttributes.LLM_TOOLS, json.dumps(tools)) + + try: + # Create the original message stream manager + original_stream_manager = wrapped(*args, **kwargs) + + # Create a new stream manager that will instrument the stream + # while preserving the stream + class InstrumentedMessageStreamManager: + def __init__(self, original_manager, span): + self.original_manager = original_manager + self.span = span + + def __enter__(self): + # Enter the original context manager to get the stream + original_stream = self.original_manager.__enter__() + + # Create a wrapper iterator + class InstrumentedStream: + def __init__(self, original_stream, span): + self.original_stream = original_stream + self.span = span + self.message_stop_processed = False + + def __iter__(self): + return self + + def __next__(self): + try: + chunk = next(self.original_stream) + + # Apply instrumentation only once on message_stop + if chunk.type == "message_stop" and not self.message_stop_processed: + self.message_stop_processed = True + response_message = chunk.message + + responses = [ + { + "role": ( + response_message.role + if response_message.role + else "assistant" + ), + "content": message.text, + } + for message in response_message.content if message.type == "text" + ] + + set_event_completion(self.span, responses) + + if hasattr(response_message, "usage") and response_message.usage is not None: + set_span_attribute( + self.span, + SpanAttributes.LLM_USAGE_PROMPT_TOKENS, + response_message.usage.input_tokens, + ) + set_span_attribute( + self.span, + SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, + response_message.usage.output_tokens, + ) + set_span_attribute( + self.span, + SpanAttributes.LLM_USAGE_TOTAL_TOKENS, + response_message.usage.input_tokens + response_message.usage.output_tokens, + ) + + # Forward the chunk + return chunk + except StopIteration: + # End the span when we're done with the stream + self.span.end() + raise + except Exception as err: + self.span.record_exception(err) + self.span.set_status(StatusCode.ERROR, str(err)) + self.span.end() + raise + + def close(self): + self.original_stream.close() + if not self.message_stop_processed: + self.span.end() + + # Return our instrumented stream wrapper + return InstrumentedStream(original_stream, self.span) + + def __exit__(self, exc_type, exc_val, exc_tb): + result = self.original_manager.__exit__(exc_type, exc_val, exc_tb) + + if exc_type is not None: + self.span.record_exception(exc_val) + self.span.set_status(StatusCode.ERROR, str(exc_val)) + self.span.end() + + return result + + # Return the instrumented stream manager + return InstrumentedMessageStreamManager(original_stream_manager, span) + + except Exception as err: + span.record_exception(err) + span.set_status(StatusCode.ERROR, str(err)) + span.end() + raise + + return traced_method diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 351a764c..24321519 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.8.16" +__version__ = "3.8.17"