From 73df1167d56938f5c8d68eded4ff8d1903666f78 Mon Sep 17 00:00:00 2001 From: Karthik Kalyanaraman <105607645+karthikscale3@users.noreply.github.com> Date: Thu, 13 Mar 2025 10:51:00 -0700 Subject: [PATCH 1/5] Make openai agents dep optional (#501) --- .../instrumentation/openai_agents/patch.py | 44 +++++++++++++++---- src/langtrace_python_sdk/version.py | 2 +- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/langtrace_python_sdk/instrumentation/openai_agents/patch.py b/src/langtrace_python_sdk/instrumentation/openai_agents/patch.py index a9ab610b..4f059b73 100644 --- a/src/langtrace_python_sdk/instrumentation/openai_agents/patch.py +++ b/src/langtrace_python_sdk/instrumentation/openai_agents/patch.py @@ -1,9 +1,6 @@ import json from typing import Any, Callable, List -from agents.exceptions import (InputGuardrailTripwireTriggered, - OutputGuardrailTripwireTriggered) -from agents.run import Runner from importlib_metadata import version as v from langtrace.trace_attributes import FrameworkSpanAttributes, SpanAttributes from opentelemetry import baggage, trace @@ -18,6 +15,29 @@ set_usage_attributes) +# Define dummy classes to use when imports fail +class DummyRunner: + pass + + +class DummyException(Exception): + pass + + +# Try importing from openai-agents package +try: + from agents.exceptions import (InputGuardrailTripwireTriggered, + OutputGuardrailTripwireTriggered) + from agents.run import Runner + OPENAI_AGENTS_AVAILABLE = True +except ImportError: + # Define dummy classes if imports fail + InputGuardrailTripwireTriggered = DummyException + OutputGuardrailTripwireTriggered = DummyException + Runner = DummyRunner + OPENAI_AGENTS_AVAILABLE = False + + def extract_agent_details(agent_or_handoff): """Extract relevant details from an agent/handoff and its handoffs.""" try: @@ -70,6 +90,10 @@ def extract_handoff_details(handoff): def get_handoffs(version: str, tracer: Tracer) -> Callable: """Wrap the `prompt` method of the `TLM` class to trace it.""" + if not OPENAI_AGENTS_AVAILABLE: + def noop_traced_method(wrapped: Callable, instance: Any, args: List[Any], kwargs: Any) -> Any: + return wrapped(*args, **kwargs) + return noop_traced_method def traced_method( wrapped: Callable, @@ -117,7 +141,8 @@ def traced_method( attributes = FrameworkSpanAttributes(**span_attributes) with tracer.start_as_current_span( - name=f"openai_agents.available_handoffs", kind=SpanKind.CLIENT + name="openai_agents.available_handoffs", + kind=SpanKind.CLIENT ) as span: try: set_span_attributes(span, attributes) @@ -157,12 +182,11 @@ def traced_method( pass # Silently fail if error recording fails raise # Re-raise the original error since it's from the wrapped function - except Exception as outer_err: - # If anything fails in our instrumentation wrapper, catch it and return control to the wrapped function + except Exception: try: return wrapped(*args, **kwargs) except Exception as wrapped_err: - raise wrapped_err # Only raise errors from the wrapped function + raise wrapped_err return traced_method @@ -328,6 +352,10 @@ def extract_run_config(config): def get_new_response(version: str, tracer: Tracer) -> Callable: """Wrap the _get_new_response method to trace inputs and outputs.""" + if not OPENAI_AGENTS_AVAILABLE: + async def noop_traced_method(wrapped: Callable, instance: Any, args: List[Any], kwargs: Any) -> Any: + return await wrapped(*args, **kwargs) + return noop_traced_method async def traced_method( wrapped: Callable, @@ -524,7 +552,7 @@ async def traced_method( raise - except Exception as 
outer_err: + except Exception: # Remove outer_err since it's unused try: return await wrapped(*args, **kwargs) except Exception as wrapped_err: diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 56d5ad1d..f64508ee 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.8.5" +__version__ = "3.8.6" From db20e1538fd404d921c1302ef47d3a11543470ff Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 18 Mar 2025 16:32:47 +0200 Subject: [PATCH 2/5] Support responses api --- .gitignore | 1 + .../instrumentation/openai/instrumentation.py | 16 +- .../instrumentation/openai/patch.py | 142 ++++++++++++++++-- 3 files changed, 143 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index c7bf4895..57f9e81c 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,4 @@ chroma.sqlite3 #.idea/ logs/ +playground/ diff --git a/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py b/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py index 35569e59..27fd2964 100644 --- a/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/openai/instrumentation.py @@ -24,6 +24,8 @@ async_embeddings_create, async_images_generate, chat_completions_create, + openai_responses_create, + async_openai_responses_create, embeddings_create, images_edit, images_generate, @@ -32,7 +34,7 @@ logging.basicConfig(level=logging.FATAL) -class OpenAIInstrumentation(BaseInstrumentor): # type: ignore +class OpenAIInstrumentation(BaseInstrumentor): # type: ignore def instrumentation_dependencies(self) -> Collection[str]: return ["openai >= 0.27.0", "trace-attributes >= 4.0.5"] @@ -54,6 +56,18 @@ def _instrument(self, **kwargs: Any) -> None: async_chat_completions_create(version, tracer), ) + wrap_function_wrapper( + "openai.resources.responses", + "AsyncResponses.create", + async_openai_responses_create(version, tracer), + ) + + wrap_function_wrapper( + "openai.resources.responses", + "Responses.create", + openai_responses_create(version, tracer), + ) + wrap_function_wrapper( "openai.resources.images", "Images.generate", diff --git a/src/langtrace_python_sdk/instrumentation/openai/patch.py b/src/langtrace_python_sdk/instrumentation/openai/patch.py index 7c9d43b3..b4b7c43a 100644 --- a/src/langtrace_python_sdk/instrumentation/openai/patch.py +++ b/src/langtrace_python_sdk/instrumentation/openai/patch.py @@ -7,27 +7,120 @@ from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.status import Status, StatusCode -from langtrace_python_sdk.constants.instrumentation.common import \ - SERVICE_PROVIDERS +from langtrace_python_sdk.constants.instrumentation.common import SERVICE_PROVIDERS from langtrace_python_sdk.constants.instrumentation.openai import APIS from langtrace_python_sdk.instrumentation.openai.types import ( - ChatCompletionsCreateKwargs, ContentItem, EmbeddingsCreateKwargs, - ImagesEditKwargs, ImagesGenerateKwargs, ResultType) + ChatCompletionsCreateKwargs, + ContentItem, + EmbeddingsCreateKwargs, + ImagesEditKwargs, + ImagesGenerateKwargs, + ResultType, +) from langtrace_python_sdk.types import NOT_GIVEN from langtrace_python_sdk.utils import set_span_attribute -from langtrace_python_sdk.utils.llm import (StreamWrapper, - calculate_prompt_tokens, - get_base_url, get_extra_attributes, - get_langtrace_attributes, - get_llm_request_attributes, - get_llm_url, get_span_name, - get_tool_calls, is_streaming, - 
set_event_completion, - set_span_attributes, - set_usage_attributes) +from langtrace_python_sdk.utils.llm import ( + StreamWrapper, + calculate_prompt_tokens, + get_base_url, + get_extra_attributes, + get_langtrace_attributes, + get_llm_request_attributes, + get_llm_url, + get_span_name, + get_tool_calls, + is_streaming, + set_event_completion, + set_span_attributes, + set_usage_attributes, +) from langtrace_python_sdk.utils.silently_fail import silently_fail +def async_openai_responses_create(version: str, tracer: Tracer) -> Callable: + """Wrap the `create` method of the `openai.AsyncResponse.create` class to trace it.""" + + async def traced_method( + wrapped: Callable, instance: Any, args: List[Any], kwargs: Dict[str, Any] + ): + input_value = kwargs.get("input") + prompt = ( + input_value[0] + if isinstance(input_value, list) + else [{"role": "user", "content": input_value}] + ) + service_provider = SERVICE_PROVIDERS["OPENAI"] + span_attributes = { + "instructions": kwargs.get("instructions"), + **get_langtrace_attributes(version, service_provider, vendor_type="llm"), + **get_llm_request_attributes( + kwargs, + operation_name="openai.responses.create", + prompts=prompt, + ), + } + with tracer.start_as_current_span( + name="openai.responses.create", + kind=SpanKind.CLIENT, + context=set_span_in_context(trace.get_current_span()), + ) as span: + try: + set_span_attributes(span, span_attributes) + + response = await wrapped(*args, **kwargs) + _set_openai_agentic_response_attributes(span, response) + + return response + except Exception as err: + span.record_exception(err) + raise + + return traced_method + + +def openai_responses_create(version: str, tracer: Tracer) -> Callable: + """Wrap the `create` method of the `openai.responses.create` class to trace it.""" + + def traced_method( + wrapped: Callable, instance: Any, args: List[Any], kwargs: Dict[str, Any] + ): + input_value = kwargs.get("input") + prompt = ( + input_value[0] + if isinstance(input_value, list) + else [{"role": "user", "content": input_value}] + ) + service_provider = SERVICE_PROVIDERS["OPENAI"] + span_attributes = { + "instructions": kwargs.get("instructions"), + **get_langtrace_attributes(version, service_provider, vendor_type="llm"), + **get_llm_request_attributes( + kwargs, + operation_name="openai.responses.create", + prompts=prompt, + ), + } + with tracer.start_as_current_span( + name="openai.responses.create", + kind=SpanKind.CLIENT, + context=set_span_in_context(trace.get_current_span()), + ) as span: + try: + set_span_attributes(span, span_attributes) + + response = wrapped(*args, **kwargs) + _set_openai_agentic_response_attributes(span, response) + + print("3. 
Response", response) + + return response + except Exception as err: + span.record_exception(err) + raise + + return traced_method + + def filter_valid_attributes(attributes): """Filter attributes where value is not None, not an empty string, and not openai.NOT_GIVEN.""" return { @@ -634,6 +727,21 @@ def extract_content(choice: Any) -> Union[str, List[Dict[str, Any]], Dict[str, A return "" +def _set_openai_agentic_response_attributes(span: Span, response) -> None: + set_span_attribute(span, SpanAttributes.LLM_RESPONSE_ID, response.id) + set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.model) + set_event_completion(span, [{"role": "assistant", "content": response.output_text}]) + set_usage_attributes( + span, + { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens, + "total_tokens": response.usage.total_tokens, + "cached_tokens": response.usage.input_tokens_details["cached_tokens"], + }, + ) + + @silently_fail def _set_input_attributes( span: Span, kwargs: ChatCompletionsCreateKwargs, attributes: LLMSpanAttributes @@ -707,5 +815,9 @@ def _set_response_attributes(span: Span, result: ResultType) -> None: set_span_attribute( span, "gen_ai.usage.cached_tokens", - result.usage.prompt_tokens_details.cached_tokens if result.usage.prompt_tokens_details else 0, + ( + result.usage.prompt_tokens_details.cached_tokens + if result.usage.prompt_tokens_details + else 0 + ), ) From 94fb114558cd10f8bd3f8647b6e92b22aec46011 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 18 Mar 2025 16:33:02 +0200 Subject: [PATCH 3/5] bump --- src/langtrace_python_sdk/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index f64508ee..08f7211d 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.8.6" +__version__ = "3.8.7" From 386541bc24422951181621ad361b2ecb3c9aa244 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 18 Mar 2025 17:09:48 +0200 Subject: [PATCH 4/5] handle streaming --- .../instrumentation/openai/patch.py | 9 +- src/langtrace_python_sdk/utils/llm.py | 111 ++++++++++-------- 2 files changed, 70 insertions(+), 50 deletions(-) diff --git a/src/langtrace_python_sdk/instrumentation/openai/patch.py b/src/langtrace_python_sdk/instrumentation/openai/patch.py index b4b7c43a..5d642a71 100644 --- a/src/langtrace_python_sdk/instrumentation/openai/patch.py +++ b/src/langtrace_python_sdk/instrumentation/openai/patch.py @@ -104,15 +104,16 @@ def traced_method( name="openai.responses.create", kind=SpanKind.CLIENT, context=set_span_in_context(trace.get_current_span()), + end_on_exit=False, ) as span: try: set_span_attributes(span, span_attributes) response = wrapped(*args, **kwargs) - _set_openai_agentic_response_attributes(span, response) - - print("3. 
Response", response) - + if is_streaming(kwargs) and span.is_recording(): + return StreamWrapper(response, span) + else: + _set_openai_agentic_response_attributes(span, response) return response except Exception as err: span.record_exception(err) diff --git a/src/langtrace_python_sdk/utils/llm.py b/src/langtrace_python_sdk/utils/llm.py index c2090156..d35e618d 100644 --- a/src/langtrace_python_sdk/utils/llm.py +++ b/src/langtrace_python_sdk/utils/llm.py @@ -96,22 +96,22 @@ def calculate_price_from_usage(model, usage): def convert_mistral_messages_to_serializable(mistral_messages): serializable_messages = [] - + try: for message in mistral_messages: serializable_message = {"role": message.role} - + # Handle content if hasattr(message, "content"): serializable_message["content"] = message.content - + # Handle tool_calls if hasattr(message, "tool_calls") and message.tool_calls is not None: serializable_tool_calls = [] - + for tool_call in message.tool_calls: serializable_tool_call = {} - + # Handle id, type, and index if hasattr(tool_call, "id"): serializable_tool_call["id"] = tool_call.id @@ -119,111 +119,117 @@ def convert_mistral_messages_to_serializable(mistral_messages): serializable_tool_call["type"] = tool_call.type if hasattr(tool_call, "index"): serializable_tool_call["index"] = tool_call.index - + # Handle function if hasattr(tool_call, "function"): function_call = tool_call.function serializable_function = {} - + if hasattr(function_call, "name"): serializable_function["name"] = function_call.name if hasattr(function_call, "arguments"): serializable_function["arguments"] = function_call.arguments - + serializable_tool_call["function"] = serializable_function - + serializable_tool_calls.append(serializable_tool_call) - + serializable_message["tool_calls"] = serializable_tool_calls - + # Handle tool_call_id for tool messages if hasattr(message, "tool_call_id"): serializable_message["tool_call_id"] = message.tool_call_id - + serializable_messages.append(serializable_message) except Exception as e: pass - + return serializable_messages def convert_gemini_messages_to_serializable(formatted_messages, system_message=None): """ Converts Gemini-formatted messages back to a JSON serializable format. - + Args: formatted_messages: The formatted messages from Gemini. system_message (str, optional): System message content. - + Returns: List[dict]: JSON serializable list of message dictionaries. 
""" serializable_messages = [] - + try: # Add system message if present if system_message: - serializable_messages.append({ - "role": "system", - "content": system_message - }) - + serializable_messages.append({"role": "system", "content": system_message}) + for message_item in formatted_messages: # Handle the case where the item is a dict with 'role' and 'content' keys - if isinstance(message_item, dict) and 'role' in message_item and 'content' in message_item: - role = message_item['role'] - content_value = message_item['content'] - + if ( + isinstance(message_item, dict) + and "role" in message_item + and "content" in message_item + ): + role = message_item["role"] + content_value = message_item["content"] + # Initialize our serializable message serializable_message = {"role": role} - + # If content is a list of Content objects if isinstance(content_value, list) and len(content_value) > 0: for content_obj in content_value: # Process each Content object - if hasattr(content_obj, 'parts') and hasattr(content_obj, 'role'): + if hasattr(content_obj, "parts") and hasattr( + content_obj, "role" + ): parts = content_obj.parts - + # Extract text from parts text_parts = [] for part in parts: - if hasattr(part, 'text') and part.text: + if hasattr(part, "text") and part.text: text_parts.append(part.text) - + if text_parts: serializable_message["content"] = " ".join(text_parts) - + # Here you can add additional processing for other part types # like function_call, function_response, inline_data, etc. # Similar to the previous implementation - + # If content is a string or already a primitive type - elif isinstance(content_value, (str, int, float, bool)) or content_value is None: + elif ( + isinstance(content_value, (str, int, float, bool)) + or content_value is None + ): serializable_message["content"] = content_value - + # Add the processed message to our list serializable_messages.append(serializable_message) - + # Handle the case where the item is a Content object directly - elif hasattr(message_item, 'role') and hasattr(message_item, 'parts'): + elif hasattr(message_item, "role") and hasattr(message_item, "parts"): # This is the case from the previous implementation # Process a Content object directly serializable_message = {"role": message_item.role} - + parts = message_item.parts text_parts = [] - + for part in parts: - if hasattr(part, 'text') and part.text: + if hasattr(part, "text") and part.text: text_parts.append(part.text) - + if text_parts: serializable_message["content"] = " ".join(text_parts) - + serializable_messages.append(serializable_message) except Exception as e: pass - + return serializable_messages @@ -253,7 +259,7 @@ def get_llm_request_attributes(kwargs, prompts=None, model=None, operation_name= or kwargs.get("top_k", None) or kwargs.get("top_n", None) ) - + try: prompts = json.dumps(prompts) if prompts else None except Exception as e: @@ -261,9 +267,13 @@ def get_llm_request_attributes(kwargs, prompts=None, model=None, operation_name= # check model if kwargs.get("model") is not None: if kwargs.get("model").startswith("gemini"): - prompts = json.dumps(convert_gemini_messages_to_serializable(prompts)) + prompts = json.dumps( + convert_gemini_messages_to_serializable(prompts) + ) elif kwargs.get("model").startswith("mistral"): - prompts = json.dumps(convert_mistral_messages_to_serializable(prompts)) + prompts = json.dumps( + convert_mistral_messages_to_serializable(prompts) + ) else: prompts = "[]" else: @@ -427,6 +437,7 @@ def cleanup(self): "".join(self.result_content), 
response_model ) if self._span_started: + print("SPAAN", self.span) set_span_attribute( self.span, SpanAttributes.LLM_RESPONSE_MODEL, @@ -570,6 +581,9 @@ def build_streaming_response(self, chunk): and not hasattr(chunk.delta, "message") ): content = [chunk.delta.text] if hasattr(chunk.delta, "text") else [] + # OpenAI Responses API + if hasattr(chunk, "type") and chunk.type == "response.completed": + content = [chunk.response.output_text] if isinstance(chunk, dict): if "message" in chunk: @@ -579,7 +593,11 @@ def build_streaming_response(self, chunk): self.result_content.append(content[0]) def set_usage_attributes(self, chunk): - + # Responses API OpenAI + if hasattr(chunk, "type") and chunk.type == "response.completed": + usage = chunk.response.usage + self.completion_tokens = usage.output_tokens + self.prompt_tokens = usage.input_tokens # Anthropic & OpenAI if hasattr(chunk, "type") and chunk.type == "message_start": if hasattr(chunk.message, "usage") and chunk.message.usage is not None: @@ -630,6 +648,7 @@ def process_chunk(self, chunk): and chunk.data.choices is not None ): chunk = chunk.data + self.set_response_model(chunk=chunk) self.build_streaming_response(chunk=chunk) self.set_usage_attributes(chunk=chunk) From 72e325aa3ea30a2a23dedc745af67117c076efda Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 18 Mar 2025 17:15:34 +0200 Subject: [PATCH 5/5] bump version --- src/langtrace_python_sdk/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index f64508ee..08f7211d 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.8.6" +__version__ = "3.8.7"
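A minimal usage sketch of what this series adds, for anyone trying the patches locally. It assumes `langtrace.init()` as the SDK entry point and an arbitrary model name, since neither appears in these diffs; the `responses.create` call shapes come from the upstream `openai` client (>= 1.66, which ships the Responses API). Treat it as illustrative, not as a test shipped with the series.

# Exercises PATCH 2/5 (Responses API spans) and PATCH 4/5 (streaming).
# Assumed: langtrace.init() is the SDK's usual entry point; it is not
# part of these diffs.
from langtrace_python_sdk import langtrace
from openai import OpenAI

langtrace.init()  # wraps openai.resources.responses.Responses.create

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Non-streaming: the patched create() records the response id, model,
# output_text and token usage on an "openai.responses.create" span.
response = client.responses.create(
    model="gpt-4o-mini",
    instructions="Answer in one sentence.",
    input="What does OpenTelemetry do?",
)
print(response.output_text)

# Streaming: is_streaming(kwargs) is true, so the sync wrapper returns a
# StreamWrapper and defers closing the span (end_on_exit=False) until the
# terminal "response.completed" event carries the usage numbers.
stream = client.responses.create(
    model="gpt-4o-mini",
    input="What does OpenTelemetry do?",
    stream=True,
)
for event in stream:
    if getattr(event, "type", "") == "response.completed":
        print(event.response.usage.output_tokens)

Note that PATCH 4/5 only routes the synchronous `Responses.create` through StreamWrapper; the async wrapper is untouched by that commit, so the sketch above uses the sync client.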