diff --git a/.github/workflows/python-merge-tests.yml b/.github/workflows/python-merge-tests.yml index 4417165a24..4fc47af595 100644 --- a/.github/workflows/python-merge-tests.yml +++ b/.github/workflows/python-merge-tests.yml @@ -115,12 +115,13 @@ jobs: -m "not integration" --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Surface failing tests if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -163,6 +164,7 @@ jobs: -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Test OpenAI samples timeout-minutes: 10 @@ -173,7 +175,7 @@ jobs: if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -225,6 +227,7 @@ jobs: -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Test Azure samples timeout-minutes: 10 @@ -235,7 +238,7 @@ jobs: if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -285,6 +288,7 @@ jobs: -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Stop local MCP server if: always() @@ -310,7 +314,7 @@ jobs: if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -375,12 +379,13 @@ jobs: -x --timeout=360 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Surface failing tests if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -430,12 +435,13 @@ jobs: -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + --junitxml=pytest.xml working-directory: ./python - name: Surface failing tests if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false @@ -489,13 +495,13 @@ jobs: echo "Cosmos DB emulator did not become ready in time." >&2 exit 1 - name: Test with pytest (Cosmos integration) - run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 + run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 --junitxml=pytest.xml working-directory: ./python - name: Surface failing tests if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 3e12773090..5530be9ffa 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -40,7 +40,7 @@ jobs: UV_CACHE_DIR: /tmp/.uv-cache # Unit tests - name: Run all tests - run: uv run poe test -A + run: uv run poe test -A --junitxml=pytest.xml working-directory: ./python # Surface failing tests @@ -48,7 +48,7 @@ jobs: if: always() uses: pmeier/pytest-results-action@v0.7.2 with: - path: ./python/**.xml + path: ./python/pytest.xml summary: true display-options: fEX fail-on-empty: false diff --git a/.gitignore b/.gitignore index 089abb5395..4994e9e2fe 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,8 @@ htmlcov/ .cache nosetests.xml coverage.xml +pytest.xml +python-coverage.xml *.cover *.py,cover .hypothesis/ diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py b/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py index e9ce610b10..639a3f89b3 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py @@ -46,6 +46,7 @@ from ._run_common import ( FlowState, _build_run_finished_event, # type: ignore + _close_reasoning_block, # type: ignore _emit_content, # type: ignore _extract_resume_payload, # type: ignore _has_only_tool_calls, # type: ignore @@ -1058,6 +1059,10 @@ async def run_agent_stream( } ) + # Close any open reasoning block + for event in _close_reasoning_block(flow): + yield event + # Close any open message if flow.message_id: logger.debug(f"End of run: closing text message message_id={flow.message_id}") diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py index 155f559a94..81d5fadbbe 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py @@ -128,6 +128,7 @@ class FlowState: interrupts: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] reasoning_messages: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] accumulated_reasoning: dict[str, str] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType] + reasoning_message_id: str | None = None def get_tool_name(self, call_id: str | None) -> str | None: """Get tool name by call ID.""" @@ -462,12 +463,39 @@ def _emit_mcp_tool_result( return _emit_tool_result_common(content.call_id, raw_output, flow, predictive_handler) +def _close_reasoning_block(flow: FlowState) -> list[BaseEvent]: + """Close an open reasoning block, emitting end events. + + Should be called when the reasoning block is complete -- e.g. when + non-reasoning content arrives or at end of a run. + """ + if not flow.reasoning_message_id: + return [] + message_id = flow.reasoning_message_id + flow.reasoning_message_id = None + return [ + ReasoningMessageEndEvent(message_id=message_id), + ReasoningEndEvent(message_id=message_id), + ] + + def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> list[BaseEvent]: """Emit AG-UI reasoning events for text_reasoning content. Uses the protocol-defined reasoning event types so that AG-UI consumers such as CopilotKit can render reasoning natively. + When *flow* is provided the function follows the streaming pattern: it + emits ``ReasoningStartEvent`` / ``ReasoningMessageStartEvent`` only on + the first delta for a given ``message_id`` and just + ``ReasoningMessageContentEvent`` for subsequent deltas. The matching + ``ReasoningMessageEndEvent`` / ``ReasoningEndEvent`` are deferred until + ``_close_reasoning_block`` is called (e.g. when non-reasoning content + arrives or at end-of-run). + + Without *flow* (backward-compat) the full Start→Content→End sequence is + emitted for every call. + Only ``content.text`` is used for the visible reasoning message. If ``content.protected_data`` is present it is emitted as a ``ReasoningEncryptedValueEvent`` so that consumers can persist encrypted @@ -483,26 +511,49 @@ def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> lis message_id = content.id or generate_event_id() - events: list[BaseEvent] = [ - ReasoningStartEvent(message_id=message_id), - ReasoningMessageStartEvent(message_id=message_id, role="assistant"), - ] + events: list[BaseEvent] = [] - if text: - events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text)) + if flow is not None: + # Streaming mode: track open reasoning block in flow state. + if flow.reasoning_message_id != message_id: + # Close any previously open reasoning block (different message_id). + events.extend(_close_reasoning_block(flow)) + # Open new reasoning block. + events.append(ReasoningStartEvent(message_id=message_id)) + events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant")) + flow.reasoning_message_id = message_id - events.append(ReasoningMessageEndEvent(message_id=message_id)) + if text: + events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text)) + + if content.protected_data is not None: + events.append( + ReasoningEncryptedValueEvent( + subtype="message", + entity_id=message_id, + encrypted_value=content.protected_data, + ) + ) + else: + # No flow -- backward-compatible full sequence per call. + events.append(ReasoningStartEvent(message_id=message_id)) + events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant")) - if content.protected_data is not None: - events.append( - ReasoningEncryptedValueEvent( - subtype="message", - entity_id=message_id, - encrypted_value=content.protected_data, + if text: + events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text)) + + events.append(ReasoningMessageEndEvent(message_id=message_id)) + + if content.protected_data is not None: + events.append( + ReasoningEncryptedValueEvent( + subtype="message", + entity_id=message_id, + encrypted_value=content.protected_data, + ) ) - ) - events.append(ReasoningEndEvent(message_id=message_id)) + events.append(ReasoningEndEvent(message_id=message_id)) # Persist reasoning into flow state for MESSAGES_SNAPSHOT. # Accumulate reasoning text per message_id, similar to flow.accumulated_text, @@ -546,23 +597,30 @@ def _emit_content( ) -> list[BaseEvent]: """Emit appropriate events for any content type.""" content_type = getattr(content, "type", None) + + # Close open reasoning block when switching to non-reasoning content. + if content_type != "text_reasoning": + events = _close_reasoning_block(flow) + else: + events = [] + if content_type == "text": - return _emit_text(content, flow, skip_text) + return events + _emit_text(content, flow, skip_text) if content_type == "function_call": - return _emit_tool_call(content, flow, predictive_handler) + return events + _emit_tool_call(content, flow, predictive_handler) if content_type == "function_result": - return _emit_tool_result(content, flow, predictive_handler) + return events + _emit_tool_result(content, flow, predictive_handler) if content_type == "function_approval_request": - return _emit_approval_request(content, flow, predictive_handler, require_confirmation) + return events + _emit_approval_request(content, flow, predictive_handler, require_confirmation) if content_type == "usage": - return _emit_usage(content) + return events + _emit_usage(content) if content_type == "oauth_consent_request": - return _emit_oauth_consent(content) + return events + _emit_oauth_consent(content) if content_type == "mcp_server_tool_call": - return _emit_mcp_tool_call(content, flow) + return events + _emit_mcp_tool_call(content, flow) if content_type == "mcp_server_tool_result": - return _emit_mcp_tool_result(content, flow, predictive_handler) + return events + _emit_mcp_tool_result(content, flow, predictive_handler) if content_type == "text_reasoning": return _emit_text_reasoning(content, flow) logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type) - return [] + return events diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py index a75d29abc4..d34cb7db61 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py @@ -29,6 +29,7 @@ from ._run_common import ( FlowState, _build_run_finished_event, + _close_reasoning_block, _emit_content, _extract_resume_payload, _normalize_resume_interrupts, @@ -729,6 +730,9 @@ def _drain_open_message() -> list[TextMessageEndEvent]: run_error_emitted = True terminal_emitted = True + for reasoning_evt in _close_reasoning_block(flow): + yield reasoning_evt + for end_event in _drain_open_message(): yield end_event diff --git a/python/packages/ag-ui/tests/ag_ui/test_run.py b/python/packages/ag-ui/tests/ag_ui/test_run.py index 0e5c329ce9..18b0d0d7e4 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_run.py @@ -11,6 +11,7 @@ ReasoningMessageEndEvent, ReasoningMessageStartEvent, ReasoningStartEvent, + TextMessageContentEvent, TextMessageEndEvent, TextMessageStartEvent, ToolCallArgsEvent, @@ -29,6 +30,7 @@ from agent_framework_ag_ui._run_common import ( FlowState, _build_run_finished_event, + _close_reasoning_block, _emit_approval_request, _emit_content, _emit_mcp_tool_call, @@ -1344,8 +1346,11 @@ def test_routes_text_reasoning(self): events = _emit_content(content, flow) - assert len(events) == 5 + # Streaming pattern: Start + MessageStart + Content (no End events yet) + assert len(events) == 3 assert isinstance(events[0], ReasoningStartEvent) + assert isinstance(events[1], ReasoningMessageStartEvent) + assert isinstance(events[2], ReasoningMessageContentEvent) class TestReasoningInSnapshot: @@ -1501,3 +1506,137 @@ def test_reasoning_encrypted_value_updated_on_later_delta(self): assert len(flow.reasoning_messages) == 1 assert flow.reasoning_messages[0]["content"] == "part1 part2" assert flow.reasoning_messages[0]["encryptedValue"] == "encrypted-payload" + + def test_reasoning_done_after_deltas_does_not_duplicate(self): + """A done-style content arriving after deltas does not duplicate accumulated text. + + The upstream client should skip done events when deltas preceded them, + but if one leaks through, the accumulator must not double-append. + This test verifies that only the delta-produced text is stored. + """ + flow = FlowState() + msg_id = "reason_dedup" + + delta1 = Content.from_text_reasoning(id=msg_id, text="Hello ") + delta2 = Content.from_text_reasoning(id=msg_id, text="world") + + _emit_text_reasoning(delta1, flow) + _emit_text_reasoning(delta2, flow) + + # Accumulated text should equal the concatenation of deltas only + assert len(flow.reasoning_messages) == 1 + assert flow.reasoning_messages[0]["content"] == "Hello world" + assert flow.reasoning_messages[0]["id"] == msg_id + + def test_reasoning_deltas_emit_one_content_event_each(self): + """Each reasoning delta emits exactly one ReasoningMessageContentEvent + within a single Start/End sequence (streaming pattern).""" + flow = FlowState() + msg_id = "reason_evt" + + delta1 = Content.from_text_reasoning(id=msg_id, text="Think ") + delta2 = Content.from_text_reasoning(id=msg_id, text="hard") + + events1 = _emit_text_reasoning(delta1, flow) + events2 = _emit_text_reasoning(delta2, flow) + close_events = _close_reasoning_block(flow) + + all_events = events1 + events2 + close_events + content_events = [e for e in all_events if isinstance(e, ReasoningMessageContentEvent)] + + assert len(content_events) == 2 + assert content_events[0].delta == "Think " + assert content_events[1].delta == "hard" + + # Streaming pattern: one Start/End sequence wrapping both content events + start_events = [e for e in all_events if isinstance(e, ReasoningStartEvent)] + end_events = [e for e in all_events if isinstance(e, ReasoningEndEvent)] + msg_start_events = [e for e in all_events if isinstance(e, ReasoningMessageStartEvent)] + msg_end_events = [e for e in all_events if isinstance(e, ReasoningMessageEndEvent)] + assert len(start_events) == 1 + assert len(end_events) == 1 + assert len(msg_start_events) == 1 + assert len(msg_end_events) == 1 + + def test_reasoning_streaming_event_order(self): + """Streaming reasoning emits Start once, then Content per delta, then End on close.""" + flow = FlowState() + msg_id = "reason_order" + + d1 = Content.from_text_reasoning(id=msg_id, text="A ") + d2 = Content.from_text_reasoning(id=msg_id, text="B ") + d3 = Content.from_text_reasoning(id=msg_id, text="C") + + events = [] + events.extend(_emit_text_reasoning(d1, flow)) + events.extend(_emit_text_reasoning(d2, flow)) + events.extend(_emit_text_reasoning(d3, flow)) + events.extend(_close_reasoning_block(flow)) + + assert isinstance(events[0], ReasoningStartEvent) + assert isinstance(events[1], ReasoningMessageStartEvent) + assert isinstance(events[2], ReasoningMessageContentEvent) + assert events[2].delta == "A " + assert isinstance(events[3], ReasoningMessageContentEvent) + assert events[3].delta == "B " + assert isinstance(events[4], ReasoningMessageContentEvent) + assert events[4].delta == "C" + assert isinstance(events[5], ReasoningMessageEndEvent) + assert isinstance(events[6], ReasoningEndEvent) + assert len(events) == 7 + + def test_close_reasoning_block_noop_when_not_open(self): + """_close_reasoning_block returns empty list when no reasoning block is open.""" + flow = FlowState() + assert _close_reasoning_block(flow) == [] + + def test_close_reasoning_block_resets_state(self): + """_close_reasoning_block clears reasoning_message_id.""" + flow = FlowState() + _emit_text_reasoning(Content.from_text_reasoning(id="r1", text="x"), flow) + assert flow.reasoning_message_id == "r1" + + _close_reasoning_block(flow) + assert flow.reasoning_message_id is None + + def test_emit_content_closes_reasoning_on_text(self): + """Switching from reasoning to text content auto-closes reasoning block.""" + flow = FlowState() + reasoning = Content.from_text_reasoning(id="r1", text="thinking") + text = Content.from_text("answer") + + r_events = _emit_content(reasoning, flow) + t_events = _emit_content(text, flow) + + # reasoning events: Start + MsgStart + Content + assert isinstance(r_events[0], ReasoningStartEvent) + # text events should start with reasoning End events + assert isinstance(t_events[0], ReasoningMessageEndEvent) + assert isinstance(t_events[1], ReasoningEndEvent) + # then text start + + assert isinstance(t_events[2], TextMessageStartEvent) + assert isinstance(t_events[3], TextMessageContentEvent) + + def test_reasoning_distinct_ids_close_previous_block(self): + """Emitting reasoning with a new message_id auto-closes the previous block.""" + flow = FlowState() + c1 = Content.from_text_reasoning(id="block1", text="first") + c2 = Content.from_text_reasoning(id="block2", text="second") + + events1 = _emit_text_reasoning(c1, flow) + events2 = _emit_text_reasoning(c2, flow) + close = _close_reasoning_block(flow) + + # events1: Start(block1) + MsgStart(block1) + Content(block1) + assert events1[0].message_id == "block1" + # events2: MsgEnd(block1) + End(block1) + Start(block2) + MsgStart(block2) + Content(block2) + assert isinstance(events2[0], ReasoningMessageEndEvent) + assert events2[0].message_id == "block1" + assert isinstance(events2[1], ReasoningEndEvent) + assert events2[1].message_id == "block1" + assert isinstance(events2[2], ReasoningStartEvent) + assert events2[2].message_id == "block2" + # close: MsgEnd(block2) + End(block2) + assert isinstance(close[0], ReasoningMessageEndEvent) + assert close[0].message_id == "block2" diff --git a/python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py b/python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py index 4544311fd9..1b6257f203 100644 --- a/python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py +++ b/python/packages/azure-cosmos/agent_framework_azure_cosmos/_checkpoint_storage.py @@ -315,10 +315,7 @@ async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None: """ await self._ensure_container_proxy() - query = ( - "SELECT * FROM c WHERE c.workflow_name = @workflow_name " - "ORDER BY c.timestamp DESC OFFSET 0 LIMIT 1" - ) + query = "SELECT * FROM c WHERE c.workflow_name = @workflow_name ORDER BY c.timestamp DESC OFFSET 0 LIMIT 1" parameters: list[dict[str, object]] = [ {"name": "@workflow_name", "value": workflow_name}, ] @@ -351,10 +348,7 @@ async def list_checkpoint_ids(self, *, workflow_name: str) -> list[CheckpointID] """ await self._ensure_container_proxy() - query = ( - "SELECT c.checkpoint_id FROM c WHERE c.workflow_name = @workflow_name " - "ORDER BY c.timestamp ASC" - ) + query = "SELECT c.checkpoint_id FROM c WHERE c.workflow_name = @workflow_name ORDER BY c.timestamp ASC" parameters: list[dict[str, object]] = [ {"name": "@workflow_name", "value": workflow_name}, ] diff --git a/python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py b/python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py index 5e183c3223..52155d0e21 100644 --- a/python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py +++ b/python/packages/azure-cosmos/tests/test_cosmos_checkpoint_storage.py @@ -402,9 +402,7 @@ async def test_list_checkpoint_ids_empty_returns_empty(mock_container: MagicMock # --- Tests for close and context manager --- -async def test_close_closes_owned_client( - monkeypatch: pytest.MonkeyPatch, mock_cosmos_client: MagicMock -) -> None: +async def test_close_closes_owned_client(monkeypatch: pytest.MonkeyPatch, mock_cosmos_client: MagicMock) -> None: mock_factory = MagicMock(return_value=mock_cosmos_client) monkeypatch.setattr(checkpoint_storage_module, "CosmosClient", mock_factory) diff --git a/python/packages/openai/agent_framework_openai/_chat_client.py b/python/packages/openai/agent_framework_openai/_chat_client.py index f5c0eea03e..ecd17c4b5e 100644 --- a/python/packages/openai/agent_framework_openai/_chat_client.py +++ b/python/packages/openai/agent_framework_openai/_chat_client.py @@ -512,6 +512,7 @@ def _inner_get_response( if stream: function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = set() validated_options: dict[str, Any] | None = None async def _stream() -> AsyncIterable[ChatResponseUpdate]: @@ -530,6 +531,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: chunk, options=validated_options, function_call_ids=function_call_ids, + seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids, ) except Exception as ex: self._handle_request_error(ex) @@ -1930,6 +1932,7 @@ def _parse_chunk_from_openai( event: OpenAIResponseStreamEvent, options: dict[str, Any], function_call_ids: dict[int, tuple[str, str]], + seen_reasoning_delta_item_ids: set[str] | None = None, ) -> ChatResponseUpdate: """Parse an OpenAI Responses API streaming event into a ChatResponseUpdate.""" metadata: dict[str, Any] = {} @@ -2008,6 +2011,8 @@ def _parse_chunk_from_openai( contents.append(Content.from_text(text=event.delta, raw_representation=event)) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_text.delta": + if seen_reasoning_delta_item_ids is not None: + seen_reasoning_delta_item_ids.add(event.item_id) contents.append( Content.from_text_reasoning( id=event.item_id, @@ -2017,15 +2022,21 @@ def _parse_chunk_from_openai( ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_text.done": - contents.append( - Content.from_text_reasoning( - id=event.item_id, - text=event.text, - raw_representation=event, + # Done event carries the full accumulated text. Emit it only as a + # fallback when no delta was already received for this item_id, to + # avoid duplicating content in downstream accumulators (e.g. ag-ui). + if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids: + contents.append( + Content.from_text_reasoning( + id=event.item_id, + text=event.text, + raw_representation=event, + ) ) - ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.delta": + if seen_reasoning_delta_item_ids is not None: + seen_reasoning_delta_item_ids.add(event.item_id) contents.append( Content.from_text_reasoning( id=event.item_id, @@ -2035,13 +2046,17 @@ def _parse_chunk_from_openai( ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.done": - contents.append( - Content.from_text_reasoning( - id=event.item_id, - text=event.text, - raw_representation=event, + # Done event carries the full accumulated text. Emit it only as a + # fallback when no delta was already received for this item_id, to + # avoid duplicating content in downstream accumulators (e.g. ag-ui). + if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids: + contents.append( + Content.from_text_reasoning( + id=event.item_id, + text=event.text, + raw_representation=event, + ) ) - ) metadata.update(self._get_metadata_from_response(event)) case "response.code_interpreter_call_code.delta": call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id @@ -2065,6 +2080,9 @@ def _parse_chunk_from_openai( ) ) metadata.update(self._get_metadata_from_response(event)) + # NOTE: Unlike reasoning done events, code_interpreter done events always + # emit content because downstream consumers do not accumulate + # code_interpreter deltas the same way. case "response.code_interpreter_call_code.done": call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id ci_additional_properties = { diff --git a/python/packages/openai/tests/openai/test_openai_chat_client.py b/python/packages/openai/tests/openai/test_openai_chat_client.py index fd55321238..1e18f85273 100644 --- a/python/packages/openai/tests/openai/test_openai_chat_client.py +++ b/python/packages/openai/tests/openai/test_openai_chat_client.py @@ -2808,11 +2808,12 @@ def test_streaming_reasoning_text_delta_event() -> None: mock_metadata.assert_called_once_with(event) -def test_streaming_reasoning_text_done_event() -> None: - """Test reasoning text done event creates TextReasoningContent with complete text.""" +def test_streaming_reasoning_text_done_event_skipped_after_deltas() -> None: + """Test reasoning text done event does not emit content when deltas were already received.""" client = OpenAIChatClient(model="test-model", api_key="test-key") chat_options = ChatOptions() function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = {"reasoning_456"} event = ResponseReasoningTextDoneEvent( type="response.reasoning_text.done", @@ -2824,12 +2825,40 @@ def test_streaming_reasoning_text_done_event() -> None: ) with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata: - response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore + response = client._parse_chunk_from_openai( + event, chat_options, function_call_ids, seen_reasoning_delta_item_ids + ) # type: ignore + + assert len(response.contents) == 0 + mock_metadata.assert_called_once_with(event) + assert response.additional_properties == {"test": "data"} + + +def test_streaming_reasoning_text_done_event_fallback_without_deltas() -> None: + """Test reasoning text done event emits content when no deltas were received for this item_id.""" + client = OpenAIChatClient(model="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = set() + + event = ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + content_index=0, + item_id="reasoning_456", + output_index=0, + sequence_number=2, + text="complete reasoning", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata: + response = client._parse_chunk_from_openai( + event, chat_options, function_call_ids, seen_reasoning_delta_item_ids + ) # type: ignore assert len(response.contents) == 1 assert response.contents[0].type == "text_reasoning" + assert response.contents[0].id == "reasoning_456" assert response.contents[0].text == "complete reasoning" - assert response.contents[0].raw_representation == event mock_metadata.assert_called_once_with(event) assert response.additional_properties == {"test": "data"} @@ -2859,11 +2888,12 @@ def test_streaming_reasoning_summary_text_delta_event() -> None: mock_metadata.assert_called_once_with(event) -def test_streaming_reasoning_summary_text_done_event() -> None: - """Test reasoning summary text done event creates TextReasoningContent with complete text.""" +def test_streaming_reasoning_summary_text_done_event_skipped_after_deltas() -> None: + """Test reasoning summary text done event does not emit content when deltas were already received.""" client = OpenAIChatClient(model="test-model", api_key="test-key") chat_options = ChatOptions() function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = {"summary_012"} event = ResponseReasoningSummaryTextDoneEvent( type="response.reasoning_summary_text.done", @@ -2875,16 +2905,94 @@ def test_streaming_reasoning_summary_text_done_event() -> None: ) with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata: - response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore + response = client._parse_chunk_from_openai( + event, chat_options, function_call_ids, seen_reasoning_delta_item_ids + ) # type: ignore + + assert len(response.contents) == 0 + mock_metadata.assert_called_once_with(event) + assert response.additional_properties == {"custom": "meta"} + + +def test_streaming_reasoning_summary_text_done_event_fallback_without_deltas() -> None: + """Test reasoning summary text done event emits content when no deltas were received for this item_id.""" + client = OpenAIChatClient(model="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = set() + + event = ResponseReasoningSummaryTextDoneEvent( + type="response.reasoning_summary_text.done", + item_id="summary_012", + output_index=0, + sequence_number=4, + summary_index=0, + text="complete summary", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata: + response = client._parse_chunk_from_openai( + event, chat_options, function_call_ids, seen_reasoning_delta_item_ids + ) # type: ignore assert len(response.contents) == 1 assert response.contents[0].type == "text_reasoning" + assert response.contents[0].id == "summary_012" assert response.contents[0].text == "complete summary" - assert response.contents[0].raw_representation == event mock_metadata.assert_called_once_with(event) assert response.additional_properties == {"custom": "meta"} +def test_streaming_reasoning_deltas_then_done_no_duplication() -> None: + """Sending delta events followed by a done event produces content only from deltas.""" + client = OpenAIChatClient(model="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + seen_reasoning_delta_item_ids: set[str] = set() + item_id = "reasoning_seq" + + delta1 = ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + content_index=0, + item_id=item_id, + output_index=0, + sequence_number=1, + delta="Hello ", + ) + delta2 = ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + content_index=0, + item_id=item_id, + output_index=0, + sequence_number=2, + delta="world", + ) + done = ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + content_index=0, + item_id=item_id, + output_index=0, + sequence_number=3, + text="Hello world", + ) + + all_contents = [] + with patch.object(client, "_get_metadata_from_response", return_value={}): + for event in [delta1, delta2, done]: + response = client._parse_chunk_from_openai( + event, + chat_options, + function_call_ids, + seen_reasoning_delta_item_ids, # type: ignore + ) + all_contents.extend(response.contents) + + assert len(all_contents) == 2 + assert all_contents[0].text == "Hello " + assert all_contents[1].text == "world" + assert "".join(c.text for c in all_contents) == "Hello world" + + def test_streaming_reasoning_events_preserve_metadata() -> None: """Test that reasoning events preserve metadata like regular text events.""" client = OpenAIChatClient(model="test-model", api_key="test-key") diff --git a/python/samples/02-agents/conversations/cosmos_history_provider_conversation_persistence.py b/python/samples/02-agents/conversations/cosmos_history_provider_conversation_persistence.py index ef2b444d28..548f09a92a 100644 --- a/python/samples/02-agents/conversations/cosmos_history_provider_conversation_persistence.py +++ b/python/samples/02-agents/conversations/cosmos_history_provider_conversation_persistence.py @@ -82,9 +82,7 @@ async def main() -> None: ): session = agent.create_session() - response1 = await agent.run( - "My name is Ada. I'm building a distributed database in Rust.", session=session - ) + response1 = await agent.run("My name is Ada. I'm building a distributed database in Rust.", session=session) print("User: My name is Ada. I'm building a distributed database in Rust.") print(f"Assistant: {response1.text}\n") diff --git a/python/samples/02-agents/conversations/cosmos_history_provider_sessions.py b/python/samples/02-agents/conversations/cosmos_history_provider_sessions.py index 2d1861e503..31e8eef16c 100644 --- a/python/samples/02-agents/conversations/cosmos_history_provider_sessions.py +++ b/python/samples/02-agents/conversations/cosmos_history_provider_sessions.py @@ -82,9 +82,7 @@ async def main() -> None: alice_session = agent.create_session(session_id="tenant-alice-session-1") - response = await agent.run( - "Hi! I'm planning a trip to Italy. I love Renaissance art.", session=alice_session - ) + response = await agent.run("Hi! I'm planning a trip to Italy. I love Renaissance art.", session=alice_session) print("Alice: I'm planning a trip to Italy. I love Renaissance art.") print(f"Assistant: {response.text}\n") @@ -97,9 +95,7 @@ async def main() -> None: bob_session = agent.create_session(session_id="tenant-bob-session-1") - response = await agent.run( - "Hey! I'm learning to cook Thai food. I just made pad thai.", session=bob_session - ) + response = await agent.run("Hey! I'm learning to cook Thai food. I just made pad thai.", session=bob_session) print("Bob: I'm learning to cook Thai food. I just made pad thai.") print(f"Assistant: {response.text}\n") diff --git a/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing.py b/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing.py index 4726742ffc..fd4608db03 100644 --- a/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing.py +++ b/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing.py @@ -111,10 +111,7 @@ async def main() -> None: cosmos_key = os.getenv("AZURE_COSMOS_KEY") if not cosmos_endpoint or not cosmos_database_name or not cosmos_container_name: - print( - "Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, " - "and AZURE_COSMOS_CONTAINER_NAME." - ) + print("Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, and AZURE_COSMOS_CONTAINER_NAME.") return # Authentication: supports both managed identity/RBAC and key-based auth. @@ -131,12 +128,15 @@ async def main() -> None: else: from azure.identity.aio import DefaultAzureCredential - async with DefaultAzureCredential() as credential, CosmosCheckpointStorage( - endpoint=cosmos_endpoint, - credential=credential, - database_name=cosmos_database_name, - container_name=cosmos_container_name, - ) as checkpoint_storage: + async with ( + DefaultAzureCredential() as credential, + CosmosCheckpointStorage( + endpoint=cosmos_endpoint, + credential=credential, + database_name=cosmos_database_name, + container_name=cosmos_container_name, + ) as checkpoint_storage, + ): await _run_workflow(checkpoint_storage) diff --git a/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing_foundry.py b/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing_foundry.py index 49c3e779f9..7d4f6ad17f 100644 --- a/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing_foundry.py +++ b/python/samples/03-workflows/checkpoint/cosmos_workflow_checkpointing_foundry.py @@ -57,10 +57,7 @@ async def main() -> None: return if not cosmos_endpoint or not cosmos_database_name or not cosmos_container_name: - print( - "Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, " - "and AZURE_COSMOS_CONTAINER_NAME." - ) + print("Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, and AZURE_COSMOS_CONTAINER_NAME.") return # Use a single AzureCliCredential for both Cosmos and Foundry,