diff --git a/sentry_sdk/integrations/openai.py b/sentry_sdk/integrations/openai.py index d8139f217b..d4fe6e2c26 100644 --- a/sentry_sdk/integrations/openai.py +++ b/sentry_sdk/integrations/openai.py @@ -467,14 +467,11 @@ def _common_set_output_data( response: "Any", input: "Any", integration: "OpenAIIntegration", - start_time: "Optional[float]" = None, finish_span: bool = True, ) -> None: if hasattr(response, "model"): set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_MODEL, response.model) - ttft: "Optional[float]" = None - if hasattr(response, "choices"): if should_send_default_pii() and integration.include_prompts: response_text = [ @@ -525,143 +522,6 @@ def _common_set_output_data( if finish_span: span.__exit__(None, None, None) - - elif hasattr(response, "_iterator"): - data_buf: "list[list[str]]" = [] # one for each choice - - old_iterator = response._iterator - - def new_iterator() -> "Iterator[ChatCompletionChunk]": - nonlocal ttft - count_tokens_manually = True - for x in old_iterator: - with capture_internal_exceptions(): - # OpenAI chat completion API - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - - # OpenAI responses API - elif hasattr(x, "delta"): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - if len(data_buf) == 0: - data_buf.append([]) - data_buf[0].append(x.delta or "") - - # OpenAI responses API end of streaming response - if RESPONSES_API_ENABLED and isinstance(x, ResponseCompletedEvent): - _calculate_token_usage( - input, - x.response, - span, - None, - integration.count_tokens, - ) - count_tokens_manually = False - - yield x - - with 
capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft - ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) - if count_tokens_manually: - _calculate_token_usage( - input, - response, - span, - all_responses, - integration.count_tokens, - ) - - if finish_span: - span.__exit__(None, None, None) - - async def new_iterator_async() -> "AsyncIterator[ChatCompletionChunk]": - nonlocal ttft - count_tokens_manually = True - async for x in old_iterator: - with capture_internal_exceptions(): - # OpenAI chat completion API - if hasattr(x, "choices"): - choice_index = 0 - for choice in x.choices: - if hasattr(choice, "delta") and hasattr( - choice.delta, "content" - ): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - content = choice.delta.content - if len(data_buf) <= choice_index: - data_buf.append([]) - data_buf[choice_index].append(content or "") - choice_index += 1 - - # OpenAI responses API - elif hasattr(x, "delta"): - if start_time is not None and ttft is None: - ttft = time.perf_counter() - start_time - if len(data_buf) == 0: - data_buf.append([]) - data_buf[0].append(x.delta or "") - - # OpenAI responses API end of streaming response - if RESPONSES_API_ENABLED and isinstance(x, ResponseCompletedEvent): - _calculate_token_usage( - input, - x.response, - span, - None, - integration.count_tokens, - ) - count_tokens_manually = False - - yield x - - with capture_internal_exceptions(): - if ttft is not None: - set_data_normalized( - span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft - ) - if len(data_buf) > 0: - all_responses = ["".join(chunk) for chunk in data_buf] - if should_send_default_pii() and integration.include_prompts: - set_data_normalized( - span, 
SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses - ) - if count_tokens_manually: - _calculate_token_usage( - input, - response, - span, - all_responses, - integration.count_tokens, - ) - if finish_span: - span.__exit__(None, None, None) - - if str(type(response._iterator)) == "<class 'async_generator'>": - response._iterator = new_iterator_async() - else: - response._iterator = new_iterator() else: _calculate_token_usage(input, response, span, None, integration.count_tokens) if finish_span: @@ -704,7 +564,7 @@ def _new_chat_completion_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any ) else: _set_completions_api_output_data( - span, response, kwargs, integration, start_time, finish_span=True + span, response, kwargs, integration, finish_span=True ) return response @@ -715,7 +575,6 @@ def _set_completions_api_output_data( response: "Any", kwargs: "dict[str, Any]", integration: "OpenAIIntegration", - start_time: "Optional[float]" = None, finish_span: bool = True, ) -> None: messages = kwargs.get("messages") @@ -728,7 +587,6 @@ def _set_completions_api_output_data( response, messages, integration, - start_time, finish_span, ) @@ -746,14 +604,99 @@ def _set_streaming_completions_api_output_data( if messages is not None and isinstance(messages, str): messages = [messages] - _common_set_output_data( - span, - response, - messages, - integration, - start_time, - finish_span, - ) + ttft: "Optional[float]" = None + data_buf: "list[list[str]]" = [] # one for each choice + + old_iterator = response._iterator + + def new_iterator() -> "Iterator[ChatCompletionChunk]": + nonlocal ttft + for x in old_iterator: + with capture_internal_exceptions(): + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr( + choice.delta, "content" + ): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + 
data_buf[choice_index].append(content or "") + choice_index += 1 + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses + ) + _calculate_token_usage( + messages, + response, + span, + all_responses, + integration.count_tokens, + ) + + if finish_span: + span.__exit__(None, None, None) + + async def new_iterator_async() -> "AsyncIterator[ChatCompletionChunk]": + nonlocal ttft + async for x in old_iterator: + with capture_internal_exceptions(): + if hasattr(x, "choices"): + choice_index = 0 + for choice in x.choices: + if hasattr(choice, "delta") and hasattr( + choice.delta, "content" + ): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + content = choice.delta.content + if len(data_buf) <= choice_index: + data_buf.append([]) + data_buf[choice_index].append(content or "") + choice_index += 1 + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses + ) + _calculate_token_usage( + messages, + response, + span, + all_responses, + integration.count_tokens, + ) + + if finish_span: + span.__exit__(None, None, None) + + if str(type(response._iterator)) == "<class 'async_generator'>": + response._iterator = new_iterator_async() + else: + response._iterator = new_iterator() def _set_responses_api_output_data( @@ -761,7 +704,6 @@ def _set_responses_api_output_data( response: "Any", kwargs: "dict[str, Any]", integration: 
"OpenAIIntegration", - start_time: "Optional[float]" = None, finish_span: bool = True, ) -> None: input = kwargs.get("input") @@ -774,7 +716,6 @@ def _set_responses_api_output_data( response, input, integration, - start_time, finish_span, ) @@ -792,14 +733,108 @@ def _set_streaming_responses_api_output_data( if input is not None and isinstance(input, str): input = [input] - _common_set_output_data( - span, - response, - input, - integration, - start_time, - finish_span, - ) + ttft: "Optional[float]" = None + data_buf: "list[list[str]]" = [] # one for each choice + + old_iterator = response._iterator + + def new_iterator() -> "Iterator[ChatCompletionChunk]": + nonlocal ttft + count_tokens_manually = True + for x in old_iterator: + with capture_internal_exceptions(): + if hasattr(x, "delta"): + if start_time is not None and ttft is None: + ttft = time.perf_counter() - start_time + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + if isinstance(x, ResponseCompletedEvent): + _calculate_token_usage( + input, + x.response, + span, + None, + integration.count_tokens, + ) + count_tokens_manually = False + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses + ) + if count_tokens_manually: + _calculate_token_usage( + input, + response, + span, + all_responses, + integration.count_tokens, + ) + + if finish_span: + span.__exit__(None, None, None) + + async def new_iterator_async() -> "AsyncIterator[ChatCompletionChunk]": + nonlocal ttft + count_tokens_manually = True + async for x in old_iterator: + with capture_internal_exceptions(): + if hasattr(x, "delta"): + if start_time is not None and ttft is None: + ttft = 
time.perf_counter() - start_time + if len(data_buf) == 0: + data_buf.append([]) + data_buf[0].append(x.delta or "") + + if isinstance(x, ResponseCompletedEvent): + _calculate_token_usage( + input, + x.response, + span, + None, + integration.count_tokens, + ) + count_tokens_manually = False + + yield x + + with capture_internal_exceptions(): + if ttft is not None: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft + ) + if len(data_buf) > 0: + all_responses = ["".join(chunk) for chunk in data_buf] + if should_send_default_pii() and integration.include_prompts: + set_data_normalized( + span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses + ) + if count_tokens_manually: + _calculate_token_usage( + input, + response, + span, + all_responses, + integration.count_tokens, + ) + if finish_span: + span.__exit__(None, None, None) + + if str(type(response._iterator)) == "<class 'async_generator'>": + response._iterator = new_iterator_async() + else: + response._iterator = new_iterator() def _set_embeddings_output_data( @@ -807,7 +842,6 @@ def _set_embeddings_output_data( response: "Any", kwargs: "dict[str, Any]", integration: "OpenAIIntegration", - start_time: "Optional[float]" = None, finish_span: bool = True, ) -> None: input = kwargs.get("input") @@ -820,7 +854,6 @@ def _set_embeddings_output_data( response, input, integration, - start_time, finish_span, ) @@ -1008,7 +1041,7 @@ def _new_responses_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "An ) else: _set_responses_api_output_data( - span, response, kwargs, integration, start_time, finish_span=True + span, response, kwargs, integration, finish_span=True ) return response diff --git a/tests/integrations/openai/test_openai.py b/tests/integrations/openai/test_openai.py index d99fb4caf8..060600ee65 100644 --- a/tests/integrations/openai/test_openai.py +++ b/tests/integrations/openai/test_openai.py @@ -500,6 +500,7 @@ def test_streaming_chat_completion_no_prompts( {"role": "system", "content": "You are a 
helpful assistant."}, {"role": "user", "content": "hello"}, ], + stream=True, ) response_string = "".join( map(lambda x: x.choices[0].delta.content, response_stream) @@ -624,6 +625,7 @@ def test_streaming_chat_completion(sentry_init, capture_events, messages, reques response_stream = client.chat.completions.create( model="some-model", messages=messages, + stream=True, ) response_string = "".join( map(lambda x: x.choices[0].delta.content, response_stream) @@ -747,6 +749,7 @@ async def test_streaming_chat_completion_async_no_prompts( {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "hello"}, ], + stream=True, ) response_string = "" @@ -881,6 +884,7 @@ async def test_streaming_chat_completion_async( response_stream = await client.chat.completions.create( model="some-model", messages=messages, + stream=True, ) response_string = "" @@ -942,7 +946,8 @@ def test_bad_chat_completion(sentry_init, capture_events): ) with pytest.raises(OpenAIError): client.chat.completions.create( - model="some-model", messages=[{"role": "system", "content": "hello"}] + model="some-model", + messages=[{"role": "system", "content": "hello"}], ) (event,) = events @@ -2417,15 +2422,19 @@ async def test_ai_client_span_streaming_responses_async_api( events = capture_events() client = AsyncOpenAI(api_key="z") - client.responses._post = AsyncMock(return_value=EXAMPLE_RESPONSE) + returned_stream = AsyncStream(cast_to=None, response=None, client=client) + returned_stream._iterator = async_iterator(EXAMPLE_RESPONSES_STREAM) + client.responses._post = mock.AsyncMock(return_value=returned_stream) with start_transaction(name="openai tx"): - await client.responses.create( + result = await client.responses.create( model="gpt-4o", instructions=instructions, input=input, stream=True, ) + async for _ in result: + pass (transaction,) = events spans = transaction["spans"] @@ -2438,14 +2447,14 @@ async def test_ai_client_span_streaming_responses_async_api( 
"gen_ai.operation.name": "responses", "gen_ai.response.streaming": True, "gen_ai.system": "openai", - "gen_ai.response.model": "response-model-id", + "gen_ai.response.time_to_first_token": mock.ANY, "gen_ai.usage.input_tokens": 20, "gen_ai.usage.input_tokens.cached": 5, "gen_ai.usage.output_tokens": 10, "gen_ai.usage.output_tokens.reasoning": 8, "gen_ai.usage.total_tokens": 30, "gen_ai.request.model": "gpt-4o", - "gen_ai.response.text": "the model response", + "gen_ai.response.text": "hello world", "thread.id": mock.ANY, "thread.name": mock.ANY, } @@ -2994,7 +3003,9 @@ def test_streaming_chat_completion_ttft(sentry_init, capture_events): with start_transaction(name="openai tx"): response_stream = client.chat.completions.create( - model="some-model", messages=[{"role": "user", "content": "Say hello"}] + model="some-model", + messages=[{"role": "user", "content": "Say hello"}], + stream=True, ) # Consume the stream for _ in response_stream: @@ -3058,7 +3069,9 @@ async def test_streaming_chat_completion_ttft_async(sentry_init, capture_events) with start_transaction(name="openai tx"): response_stream = await client.chat.completions.create( - model="some-model", messages=[{"role": "user", "content": "Say hello"}] + model="some-model", + messages=[{"role": "user", "content": "Say hello"}], + stream=True, ) # Consume the stream async for _ in response_stream: