Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/openai/lib/streaming/responses/_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Any, List, Generic, Iterable, Awaitable, cast
from typing_extensions import Self, Callable, Iterator, AsyncIterator

import pydantic

from ._types import ParsedResponseSnapshot
from ._events import (
ResponseStreamEvent,
Expand Down Expand Up @@ -286,7 +288,7 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven
logprobs=event.logprobs,
type="response.output_text.done",
text=event.text,
parsed=parse_text(event.text, text_format=self._text_format),
parsed=self._parse_text_done_event(event.text),
)
)
elif event.type == "response.function_call_arguments.delta":
Expand Down Expand Up @@ -370,3 +372,16 @@ def _create_initial_response(self, event: RawResponseStreamEvent) -> ParsedRespo
raise RuntimeError(f"Expected to have received `response.created` before `{event.type}`")

return construct_type_unchecked(type_=ParsedResponseSnapshot, value=event.response.to_dict())

def _parse_text_done_event(self, text: str) -> TextFormatT | None:
    """Parse the final text of a `response.output_text.done` event.

    Returns the value produced by `parse_text` for the configured text format.
    If parsing raises a `pydantic.ValidationError` that `_is_json_parse_error`
    classifies as a JSON parse failure, `None` is returned instead; every other
    validation error is re-raised unchanged.
    """
    try:
        parsed = parse_text(text, text_format=self._text_format)
    except pydantic.ValidationError as err:
        # Only failures classified as JSON parse errors are swallowed here.
        if _is_json_parse_error(err):
            return None
        raise
    return parsed


def _is_json_parse_error(exc: pydantic.ValidationError) -> bool:
    """Return whether `exc` says the validated text itself was not valid JSON.

    An error counts only when both of the following hold:

    * its ``type`` mentions ``json`` (case-insensitively), and
    * its ``loc`` points at the root of the validated value: an empty location,
      or the single ``"__root__"`` key.

    The location check matters because a JSON-validated *field* inside an
    otherwise well-formed document (for example a ``Json[...]`` field) also
    produces a ``json``-typed error, but with the field path as its location.
    That is a schema validation failure and must not be mistaken for a
    truncated or malformed top-level document.

    Args:
        exc: The validation error raised while parsing the text.

    Returns:
        ``True`` if at least one reported error is a root-level JSON parse
        failure, ``False`` otherwise (including when no errors are reported).
    """
    # NOTE(review): `()` is the root location reported by Pydantic v2 and
    # `("__root__",)` is presumably what Pydantic v1 reports -- confirm against
    # the Pydantic versions this package supports.
    root_locations = ((), ("__root__",))

    for error in exc.errors():
        if "json" not in str(error.get("type", "")).lower():
            continue
        # A missing or `None` location is treated as the root.
        location = tuple(error.get("loc") or ())
        if location in root_locations:
            return True
    return False
Comment on lines +386 to +387

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Distinguish top-level JSON parse failures

When a structured model contains a JSON-validated field, e.g. a Pydantic Json[...] field, Pydantic reports invalid field contents with a json_* error type even though the top-level response text parsed successfully. This helper treats those schema validation failures as truncation and returns None from response.output_text.done, so an incomplete stream with complete-but-invalid structured text can silently skip the validation error that this change intended to preserve. Check that the JSON error is for the top-level parse (for example by inspecting the error location) before suppressing it.

Useful? React with 👍 / 👎.

189 changes: 189 additions & 0 deletions tests/lib/responses/test_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,108 @@
from typing_extensions import TypeVar

import pytest
import pydantic
from respx import MockRouter
from inline_snapshot import snapshot

from openai import OpenAI, AsyncOpenAI
from openai._utils import assert_signatures_in_sync
from openai._models import construct_type_unchecked
from openai.types.responses import (
ResponseCreatedEvent,
ResponseTextDoneEvent,
ResponseCompletedEvent as RawResponseCompletedEvent,
ResponseIncompleteEvent,
ResponseOutputItemAddedEvent,
ResponseContentPartAddedEvent,
)
from openai.lib.streaming.responses._responses import ResponseStreamState

from ...conftest import base_url
from ..snapshots import make_snapshot_request

_T = TypeVar("_T")


# Minimal structured-output model passed as `text_format` to `ResponseStreamState`
# by the stream-state tests in this module.
class _StructuredText(pydantic.BaseModel):
    # Declared without a default, so an empty JSON object ("{}") fails validation.
    answer: str


def _response_payload(*, status: str, output: list[object] | None = None) -> dict[str, object]:
    """Build a minimal raw response payload for the stream-state tests.

    Args:
        status: Value stored under the ``"status"`` key.
        output: Items stored under ``"output"``; a missing or empty value is
            replaced by a fresh empty list.

    Returns:
        A new dictionary with fixed placeholder values for every other key.
    """
    items = output if output else []

    payload: dict[str, object] = {
        "id": "resp_test",
        "object": "response",
        "created_at": 0,
        "model": "gpt-4.1",
    }
    payload["output"] = items
    payload.update(
        {
            "parallel_tool_calls": True,
            "temperature": None,
            "tool_choice": "auto",
            "tools": [],
            "top_p": None,
        }
    )
    payload["status"] = status
    return payload


def _message_payload(*, text: str, status: str) -> dict[str, object]:
    """Build a raw assistant message item holding a single ``output_text`` part.

    Args:
        text: Value stored under the content part's ``"text"`` key.
        status: Value stored under the message's ``"status"`` key.

    Returns:
        A new dictionary describing the message, with fixed placeholder values
        for the remaining keys.
    """
    text_part: dict[str, object] = {
        "type": "output_text",
        "text": text,
        "annotations": [],
        "logprobs": [],
    }
    message: dict[str, object] = {
        "id": "msg_test",
        "type": "message",
        "role": "assistant",
        "status": status,
        "content": [text_part],
    }
    return message


def _start_response_stream(state: ResponseStreamState[_StructuredText]) -> None:
    """Feed `state` the three events that open a text response stream.

    In order: ``response.created``, ``response.output_item.added`` (an empty
    in-progress message) and ``response.content_part.added`` (an empty
    ``output_text`` part). Each event is built with `construct_type_unchecked`
    immediately before it is handed to `state.handle_event`.
    """
    empty_text_part = {
        "type": "output_text",
        "text": "",
        "annotations": [],
        "logprobs": [],
    }
    opening_events = (
        (
            ResponseCreatedEvent,
            {
                "type": "response.created",
                "sequence_number": 0,
                "response": _response_payload(status="in_progress"),
            },
        ),
        (
            ResponseOutputItemAddedEvent,
            {
                "type": "response.output_item.added",
                "sequence_number": 1,
                "output_index": 0,
                "item": _message_payload(text="", status="in_progress"),
            },
        ),
        (
            ResponseContentPartAddedEvent,
            {
                "type": "response.content_part.added",
                "sequence_number": 2,
                "output_index": 0,
                "content_index": 0,
                "item_id": "msg_test",
                "part": empty_text_part,
            },
        ),
    )

    for event_type, raw_event in opening_events:
        state.handle_event(construct_type_unchecked(type_=event_type, value=raw_event))


# all the snapshots in this file are auto-generated from the live API
#
# you can update them with
Expand Down Expand Up @@ -41,6 +132,104 @@ def test_output_text(client: OpenAI, respx_mock: MockRouter) -> None:
)


def test_stream_output_text_done_defers_invalid_structured_parse() -> None:
    """Truncated JSON in `response.output_text.done` yields `parsed=None`, not an error.

    The following `response.incomplete` event must then be handled and emitted
    as the first resulting event.
    """
    state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
    _start_response_stream(state)

    text_done = construct_type_unchecked(
        type_=ResponseTextDoneEvent,
        value={
            "type": "response.output_text.done",
            "sequence_number": 3,
            "output_index": 0,
            "content_index": 0,
            "item_id": "msg_test",
            "logprobs": [],
            "text": '{"answer":',
        },
    )
    events = state.handle_event(text_done)

    assert len(events) == 1
    assert events[0].type == "response.output_text.done"
    assert events[0].parsed is None

    incomplete_response = _response_payload(
        status="incomplete",
        output=[_message_payload(text='{"answer":', status="incomplete")],
    )
    incomplete_response["incomplete_details"] = {"reason": "max_output_tokens"}

    incomplete = construct_type_unchecked(
        type_=ResponseIncompleteEvent,
        value={
            "type": "response.incomplete",
            "sequence_number": 4,
            "response": incomplete_response,
        },
    )
    incomplete_events = state.handle_event(incomplete)
    assert incomplete_events[0].type == "response.incomplete"


def test_stream_output_text_done_preserves_structured_validation_errors() -> None:
    """Well-formed JSON that fails the schema still raises `pydantic.ValidationError`.

    The text ``"{}"`` parses as JSON but lacks the ``answer`` field declared on
    `_StructuredText`.
    """
    state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
    _start_response_stream(state)

    raw_text_done = {
        "type": "response.output_text.done",
        "sequence_number": 3,
        "output_index": 0,
        "content_index": 0,
        "item_id": "msg_test",
        "logprobs": [],
        "text": "{}",
    }

    with pytest.raises(pydantic.ValidationError):
        text_done = construct_type_unchecked(type_=ResponseTextDoneEvent, value=raw_text_done)
        state.handle_event(text_done)


def test_stream_completed_response_still_raises_invalid_structured_parse() -> None:
    """A `response.completed` event carrying truncated JSON text must still raise.

    The earlier `response.output_text.done` event with the same truncated text
    is handled without an error; the `pydantic.ValidationError` is expected only
    once the completed response is handled.
    """
    state = ResponseStreamState(text_format=_StructuredText, input_tools=[])
    _start_response_stream(state)

    text_done = construct_type_unchecked(
        type_=ResponseTextDoneEvent,
        value={
            "type": "response.output_text.done",
            "sequence_number": 3,
            "output_index": 0,
            "content_index": 0,
            "item_id": "msg_test",
            "logprobs": [],
            "text": '{"answer":',
        },
    )
    state.handle_event(text_done)

    with pytest.raises(pydantic.ValidationError):
        completed_response = _response_payload(
            status="completed",
            output=[_message_payload(text='{"answer":', status="completed")],
        )
        completed = construct_type_unchecked(
            type_=RawResponseCompletedEvent,
            value={
                "type": "response.completed",
                "sequence_number": 4,
                "response": completed_response,
            },
        )
        state.handle_event(completed)


@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
def test_stream_method_definition_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
checking_client: OpenAI | AsyncOpenAI = client if sync else async_client
Expand Down