diff --git a/.gitignore b/.gitignore
index c7bf4895..57f9e81c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -163,3 +163,4 @@ chroma.sqlite3
 #.idea/
 
 logs/
+playground/
diff --git a/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py b/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py
index 35569e59..27fd2964 100644
--- a/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py
+++ b/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py
@@ -24,6 +24,8 @@
     async_embeddings_create,
     async_images_generate,
     chat_completions_create,
+    openai_responses_create,
+    async_openai_responses_create,
     embeddings_create,
     images_edit,
     images_generate,
@@ -32,7 +34,7 @@
 logging.basicConfig(level=logging.FATAL)
 
 
-class OpenAIInstrumentation(BaseInstrumentor): # type: ignore
+class OpenAIInstrumentation(BaseInstrumentor):  # type: ignore
     def instrumentation_dependencies(self) -> Collection[str]:
         return ["openai >= 0.27.0", "trace-attributes >= 4.0.5"]
 
@@ -54,6 +56,18 @@ def _instrument(self, **kwargs: Any) -> None:
             async_chat_completions_create(version, tracer),
         )
 
+        wrap_function_wrapper(
+            "openai.resources.responses",
+            "AsyncResponses.create",
+            async_openai_responses_create(version, tracer),
+        )
+
+        wrap_function_wrapper(
+            "openai.resources.responses",
+            "Responses.create",
+            openai_responses_create(version, tracer),
+        )
+
         wrap_function_wrapper(
             "openai.resources.images",
             "Images.generate",
diff --git a/src/langtrace_python_sdk/instrumentation/openai/patch.py b/src/langtrace_python_sdk/instrumentation/openai/patch.py
index 7c9d43b3..5d642a71 100644
--- a/src/langtrace_python_sdk/instrumentation/openai/patch.py
+++ b/src/langtrace_python_sdk/instrumentation/openai/patch.py
@@ -7,27 +7,122 @@
 from opentelemetry.trace.propagation import set_span_in_context
 from opentelemetry.trace.status import Status, StatusCode
 
-from langtrace_python_sdk.constants.instrumentation.common import \
-    SERVICE_PROVIDERS
+from langtrace_python_sdk.constants.instrumentation.common import SERVICE_PROVIDERS
 from langtrace_python_sdk.constants.instrumentation.openai import APIS
 from langtrace_python_sdk.instrumentation.openai.types import (
-    ChatCompletionsCreateKwargs, ContentItem, EmbeddingsCreateKwargs,
-    ImagesEditKwargs, ImagesGenerateKwargs, ResultType)
+    ChatCompletionsCreateKwargs,
+    ContentItem,
+    EmbeddingsCreateKwargs,
+    ImagesEditKwargs,
+    ImagesGenerateKwargs,
+    ResultType,
+)
 from langtrace_python_sdk.types import NOT_GIVEN
 from langtrace_python_sdk.utils import set_span_attribute
-from langtrace_python_sdk.utils.llm import (StreamWrapper,
-                                            calculate_prompt_tokens,
-                                            get_base_url, get_extra_attributes,
-                                            get_langtrace_attributes,
-                                            get_llm_request_attributes,
-                                            get_llm_url, get_span_name,
-                                            get_tool_calls, is_streaming,
-                                            set_event_completion,
-                                            set_span_attributes,
-                                            set_usage_attributes)
+from langtrace_python_sdk.utils.llm import (
+    StreamWrapper,
+    calculate_prompt_tokens,
+    get_base_url,
+    get_extra_attributes,
+    get_langtrace_attributes,
+    get_llm_request_attributes,
+    get_llm_url,
+    get_span_name,
+    get_tool_calls,
+    is_streaming,
+    set_event_completion,
+    set_span_attributes,
+    set_usage_attributes,
+)
 from langtrace_python_sdk.utils.silently_fail import silently_fail
 
 
+def async_openai_responses_create(version: str, tracer: Tracer) -> Callable:
+    """Wrap the `create` method of OpenAI's `AsyncResponses` resource to trace it."""
+
+    async def traced_method(
+        wrapped: Callable, instance: Any, args: List[Any], kwargs: Dict[str, Any]
+    ):
+        input_value = kwargs.get("input")
kwargs.get("input") + prompt = ( + input_value[0] + if isinstance(input_value, list) + else [{"role": "user", "content": input_value}] + ) + service_provider = SERVICE_PROVIDERS["OPENAI"] + span_attributes = { + "instructions": kwargs.get("instructions"), + **get_langtrace_attributes(version, service_provider, vendor_type="llm"), + **get_llm_request_attributes( + kwargs, + operation_name="openai.responses.create", + prompts=prompt, + ), + } + with tracer.start_as_current_span( + name="openai.responses.create", + kind=SpanKind.CLIENT, + context=set_span_in_context(trace.get_current_span()), + ) as span: + try: + set_span_attributes(span, span_attributes) + + response = await wrapped(*args, **kwargs) + _set_openai_agentic_response_attributes(span, response) + + return response + except Exception as err: + span.record_exception(err) + raise + + return traced_method + + +def openai_responses_create(version: str, tracer: Tracer) -> Callable: + """Wrap the `create` method of the `openai.responses.create` class to trace it.""" + + def traced_method( + wrapped: Callable, instance: Any, args: List[Any], kwargs: Dict[str, Any] + ): + input_value = kwargs.get("input") + prompt = ( + input_value[0] + if isinstance(input_value, list) + else [{"role": "user", "content": input_value}] + ) + service_provider = SERVICE_PROVIDERS["OPENAI"] + span_attributes = { + "instructions": kwargs.get("instructions"), + **get_langtrace_attributes(version, service_provider, vendor_type="llm"), + **get_llm_request_attributes( + kwargs, + operation_name="openai.responses.create", + prompts=prompt, + ), + } + with tracer.start_as_current_span( + name="openai.responses.create", + kind=SpanKind.CLIENT, + context=set_span_in_context(trace.get_current_span()), + end_on_exit=False, + ) as span: + try: + set_span_attributes(span, span_attributes) + + response = wrapped(*args, **kwargs) + if is_streaming(kwargs) and span.is_recording(): + return StreamWrapper(response, span) + else: + _set_openai_agentic_response_attributes(span, response) + return response + except Exception as err: + span.record_exception(err) + raise + + return traced_method + + def filter_valid_attributes(attributes): """Filter attributes where value is not None, not an empty string, and not openai.NOT_GIVEN.""" return { @@ -634,6 +728,21 @@ def extract_content(choice: Any) -> Union[str, List[Dict[str, Any]], Dict[str, A return "" +def _set_openai_agentic_response_attributes(span: Span, response) -> None: + set_span_attribute(span, SpanAttributes.LLM_RESPONSE_ID, response.id) + set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.model) + set_event_completion(span, [{"role": "assistant", "content": response.output_text}]) + set_usage_attributes( + span, + { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens, + "total_tokens": response.usage.total_tokens, + "cached_tokens": response.usage.input_tokens_details["cached_tokens"], + }, + ) + + @silently_fail def _set_input_attributes( span: Span, kwargs: ChatCompletionsCreateKwargs, attributes: LLMSpanAttributes @@ -707,5 +816,9 @@ def _set_response_attributes(span: Span, result: ResultType) -> None: set_span_attribute( span, "gen_ai.usage.cached_tokens", - result.usage.prompt_tokens_details.cached_tokens if result.usage.prompt_tokens_details else 0, + ( + result.usage.prompt_tokens_details.cached_tokens + if result.usage.prompt_tokens_details + else 0 + ), ) diff --git a/src/langtrace_python_sdk/utils/llm.py b/src/langtrace_python_sdk/utils/llm.py index 
--- a/src/langtrace_python_sdk/utils/llm.py
+++ b/src/langtrace_python_sdk/utils/llm.py
@@ -96,22 +96,22 @@ def calculate_price_from_usage(model, usage):
 
 def convert_mistral_messages_to_serializable(mistral_messages):
     serializable_messages = []
-    
+
     try:
         for message in mistral_messages:
             serializable_message = {"role": message.role}
-            
+
             # Handle content
             if hasattr(message, "content"):
                 serializable_message["content"] = message.content
-            
+
             # Handle tool_calls
             if hasattr(message, "tool_calls") and message.tool_calls is not None:
                 serializable_tool_calls = []
-                
+
                 for tool_call in message.tool_calls:
                     serializable_tool_call = {}
-                    
+
                     # Handle id, type, and index
                     if hasattr(tool_call, "id"):
                         serializable_tool_call["id"] = tool_call.id
@@ -119,111 +119,117 @@ def convert_mistral_messages_to_serializable(mistral_messages):
                         serializable_tool_call["type"] = tool_call.type
                     if hasattr(tool_call, "index"):
                         serializable_tool_call["index"] = tool_call.index
-                    
+
                     # Handle function
                     if hasattr(tool_call, "function"):
                         function_call = tool_call.function
                         serializable_function = {}
-                        
+
                         if hasattr(function_call, "name"):
                             serializable_function["name"] = function_call.name
                         if hasattr(function_call, "arguments"):
                             serializable_function["arguments"] = function_call.arguments
-                        
+
                         serializable_tool_call["function"] = serializable_function
-                    
+
                     serializable_tool_calls.append(serializable_tool_call)
-                
+
                 serializable_message["tool_calls"] = serializable_tool_calls
-            
+
             # Handle tool_call_id for tool messages
             if hasattr(message, "tool_call_id"):
                 serializable_message["tool_call_id"] = message.tool_call_id
-            
+
             serializable_messages.append(serializable_message)
     except Exception as e:
         pass
-    
+
     return serializable_messages
 
 
 def convert_gemini_messages_to_serializable(formatted_messages, system_message=None):
     """
     Converts Gemini-formatted messages back to a JSON serializable format.
-    
+
     Args:
         formatted_messages: The formatted messages from Gemini.
         system_message (str, optional): System message content.
-    
+
     Returns:
         List[dict]: JSON serializable list of message dictionaries.
""" serializable_messages = [] - + try: # Add system message if present if system_message: - serializable_messages.append({ - "role": "system", - "content": system_message - }) - + serializable_messages.append({"role": "system", "content": system_message}) + for message_item in formatted_messages: # Handle the case where the item is a dict with 'role' and 'content' keys - if isinstance(message_item, dict) and 'role' in message_item and 'content' in message_item: - role = message_item['role'] - content_value = message_item['content'] - + if ( + isinstance(message_item, dict) + and "role" in message_item + and "content" in message_item + ): + role = message_item["role"] + content_value = message_item["content"] + # Initialize our serializable message serializable_message = {"role": role} - + # If content is a list of Content objects if isinstance(content_value, list) and len(content_value) > 0: for content_obj in content_value: # Process each Content object - if hasattr(content_obj, 'parts') and hasattr(content_obj, 'role'): + if hasattr(content_obj, "parts") and hasattr( + content_obj, "role" + ): parts = content_obj.parts - + # Extract text from parts text_parts = [] for part in parts: - if hasattr(part, 'text') and part.text: + if hasattr(part, "text") and part.text: text_parts.append(part.text) - + if text_parts: serializable_message["content"] = " ".join(text_parts) - + # Here you can add additional processing for other part types # like function_call, function_response, inline_data, etc. # Similar to the previous implementation - + # If content is a string or already a primitive type - elif isinstance(content_value, (str, int, float, bool)) or content_value is None: + elif ( + isinstance(content_value, (str, int, float, bool)) + or content_value is None + ): serializable_message["content"] = content_value - + # Add the processed message to our list serializable_messages.append(serializable_message) - + # Handle the case where the item is a Content object directly - elif hasattr(message_item, 'role') and hasattr(message_item, 'parts'): + elif hasattr(message_item, "role") and hasattr(message_item, "parts"): # This is the case from the previous implementation # Process a Content object directly serializable_message = {"role": message_item.role} - + parts = message_item.parts text_parts = [] - + for part in parts: - if hasattr(part, 'text') and part.text: + if hasattr(part, "text") and part.text: text_parts.append(part.text) - + if text_parts: serializable_message["content"] = " ".join(text_parts) - + serializable_messages.append(serializable_message) except Exception as e: pass - + return serializable_messages @@ -253,7 +259,7 @@ def get_llm_request_attributes(kwargs, prompts=None, model=None, operation_name= or kwargs.get("top_k", None) or kwargs.get("top_n", None) ) - + try: prompts = json.dumps(prompts) if prompts else None except Exception as e: @@ -261,9 +267,13 @@ def get_llm_request_attributes(kwargs, prompts=None, model=None, operation_name= # check model if kwargs.get("model") is not None: if kwargs.get("model").startswith("gemini"): - prompts = json.dumps(convert_gemini_messages_to_serializable(prompts)) + prompts = json.dumps( + convert_gemini_messages_to_serializable(prompts) + ) elif kwargs.get("model").startswith("mistral"): - prompts = json.dumps(convert_mistral_messages_to_serializable(prompts)) + prompts = json.dumps( + convert_mistral_messages_to_serializable(prompts) + ) else: prompts = "[]" else: @@ -427,6 +437,7 @@ def cleanup(self): "".join(self.result_content), 
@@ -570,6 +580,9 @@ def build_streaming_response(self, chunk):
             and not hasattr(chunk.delta, "message")
         ):
             content = [chunk.delta.text] if hasattr(chunk.delta, "text") else []
+        # OpenAI Responses API
+        if hasattr(chunk, "type") and chunk.type == "response.completed":
+            content = [chunk.response.output_text]
 
         if isinstance(chunk, dict):
             if "message" in chunk:
@@ -579,7 +592,11 @@ def build_streaming_response(self, chunk):
             self.result_content.append(content[0])
 
     def set_usage_attributes(self, chunk):
-
+        # OpenAI Responses API
+        if hasattr(chunk, "type") and chunk.type == "response.completed":
+            usage = chunk.response.usage
+            self.completion_tokens = usage.output_tokens
+            self.prompt_tokens = usage.input_tokens
         # Anthropic & OpenAI
         if hasattr(chunk, "type") and chunk.type == "message_start":
             if hasattr(chunk.message, "usage") and chunk.message.usage is not None:
@@ -630,6 +647,7 @@ def process_chunk(self, chunk):
             and chunk.data.choices is not None
         ):
             chunk = chunk.data
+        self.set_response_model(chunk=chunk)
         self.build_streaming_response(chunk=chunk)
         self.set_usage_attributes(chunk=chunk)
 
diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py
index f64508ee..08f7211d 100644
--- a/src/langtrace_python_sdk/version.py
+++ b/src/langtrace_python_sdk/version.py
@@ -1 +1 @@
-__version__ = "3.8.6"
+__version__ = "3.8.7"
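For reference, a minimal usage sketch of the Responses API instrumentation added above. This is illustrative only and not part of the patch: it assumes OPENAI_API_KEY is set in the environment and uses a placeholder model name; langtrace.init() and client.responses.create() / response.output_text are the documented langtrace SDK and OpenAI client entry points.

# Illustrative sketch -- exercises the newly wrapped Responses.create call.
from langtrace_python_sdk import langtrace
from openai import OpenAI

# Install the instrumentation (including the Responses.create /
# AsyncResponses.create wrappers added above) before making OpenAI calls.
langtrace.init()

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Non-streaming call: traced by openai_responses_create, which records the
# response id, model, output text, and token usage on the emitted span.
response = client.responses.create(
    model="gpt-4o-mini",  # placeholder model name
    instructions="Answer in one sentence.",
    input="What does OpenTelemetry do?",
)
print(response.output_text)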