From d60d06b1b65f66fc0c466a2b2a8e83f1caa649d4 Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Mon, 8 Jun 2026 02:28:28 +0800 Subject: [PATCH] fix: defer parsing incomplete streamed text --- .../lib/streaming/responses/_responses.py | 17 +- tests/lib/responses/test_responses.py | 189 ++++++++++++++++++ 2 files changed, 205 insertions(+), 1 deletion(-) diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index 6975a9260d..f509a8a4ea 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -5,6 +5,8 @@ from typing import Any, List, Generic, Iterable, Awaitable, cast from typing_extensions import Self, Callable, Iterator, AsyncIterator +import pydantic + from ._types import ParsedResponseSnapshot from ._events import ( ResponseStreamEvent, @@ -286,7 +288,7 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven logprobs=event.logprobs, type="response.output_text.done", text=event.text, - parsed=parse_text(event.text, text_format=self._text_format), + parsed=self._parse_text_done_event(event.text), ) ) elif event.type == "response.function_call_arguments.delta": @@ -370,3 +372,16 @@ def _create_initial_response(self, event: RawResponseStreamEvent) -> ParsedRespo raise RuntimeError(f"Expected to have received `response.created` before `{event.type}`") return construct_type_unchecked(type_=ParsedResponseSnapshot, value=event.response.to_dict()) + + def _parse_text_done_event(self, text: str) -> TextFormatT | None: + try: + return parse_text(text, text_format=self._text_format) + except pydantic.ValidationError as exc: + if not _is_json_parse_error(exc): + raise + + return None + + +def _is_json_parse_error(exc: pydantic.ValidationError) -> bool: + return any("json" in str(error.get("type", "")).lower() for error in exc.errors()) diff --git a/tests/lib/responses/test_responses.py b/tests/lib/responses/test_responses.py index 8e5f16df95..697466804e 100644 --- a/tests/lib/responses/test_responses.py +++ b/tests/lib/responses/test_responses.py @@ -3,17 +3,108 @@ from typing_extensions import TypeVar import pytest +import pydantic from respx import MockRouter from inline_snapshot import snapshot from openai import OpenAI, AsyncOpenAI from openai._utils import assert_signatures_in_sync +from openai._models import construct_type_unchecked +from openai.types.responses import ( + ResponseCreatedEvent, + ResponseTextDoneEvent, + ResponseCompletedEvent as RawResponseCompletedEvent, + ResponseIncompleteEvent, + ResponseOutputItemAddedEvent, + ResponseContentPartAddedEvent, +) +from openai.lib.streaming.responses._responses import ResponseStreamState from ...conftest import base_url from ..snapshots import make_snapshot_request _T = TypeVar("_T") + +class _StructuredText(pydantic.BaseModel): + answer: str + + +def _response_payload(*, status: str, output: list[object] | None = None) -> dict[str, object]: + return { + "id": "resp_test", + "object": "response", + "created_at": 0, + "model": "gpt-4.1", + "output": output or [], + "parallel_tool_calls": True, + "temperature": None, + "tool_choice": "auto", + "tools": [], + "top_p": None, + "status": status, + } + + +def _message_payload(*, text: str, status: str) -> dict[str, object]: + return { + "id": "msg_test", + "type": "message", + "role": "assistant", + "status": status, + "content": [ + { + "type": "output_text", + "text": text, + "annotations": [], + "logprobs": [], + } + ], + } + + +def _start_response_stream(state: ResponseStreamState[_StructuredText]) -> None: + state.handle_event( + construct_type_unchecked( + type_=ResponseCreatedEvent, + value={ + "type": "response.created", + "sequence_number": 0, + "response": _response_payload(status="in_progress"), + }, + ) + ) + state.handle_event( + construct_type_unchecked( + type_=ResponseOutputItemAddedEvent, + value={ + "type": "response.output_item.added", + "sequence_number": 1, + "output_index": 0, + "item": _message_payload(text="", status="in_progress"), + }, + ) + ) + state.handle_event( + construct_type_unchecked( + type_=ResponseContentPartAddedEvent, + value={ + "type": "response.content_part.added", + "sequence_number": 2, + "output_index": 0, + "content_index": 0, + "item_id": "msg_test", + "part": { + "type": "output_text", + "text": "", + "annotations": [], + "logprobs": [], + }, + }, + ) + ) + + # all the snapshots in this file are auto-generated from the live API # # you can update them with @@ -41,6 +132,104 @@ def test_output_text(client: OpenAI, respx_mock: MockRouter) -> None: ) +def test_stream_output_text_done_defers_invalid_structured_parse() -> None: + state = ResponseStreamState(text_format=_StructuredText, input_tools=[]) + _start_response_stream(state) + + events = state.handle_event( + construct_type_unchecked( + type_=ResponseTextDoneEvent, + value={ + "type": "response.output_text.done", + "sequence_number": 3, + "output_index": 0, + "content_index": 0, + "item_id": "msg_test", + "logprobs": [], + "text": '{"answer":', + }, + ) + ) + + assert len(events) == 1 + assert events[0].type == "response.output_text.done" + assert events[0].parsed is None + + incomplete_events = state.handle_event( + construct_type_unchecked( + type_=ResponseIncompleteEvent, + value={ + "type": "response.incomplete", + "sequence_number": 4, + "response": { + **_response_payload( + status="incomplete", + output=[_message_payload(text='{"answer":', status="incomplete")], + ), + "incomplete_details": {"reason": "max_output_tokens"}, + }, + }, + ) + ) + assert incomplete_events[0].type == "response.incomplete" + + +def test_stream_output_text_done_preserves_structured_validation_errors() -> None: + state = ResponseStreamState(text_format=_StructuredText, input_tools=[]) + _start_response_stream(state) + + with pytest.raises(pydantic.ValidationError): + state.handle_event( + construct_type_unchecked( + type_=ResponseTextDoneEvent, + value={ + "type": "response.output_text.done", + "sequence_number": 3, + "output_index": 0, + "content_index": 0, + "item_id": "msg_test", + "logprobs": [], + "text": "{}", + }, + ) + ) + + +def test_stream_completed_response_still_raises_invalid_structured_parse() -> None: + state = ResponseStreamState(text_format=_StructuredText, input_tools=[]) + _start_response_stream(state) + + state.handle_event( + construct_type_unchecked( + type_=ResponseTextDoneEvent, + value={ + "type": "response.output_text.done", + "sequence_number": 3, + "output_index": 0, + "content_index": 0, + "item_id": "msg_test", + "logprobs": [], + "text": '{"answer":', + }, + ) + ) + + with pytest.raises(pydantic.ValidationError): + state.handle_event( + construct_type_unchecked( + type_=RawResponseCompletedEvent, + value={ + "type": "response.completed", + "sequence_number": 4, + "response": _response_payload( + status="completed", + output=[_message_payload(text='{"answer":', status="completed")], + ), + }, + ) + ) + + @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) def test_stream_method_definition_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: checking_client: OpenAI | AsyncOpenAI = client if sync else async_client