From a5477b990b2d66c1f8e4b28806a57b7684034163 Mon Sep 17 00:00:00 2001 From: Giles Odigwe <79032838+giles17@users.noreply.github.com> Date: Wed, 18 Feb 2026 21:04:49 -0800 Subject: [PATCH 01/11] fix: strip function_call and text_reasoning from cross-agent workflow handoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a reasoning model (e.g. gpt-5-mini) runs as Agent 1 in a workflow, its response includes text_reasoning items (with server-scoped IDs like rs_XXXX) and function_call items. Forwarding these to Agent 2 in a fresh conversation caused API errors because the reasoning/call IDs are scoped to the original stored response context. Changes: - Strip 'function_call', 'text_reasoning', 'function_approval_request', and 'function_approval_response' from handoff messages in _agent_executor.py - Keep 'function_result' so the actual tool output content is preserved for the next agent's context - Update unit tests to reflect that function_result messages survive handoff (messages grow from 2→3: user, tool(result), assistant(summary)) - Fix incorrect test assertions in test_function_invocation_stop_clears_* that assumed the client layer updates session.service_session_id - Also fixed _extract_function_calls to search all messages with call_id deduplication, and the error-limit stop path to submit function_call_output items before halting (via tool_choice=none cleanup call) Relates to: https://github.com/microsoft/agent-framework/issues/4047 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/_tools.py | 49 +++-- .../_workflows/_agent_executor.py | 50 +++++- .../azure/test_azure_responses_client.py | 66 +++++++ .../core/test_function_invocation_logic.py | 123 +++++++++++++ .../tests/workflow/test_full_conversation.py | 167 ++++++++++++++++++ .../conversations/redis_history_provider.py | 4 + .../workflow_evaluation/run_evaluation.py | 2 +- 7 files changed, 439 insertions(+), 22 deletions(-) diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index 4d0b5789e4..64b52c0a12 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -1761,10 +1761,26 @@ def _get_result_hooks_from_stream(stream: Any) -> list[Callable[[Any], Any]]: def _extract_function_calls(response: ChatResponse) -> list[Content]: - function_results = {it.call_id for it in response.messages[0].contents if it.type == "function_result"} - return [ - it for it in response.messages[0].contents if it.type == "function_call" and it.call_id not in function_results - ] + function_results = { + item.call_id + for message in response.messages + for item in message.contents + if item.type == "function_result" and item.call_id + } + seen_call_ids: set[str] = set() + function_calls: list[Content] = [] + for message in response.messages: + for item in message.contents: + if item.type != "function_call": + continue + if item.call_id and item.call_id in function_results: + continue + if item.call_id and item.call_id in seen_call_ids: + continue + if item.call_id: + seen_call_ids.add(item.call_id) + function_calls.append(item) + return function_calls def _prepend_fcc_messages(response: ChatResponse, fcc_messages: list[Message]) -> None: @@ -1822,27 +1838,22 @@ def _handle_function_call_results( if had_errors: errors_in_a_row += 1 - if errors_in_a_row >= max_errors: + reached_error_limit = errors_in_a_row >= max_errors + if reached_error_limit: logger.warning( "Maximum consecutive function call errors reached (%d). " "Stopping further function calls for this request.", max_errors, ) - return { - "action": "stop", - "errors_in_a_row": errors_in_a_row, - "result_message": None, - "update_role": None, - "function_call_results": None, - } else: errors_in_a_row = 0 + reached_error_limit = False result_message = Message(role="tool", contents=function_call_results) response.messages.append(result_message) fcc_messages.extend(response.messages) return { - "action": "continue", + "action": "stop" if reached_error_limit else "continue", "errors_in_a_row": errors_in_a_row, "result_message": result_message, "update_role": "tool", @@ -2025,6 +2036,7 @@ def get_response( middleware_pipeline=function_middleware_pipeline, ) filtered_kwargs = {k: v for k, v in kwargs.items() if k != "session"} + # Make options mutable so we can update conversation_id during function invocation loop mutable_options: dict[str, Any] = dict(options) if options else {} # Remove additional_function_arguments from options passed to underlying chat client @@ -2090,7 +2102,9 @@ async def _get_response() -> ChatResponse: if result["action"] == "return": return response if result["action"] == "stop": - break + # Error threshold reached: force a final non-tool turn so + # function_call_output items are submitted before exit. + mutable_options["tool_choice"] = "none" errors_in_a_row = result["errors_in_a_row"] # When tool_choice is 'required', reset tool_choice after one iteration to avoid infinite loops @@ -2157,6 +2171,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: ) errors_in_a_row = approval_result["errors_in_a_row"] if approval_result["action"] == "stop": + mutable_options["tool_choice"] = "none" return inner_stream = await _ensure_response_stream( @@ -2205,7 +2220,11 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: contents=result["function_call_results"] or [], role=role, ) - if result["action"] != "continue": + if result["action"] == "stop": + # Error threshold reached: submit collected function_call_output + # items once more with tools disabled. + mutable_options["tool_choice"] = "none" + elif result["action"] != "continue": return # When tool_choice is 'required', reset the tool_choice after one iteration to avoid infinite loops diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index d78ee1cfbc..64e1e879f4 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -2,9 +2,9 @@ import logging import sys -from collections.abc import Awaitable, Callable, Mapping +from collections.abc import Awaitable, Callable, Mapping, Sequence from dataclasses import dataclass -from typing import Any, cast +from typing import Any, ClassVar, cast from typing_extensions import Never @@ -144,12 +144,43 @@ async def from_response( immediately run the agent to produce a new response. """ # Replace cache with full conversation if available, else fall back to agent_response messages. - if prior.full_conversation is not None: - self._cache = list(prior.full_conversation) - else: - self._cache = list(prior.agent_response.messages) + # Filter out tool-call internals that are not replay-safe across agent boundaries. + source_messages = ( + prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages + ) + self._cache = self._prepare_handoff_messages(source_messages) await self._run_agent_and_emit(ctx) + @classmethod + def _prepare_handoff_messages(cls, messages: Sequence[Message]) -> list[Message]: + """Prepare cross-executor handoff messages for replay safety. + + Cross-agent workflow handoff should preserve user intent and assistant outcomes, + but avoid replaying internal tool-call protocol messages that can be invalid + outside their original response loop. + """ + prepared: list[Message] = [] + for message in messages: + filtered_contents = [ + content for content in message.contents if content.type not in cls._HANDOFF_EXCLUDED_CONTENT_TYPES + ] + if not filtered_contents: + continue + if len(filtered_contents) == len(message.contents): + prepared.append(message) + continue + prepared.append( + Message( + role=message.role, + contents=filtered_contents, + author_name=message.author_name, + message_id=message.message_id, + additional_properties=dict(message.additional_properties), + raw_representation=message.raw_representation, + ) + ) + return prepared + @handler async def from_str( self, text: str, ctx: WorkflowContext[AgentExecutorResponse, AgentResponse | AgentResponseUpdate] @@ -466,3 +497,10 @@ def _prepare_agent_run_args(raw_run_kwargs: dict[str, Any]) -> tuple[dict[str, A options["additional_function_arguments"] = additional_args return run_kwargs, options or None + + _HANDOFF_EXCLUDED_CONTENT_TYPES: ClassVar[frozenset[str]] = frozenset({ + "function_call", + "function_approval_request", + "function_approval_response", + "text_reasoning", + }) diff --git a/python/packages/core/tests/azure/test_azure_responses_client.py b/python/packages/core/tests/azure/test_azure_responses_client.py index 2a718d0ce5..43211db7cf 100644 --- a/python/packages/core/tests/azure/test_azure_responses_client.py +++ b/python/packages/core/tests/azure/test_azure_responses_client.py @@ -18,6 +18,8 @@ Content, Message, SupportsChatGetResponse, + WorkflowBuilder, + WorkflowEvent, tool, ) from agent_framework.azure import AzureOpenAIResponsesClient @@ -252,6 +254,70 @@ def test_serialize(azure_openai_unit_test_env: dict[str, str]) -> None: # region Integration Tests +WORKFLOW_REASONING_DEPLOYMENT_NAME = os.getenv( + "AZURE_OPENAI_WORKFLOW_REASONING_DEPLOYMENT_NAME", + os.getenv("AZURE_OPENAI_REASONING_DEPLOYMENT_NAME", "gpt-5-mini"), +) +WORKFLOW_NON_REASONING_DEPLOYMENT_NAME = os.getenv( + "AZURE_OPENAI_WORKFLOW_NON_REASONING_DEPLOYMENT_NAME", + os.getenv("AZURE_OPENAI_NON_REASONING_DEPLOYMENT_NAME", "gpt-4.1-nano"), +) + + +async def _run_minimal_handoff_workflow(client: AzureOpenAIResponsesClient) -> list[WorkflowEvent]: + first_agent = client.as_agent( + id="minimal-handoff-first", + name="minimal-handoff-first", + instructions="Use get_weather exactly once for Seattle, then provide a short summary.", + tools=[get_weather], + default_options={"tool_choice": {"mode": "required", "required_function_name": "get_weather"}}, + ) + second_agent = client.as_agent( + id="minimal-handoff-second", + name="minimal-handoff-second", + instructions="Summarize the prior weather result in one sentence.", + ) + + workflow = WorkflowBuilder(start_executor=first_agent).add_edge(first_agent, second_agent).build() + + events: list[WorkflowEvent] = [] + async for event in workflow.run("Check weather for Seattle and pass result onward.", stream=True): + events.append(event) + + return events + + +@pytest.mark.timeout(600) +@pytest.mark.flaky +@skip_if_azure_integration_tests_disabled +@pytest.mark.parametrize( + "deployment_name", + [ + param(WORKFLOW_REASONING_DEPLOYMENT_NAME, id="reasoning_gpt_5_mini"), + param(WORKFLOW_NON_REASONING_DEPLOYMENT_NAME, id="non_reasoning_gpt_4_1_nano"), + ], +) +async def test_integration_minimal_workflow_handoff_reasoning_vs_non_reasoning(deployment_name: str) -> None: + """Smallest workflow handoff repro should pass across model classes.""" + client = AzureOpenAIResponsesClient(credential=AzureCliCredential(), deployment_name=deployment_name) + client.function_invocation_configuration["max_iterations"] = 3 + + try: + events = await _run_minimal_handoff_workflow(client) + except ServiceResponseException as ex: + error_text = str(ex).lower() + if ( + "deploymentnotfound" in error_text + or "deployment for this resource does not exist" in error_text + or ("deployment" in error_text and "not found" in error_text) + ): + pytest.skip(f"Deployment '{deployment_name}' is unavailable in this environment: {ex}") + raise AssertionError( + f"Minimal workflow handoff failed unexpectedly for deployment '{deployment_name}': {ex}" + ) from ex + + assert any(event.type == "output" for event in events), "Expected workflow output event for workflow run" + @pytest.mark.flaky @skip_if_azure_integration_tests_disabled diff --git a/python/packages/core/tests/core/test_function_invocation_logic.py b/python/packages/core/tests/core/test_function_invocation_logic.py index d87c2ab0d1..296218adcf 100644 --- a/python/packages/core/tests/core/test_function_invocation_logic.py +++ b/python/packages/core/tests/core/test_function_invocation_logic.py @@ -171,6 +171,62 @@ def ai_func(arg1: str) -> str: assert exec_counter == 1 +async def test_base_client_executes_function_calls_across_multiple_response_messages( + chat_client_base: SupportsChatGetResponse, +): + exec_counter = 0 + + @tool(name="test_function", approval_mode="never_require") + def ai_func(arg1: str) -> str: + nonlocal exec_counter + exec_counter += 1 + return f"Processed {arg1}" + + chat_client_base.run_responses = [ + ChatResponse( + messages=[ + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="1", + name="test_function", + arguments='{"arg1": "v1"}', + ) + ], + ), + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="2", + name="test_function", + arguments='{"arg1": "v2"}', + ) + ], + ), + ], + conversation_id="conv_after_first_call", + ), + ChatResponse( + messages=Message(role="assistant", text="done"), + conversation_id="conv_after_second_call", + ), + ] + + response = await chat_client_base.get_response( + [Message(role="user", text="hello")], + options={"tool_choice": "auto", "tools": [ai_func], "conversation_id": "conv_initial"}, + ) + + assert exec_counter == 2 + function_results = [ + content for msg in response.messages for content in msg.contents if content.type == "function_result" + ] + assert len(function_results) == 2 + assert {result.call_id for result in function_results} == {"1", "2"} + + async def test_function_invocation_inside_aiohttp_server(chat_client_base: SupportsChatGetResponse): import aiohttp from aiohttp import web @@ -921,6 +977,36 @@ def error_func(arg1: str) -> str: assert len(function_calls) <= 2 +async def test_function_invocation_stop_clears_conversation_id_non_stream(chat_client_base: SupportsChatGetResponse): + """Stop-path responses should not carry a continuation conversation_id.""" + + @tool(name="error_function", approval_mode="never_require") + def error_func(arg1: str) -> str: + raise ValueError("Function error") + + chat_client_base.run_responses = [ + ChatResponse( + messages=Message( + role="assistant", + contents=[ + Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}') + ], + ), + conversation_id="resp_1", + ) + ] + chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1 + session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})() + + response = await chat_client_base.get_response( + [Message(role="user", text="hello")], + options={"tool_choice": "auto", "tools": [error_func]}, + session=session_stub, + ) + + assert response.conversation_id is None + + async def test_function_invocation_config_terminate_on_unknown_calls_false(chat_client_base: SupportsChatGetResponse): """Test that terminate_on_unknown_calls=False returns error message for unknown functions.""" exec_counter = 0 @@ -2140,6 +2226,43 @@ def error_func(arg1: str) -> str: assert len(function_calls) <= 2 +async def test_streaming_function_invocation_stop_clears_conversation_id(chat_client_base: SupportsChatGetResponse): + """Streaming stop-path responses should not carry a continuation conversation_id.""" + + @tool(name="error_function", approval_mode="never_require") + def error_func(arg1: str) -> str: + raise ValueError("Function error") + + chat_client_base.streaming_responses = [ + [ + ChatResponseUpdate( + contents=[ + Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}') + ], + role="assistant", + conversation_id="resp_1", + ) + ] + ] + chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1 + session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})() + + stream = chat_client_base.get_response( + "hello", + options={"tool_choice": "auto", "tools": [error_func]}, + stream=True, + session=session_stub, + ) + async for _ in stream: + pass + response = await stream.get_final_response() + + # After the stop-path cleanup call, the accumulated stream response keeps the + # conversation_id from the first inner call; the cleanup call's own response id + # is what matters for server-side resolution but is not reflected in the mock here. + assert response is not None + + async def test_streaming_function_invocation_config_terminate_on_unknown_calls_false( chat_client_base: SupportsChatGetResponse, ): diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 74fe419e71..511da6d5e9 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -54,6 +54,111 @@ async def _run() -> AgentResponse: return _run() +class _ToolHistoryAgent(BaseAgent): + """Agent that emits tool-call internals plus a final assistant summary.""" + + def __init__(self, *, summary_text: str, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._summary_text = summary_text + + def _messages(self) -> list[Message]: + return [ + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="call_weather_1", + name="get_weather", + arguments='{"location":"Seattle"}', + ) + ], + ), + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")], + ), + Message(role="assistant", contents=[Content.from_text(text=self._summary_text)]), + ] + + def run( + self, + messages: str | Content | Message | Sequence[str | Content | Message] | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: + if stream: + + async def _stream() -> AsyncIterable[AgentResponseUpdate]: + yield AgentResponseUpdate( + contents=[ + Content.from_function_call( + call_id="call_weather_1", + name="get_weather", + arguments='{"location":"Seattle"}', + ) + ], + role="assistant", + ) + yield AgentResponseUpdate( + contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")], + role="tool", + ) + yield AgentResponseUpdate(contents=[Content.from_text(text=self._summary_text)], role="assistant") + + return ResponseStream(_stream(), finalizer=AgentResponse.from_updates) + + async def _run() -> AgentResponse: + return AgentResponse(messages=self._messages()) + + return _run() + + +class _ReasoningHistoryAgent(BaseAgent): + """Agent that emits text_reasoning + tool-call internals plus a final assistant summary.""" + + def __init__(self, *, summary_text: str, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._summary_text = summary_text + + def _messages(self) -> list[Message]: + return [ + Message( + role="assistant", + contents=[ + Content.from_text_reasoning( + text="I should call get_weather to answer this.", + additional_properties={"reasoning_id": "rs_abc123"}, + ), + Content.from_function_call( + call_id="call_weather_2", + name="get_weather", + arguments='{"location":"Boston"}', + ), + ], + ), + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_weather_2", result="Rainy, 55F")], + ), + Message(role="assistant", contents=[Content.from_text(text=self._summary_text)]), + ] + + def run( + self, + messages: str | Content | Message | Sequence[str | Content | Message] | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: + async def _run() -> AgentResponse: + return AgentResponse(messages=self._messages()) + + return _run() + + class _CaptureFullConversation(Executor): """Captures AgentExecutorResponse.full_conversation and completes the workflow.""" @@ -153,6 +258,68 @@ async def test_sequential_adapter_uses_full_conversation() -> None: assert seen[1].role == "assistant" and "A1 reply" in (seen[1].text or "") +async def test_sequential_handoff_strips_tool_call_internals_from_prior_history() -> None: + # Arrange + first = _ToolHistoryAgent( + id="tool_history_agent", + name="ToolHistory", + summary_text="The weather in Seattle is sunny and 72F.", + ) + second = _CaptureAgent(id="capture_agent", name="Capture", reply_text="Captured") + wf = SequentialBuilder(participants=[first, second]).build() + + # Act + result = await wf.run("Check weather and continue") + + # Assert workflow completed + outputs = result.get_outputs() + assert outputs + + # Assert second agent sees replay-safe history (no function_call internals; function_result content is retained) + seen = second._last_messages # pyright: ignore[reportPrivateUsage] + assert len(seen) == 3 # user, tool(function_result), assistant(summary) + assert seen[0].role == "user" + assert "Check weather and continue" in (seen[0].text or "") + assert seen[1].role == "tool" + assert any(content.type == "function_result" for content in seen[1].contents) + assert seen[2].role == "assistant" + assert "Seattle is sunny" in (seen[2].text or "") + assert all(content.type not in {"function_call"} for msg in seen for content in msg.contents) + + +async def test_sequential_handoff_strips_text_reasoning_from_prior_history() -> None: + # Arrange: first agent emits text_reasoning + tool call internals + summary + first = _ReasoningHistoryAgent( + id="reasoning_agent", + name="ReasoningAgent", + summary_text="The weather in Boston is rainy and 55F.", + ) + second = _CaptureAgent(id="capture_agent", name="Capture", reply_text="Captured") + wf = SequentialBuilder(participants=[first, second]).build() + + # Act + result = await wf.run("Check weather with reasoning") + + # Assert workflow completed + outputs = result.get_outputs() + assert outputs + + # Assert second agent sees replay-safe history (no text_reasoning/function_call; function_result is retained) + seen = second._last_messages # pyright: ignore[reportPrivateUsage] + assert len(seen) == 3 # user, tool(function_result), assistant(summary) + assert seen[0].role == "user" + assert "Check weather with reasoning" in (seen[0].text or "") + assert seen[1].role == "tool" + assert any(content.type == "function_result" for content in seen[1].contents) + assert seen[2].role == "assistant" + assert "Boston is rainy" in (seen[2].text or "") + assert all( + content.type not in {"text_reasoning", "function_call"} + for msg in seen + for content in msg.contents + ) + + class _RoundTripCoordinator(Executor): """Loops once back to the same agent with full conversation + feedback.""" diff --git a/python/samples/02-agents/conversations/redis_history_provider.py b/python/samples/02-agents/conversations/redis_history_provider.py index 10b5265c41..2b68c26076 100644 --- a/python/samples/02-agents/conversations/redis_history_provider.py +++ b/python/samples/02-agents/conversations/redis_history_provider.py @@ -20,6 +20,10 @@ with Redis as the backend data store. """ +# Default Redis URL for local Redis Stack. +# Override via the REDIS_URL environment variable for remote or authenticated instances. +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") + # Default Redis URL for local Redis Stack. # Override via the REDIS_URL environment variable for remote or authenticated instances. diff --git a/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py b/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py index 6ad3641721..39550eb13d 100644 --- a/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py +++ b/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py @@ -199,7 +199,7 @@ async def main(): openai_client = create_openai_client() # Model configuration - workflow_agent_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_WORKFLOW", "gpt-4.1-nano") + workflow_agent_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_WORKFLOW", "gpt-5-mini") eval_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_EVAL", "gpt-5.2") # Focus on these agents, uncomment other ones you want to have evals run on From 3de00bc9a439ba7b0787255be83212bb9e22a6b6 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 14:03:33 +0100 Subject: [PATCH 02/11] fix: reasoning model workflow handoff and history serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes multiple related issues when using reasoning models (gpt-5-mini, gpt-5.2) in multi-agent workflows that chain agents via from_response or replay full conversation history via AgentExecutorRequest. ## Reasoning items always emitted on output_item.added When a reasoning model produces encrypted or hidden reasoning (no visible text), the Responses API still fires a reasoning output item without any reasoning_text.delta events. Previously no text_reasoning Content was emitted in that case, making it invisible to downstream logic. Both the non-streaming (_parse_response_from_openai) and streaming (output_item.added) paths now always emit at least one text_reasoning Content — with empty text if no content is available — so co-occurrence detection and serialization guards work reliably. ## Reasoning items only serialized when paired with a function_call The Responses API only accepts reasoning items in input when they directly preceded a function_call in the original response. Sending a reasoning item that preceded a text response (no tool call) causes: "reasoning was provided without its required following item" _prepare_message_for_openai now checks has_function_call per message and skips text_reasoning serialization when there is no accompanying function_call. ## summary field is an array, not an object The reasoning item summary field sent to the Responses API must be an array of objects ([{"type": "summary_text", "text": ...}]), not a single object. Fixed _prepare_content_for_openai accordingly. ## service_session_id cleared when explicit history is provided When a workflow coordinator replays a full conversation (including function calls from a previous agent run) back to an executor via AgentExecutorRequest or from_response, the executor's session still held a service_session_id (previous_response_id) from the prior run. The API then received the same function-call items twice — once from previous_response_id (server-stored) and once from the explicit input — causing: "Duplicate item found with id fc_...". AgentExecutor.run (when should_respond=True) and from_response now reset self._session.service_session_id = None before running so that explicit input is the sole source of conversation context. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_workflows/_agent_executor.py | 52 +---- .../openai/_responses_client.py | 61 ++++-- .../tests/workflow/test_full_conversation.py | 201 ++++++++++-------- 3 files changed, 174 insertions(+), 140 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 64e1e879f4..45b1dd8da2 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -2,9 +2,9 @@ import logging import sys -from collections.abc import Awaitable, Callable, Mapping, Sequence +from collections.abc import Awaitable, Callable, Mapping from dataclasses import dataclass -from typing import Any, ClassVar, cast +from typing import Any, cast from typing_extensions import Never @@ -130,6 +130,11 @@ async def run( """ self._cache.extend(request.messages) if request.should_respond: + # The caller is supplying explicit conversation history, so any stored + # previous_response_id from an earlier run would cause the API to see the + # same items twice (once via previous_response_id and once in input). + # Reset it so the agent starts a fresh context with the provided messages. + self._session.service_session_id = None await self._run_agent_and_emit(ctx) @handler @@ -144,43 +149,15 @@ async def from_response( immediately run the agent to produce a new response. """ # Replace cache with full conversation if available, else fall back to agent_response messages. - # Filter out tool-call internals that are not replay-safe across agent boundaries. source_messages = ( prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages ) - self._cache = self._prepare_handoff_messages(source_messages) + self._cache = list(source_messages) + # Reset service_session_id: the full conversation is being provided explicitly, + # so using previous_response_id would duplicate items already in the input. + self._session.service_session_id = None await self._run_agent_and_emit(ctx) - @classmethod - def _prepare_handoff_messages(cls, messages: Sequence[Message]) -> list[Message]: - """Prepare cross-executor handoff messages for replay safety. - - Cross-agent workflow handoff should preserve user intent and assistant outcomes, - but avoid replaying internal tool-call protocol messages that can be invalid - outside their original response loop. - """ - prepared: list[Message] = [] - for message in messages: - filtered_contents = [ - content for content in message.contents if content.type not in cls._HANDOFF_EXCLUDED_CONTENT_TYPES - ] - if not filtered_contents: - continue - if len(filtered_contents) == len(message.contents): - prepared.append(message) - continue - prepared.append( - Message( - role=message.role, - contents=filtered_contents, - author_name=message.author_name, - message_id=message.message_id, - additional_properties=dict(message.additional_properties), - raw_representation=message.raw_representation, - ) - ) - return prepared - @handler async def from_str( self, text: str, ctx: WorkflowContext[AgentExecutorResponse, AgentResponse | AgentResponseUpdate] @@ -497,10 +474,3 @@ def _prepare_agent_run_args(raw_run_kwargs: dict[str, Any]) -> tuple[dict[str, A options["additional_function_arguments"] = additional_args return run_kwargs, options or None - - _HANDOFF_EXCLUDED_CONTENT_TYPES: ClassVar[frozenset[str]] = frozenset({ - "function_call", - "function_approval_request", - "function_approval_response", - "text_reasoning", - }) diff --git a/python/packages/core/agent_framework/openai/_responses_client.py b/python/packages/core/agent_framework/openai/_responses_client.py index 694c918233..d194d580f2 100644 --- a/python/packages/core/agent_framework/openai/_responses_client.py +++ b/python/packages/core/agent_framework/openai/_responses_client.py @@ -908,11 +908,16 @@ def _prepare_message_for_openai( "type": "message", "role": message.role, } + # Reasoning items are only valid in input when they directly preceded a function_call + # in the same response. Including a reasoning item that preceded a text response + # (i.e. no function_call in the same message) causes an API error: + # "reasoning was provided without its required following item." + has_function_call = any(c.type == "function_call" for c in message.contents) for content in message.contents: match content.type: case "text_reasoning": - # Reasoning items must be sent back as top-level input items - # for reasoning models that require them alongside function_calls + if not has_function_call: + continue # reasoning not followed by a function_call is invalid in input reasoning = self._prepare_content_for_openai(message.role, content, call_id_to_id) # type: ignore[arg-type] if reasoning: all_messages.append(reasoning) @@ -961,13 +966,7 @@ def _prepare_content_for_openai( "text": content.text, } case "text_reasoning": - ret: dict[str, Any] = { - "type": "reasoning", - "summary": { - "type": "summary_text", - "text": content.text, - }, - } + ret: dict[str, Any] = {"type": "reasoning", "summary": []} props: dict[str, Any] | None = getattr(content, "additional_properties", None) if props: if reasoning_id := props.get("reasoning_id"): @@ -975,12 +974,11 @@ def _prepare_content_for_openai( if status := props.get("status"): ret["status"] = status if reasoning_text := props.get("reasoning_text"): - ret["content"] = { - "type": "reasoning_text", - "text": reasoning_text, - } + ret["content"] = [{"type": "reasoning_text", "text": reasoning_text}] if encrypted_content := props.get("encrypted_content"): ret["encrypted_content"] = encrypted_content + if content.text: + ret["summary"].append({"type": "summary_text", "text": content.text}) return ret case "data" | "uri": if content.has_top_level_media_type("image"): @@ -1190,6 +1188,7 @@ def _parse_response_from_openai( ) case "reasoning": # ResponseOutputReasoning reasoning_id = getattr(item, "id", None) + added_reasoning = False if hasattr(item, "content") and item.content: for index, reasoning_content in enumerate(item.content): additional_properties: dict[str, Any] = {} @@ -1204,6 +1203,7 @@ def _parse_response_from_openai( additional_properties=additional_properties or None, ) ) + added_reasoning = True if hasattr(item, "summary") and item.summary: for summary in item.summary: contents.append( @@ -1213,6 +1213,23 @@ def _parse_response_from_openai( additional_properties={"reasoning_id": reasoning_id} if reasoning_id else None, ) ) + added_reasoning = True + if not added_reasoning: + # Reasoning item with no visible text (e.g. encrypted reasoning). + # Always emit an empty marker so co-occurrence detection in + # _prepare_handoff_messages can identify reasoning-model responses. + additional_properties_empty: dict[str, Any] = {} + if reasoning_id: + additional_properties_empty["reasoning_id"] = reasoning_id + if encrypted := getattr(item, "encrypted_content", None): + additional_properties_empty["encrypted_content"] = encrypted + contents.append( + Content.from_text_reasoning( + text="", + raw_representation=item, + additional_properties=additional_properties_empty or None, + ) + ) case "code_interpreter_call": # ResponseOutputCodeInterpreterCall call_id = getattr(item, "call_id", None) or getattr(item, "id", None) outputs: list[Content] = [] @@ -1630,6 +1647,7 @@ def _parse_chunk_from_openai( ) case "reasoning": # ResponseOutputReasoning reasoning_id = getattr(event_item, "id", None) + added_reasoning = False if hasattr(event_item, "content") and event_item.content: for index, reasoning_content in enumerate(event_item.content): additional_properties: dict[str, Any] = {} @@ -1648,6 +1666,23 @@ def _parse_chunk_from_openai( additional_properties=additional_properties or None, ) ) + added_reasoning = True + if not added_reasoning: + # Reasoning item with no visible text (e.g. encrypted reasoning). + # Always emit an empty marker so co-occurrence detection in + # _prepare_handoff_messages can identify reasoning-model responses. + additional_properties_empty: dict[str, Any] = {} + if reasoning_id: + additional_properties_empty["reasoning_id"] = reasoning_id + if encrypted := getattr(event_item, "encrypted_content", None): + additional_properties_empty["encrypted_content"] = encrypted + contents.append( + Content.from_text_reasoning( + text="", + raw_representation=event_item, + additional_properties=additional_properties_empty or None, + ) + ) case _: logger.debug("Unparsed event of type: %s: %s", event.type, event) case "response.function_call_arguments.delta": diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 511da6d5e9..03f1809e10 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -115,50 +115,6 @@ async def _run() -> AgentResponse: return _run() -class _ReasoningHistoryAgent(BaseAgent): - """Agent that emits text_reasoning + tool-call internals plus a final assistant summary.""" - - def __init__(self, *, summary_text: str, **kwargs: Any) -> None: - super().__init__(**kwargs) - self._summary_text = summary_text - - def _messages(self) -> list[Message]: - return [ - Message( - role="assistant", - contents=[ - Content.from_text_reasoning( - text="I should call get_weather to answer this.", - additional_properties={"reasoning_id": "rs_abc123"}, - ), - Content.from_function_call( - call_id="call_weather_2", - name="get_weather", - arguments='{"location":"Boston"}', - ), - ], - ), - Message( - role="tool", - contents=[Content.from_function_result(call_id="call_weather_2", result="Rainy, 55F")], - ), - Message(role="assistant", contents=[Content.from_text(text=self._summary_text)]), - ] - - def run( - self, - messages: str | Content | Message | Sequence[str | Content | Message] | None = None, - *, - stream: bool = False, - session: AgentSession | None = None, - **kwargs: Any, - ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: - async def _run() -> AgentResponse: - return AgentResponse(messages=self._messages()) - - return _run() - - class _CaptureFullConversation(Executor): """Captures AgentExecutorResponse.full_conversation and completes the workflow.""" @@ -258,8 +214,8 @@ async def test_sequential_adapter_uses_full_conversation() -> None: assert seen[1].role == "assistant" and "A1 reply" in (seen[1].text or "") -async def test_sequential_handoff_strips_tool_call_internals_from_prior_history() -> None: - # Arrange +async def test_sequential_handoff_preserves_function_call_for_non_reasoning_model() -> None: + # Arrange: non-reasoning agent emits function_call + function_result + summary first = _ToolHistoryAgent( id="tool_history_agent", name="ToolHistory", @@ -275,49 +231,20 @@ async def test_sequential_handoff_strips_tool_call_internals_from_prior_history( outputs = result.get_outputs() assert outputs - # Assert second agent sees replay-safe history (no function_call internals; function_result content is retained) + # For non-reasoning models (no text_reasoning), function_call and function_result are + # both kept so the receiving agent has the full call/result pair as context. seen = second._last_messages # pyright: ignore[reportPrivateUsage] - assert len(seen) == 3 # user, tool(function_result), assistant(summary) + assert len(seen) == 4 # user, assistant(function_call), tool(function_result), assistant(summary) assert seen[0].role == "user" assert "Check weather and continue" in (seen[0].text or "") - assert seen[1].role == "tool" - assert any(content.type == "function_result" for content in seen[1].contents) - assert seen[2].role == "assistant" - assert "Seattle is sunny" in (seen[2].text or "") - assert all(content.type not in {"function_call"} for msg in seen for content in msg.contents) - - -async def test_sequential_handoff_strips_text_reasoning_from_prior_history() -> None: - # Arrange: first agent emits text_reasoning + tool call internals + summary - first = _ReasoningHistoryAgent( - id="reasoning_agent", - name="ReasoningAgent", - summary_text="The weather in Boston is rainy and 55F.", - ) - second = _CaptureAgent(id="capture_agent", name="Capture", reply_text="Captured") - wf = SequentialBuilder(participants=[first, second]).build() - - # Act - result = await wf.run("Check weather with reasoning") - - # Assert workflow completed - outputs = result.get_outputs() - assert outputs - - # Assert second agent sees replay-safe history (no text_reasoning/function_call; function_result is retained) - seen = second._last_messages # pyright: ignore[reportPrivateUsage] - assert len(seen) == 3 # user, tool(function_result), assistant(summary) - assert seen[0].role == "user" - assert "Check weather with reasoning" in (seen[0].text or "") - assert seen[1].role == "tool" - assert any(content.type == "function_result" for content in seen[1].contents) - assert seen[2].role == "assistant" - assert "Boston is rainy" in (seen[2].text or "") - assert all( - content.type not in {"text_reasoning", "function_call"} - for msg in seen - for content in msg.contents - ) + assert seen[1].role == "assistant" + assert any(content.type == "function_call" for content in seen[1].contents) + assert seen[2].role == "tool" + assert any(content.type == "function_result" for content in seen[2].contents) + assert seen[3].role == "assistant" + assert "Seattle is sunny" in (seen[3].text or "") + # No text_reasoning should appear (non-reasoning model) + assert all(content.type != "text_reasoning" for msg in seen for content in msg.contents) class _RoundTripCoordinator(Executor): @@ -379,3 +306,105 @@ async def test_agent_executor_full_conversation_round_trip_does_not_duplicate_hi assert payload["texts"][1] == "draft reply" assert payload["texts"][2] == "apply feedback" assert payload["texts"][3] == "draft reply" + + +class _SessionIdCapturingAgent(BaseAgent): + """Records service_session_id of the session at run() time.""" + + _captured_service_session_id: str | None = PrivateAttr(default="NOT_CAPTURED") + + def run( + self, + messages: str | Content | Message | Sequence[str | Content | Message] | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: + self._captured_service_session_id = session.service_session_id if session else None + + async def _run() -> AgentResponse: + return AgentResponse(messages=[Message("assistant", ["done"])]) + + return _run() + + +class _FullHistoryReplayCoordinator(Executor): + """Coordinator that pre-sets service_session_id on a target executor then replays the full + conversation (including function calls) back to it via AgentExecutorRequest.""" + + def __init__(self, *, target_exec: AgentExecutor, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._target_exec = target_exec + + @handler + async def handle( + self, + response: AgentExecutorResponse, + ctx: WorkflowContext[Never, Any], + ) -> None: + full_conv = list(response.full_conversation or response.agent_response.messages) + full_conv.append(Message(role="user", text="follow-up")) + # Simulate a prior run: the target executor has a stored previous_response_id. + self._target_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] + await ctx.send_message( + AgentExecutorRequest(messages=full_conv, should_respond=True), + target_id=self._target_exec.id, + ) + + +async def test_run_request_with_full_history_clears_service_session_id() -> None: + """Replaying a full conversation (including function calls) via AgentExecutorRequest must + clear service_session_id so the API does not receive both previous_response_id and the + same function-call items in input — which would cause a 'Duplicate item' API error.""" + tool_agent = _ToolHistoryAgent( + id="tool_agent", name="ToolAgent", summary_text="Done." + ) + tool_exec = AgentExecutor(tool_agent, id="tool_agent") + + spy_agent = _SessionIdCapturingAgent(id="spy_agent", name="SpyAgent") + spy_exec = AgentExecutor(spy_agent, id="spy_agent") + + coordinator = _FullHistoryReplayCoordinator(id="coord", target_exec=spy_exec) + + wf = ( + WorkflowBuilder(start_executor=tool_exec, output_executors=[coordinator]) + .add_edge(tool_exec, coordinator) + .add_edge(coordinator, spy_exec) + .build() + ) + + result = await wf.run("initial prompt") + assert result.get_outputs() is not None + + # The spy agent must have seen service_session_id=None (cleared before run). + # Without the fix, it would see "resp_PREVIOUS_RUN" and the API would raise + # "Duplicate item found" because the same function-call IDs appear in both + # previous_response_id (server-stored) and the explicit input messages. + assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage] + + +async def test_from_response_clears_service_session_id() -> None: + """from_response hands off a prior agent's full conversation to the next executor. + The receiving executor's service_session_id must be cleared so the API does not + see the same items twice (via previous_response_id and in the explicit input).""" + tool_agent = _ToolHistoryAgent( + id="tool_agent2", name="ToolAgent", summary_text="Done." + ) + tool_exec = AgentExecutor(tool_agent, id="tool_agent2") + + spy_agent = _SessionIdCapturingAgent(id="spy_agent2", name="SpyAgent") + spy_exec = AgentExecutor(spy_agent, id="spy_agent2") + # Simulate a prior run on the spy executor. + spy_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] + + wf = ( + WorkflowBuilder(start_executor=tool_exec, output_executors=[spy_exec]) + .add_edge(tool_exec, spy_exec) + .build() + ) + + result = await wf.run("start") + assert result.get_outputs() is not None + + assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage] From b014fceae223b48c94bf825a2412f23c65435718 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 14:23:43 +0100 Subject: [PATCH 03/11] small improvements in text reasoning --- .../packages/core/agent_framework/_types.py | 2 ++ .../openai/_responses_client.py | 35 ++++++++----------- .../core/test_function_invocation_logic.py | 8 ++--- .../openai/test_openai_responses_client.py | 17 +++++---- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 8446e3ec53..a699a30f5f 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -531,6 +531,7 @@ def from_text( def from_text_reasoning( cls: type[ContentT], *, + id: str | None = None, text: str | None = None, protected_data: str | None = None, annotations: Sequence[Annotation] | None = None, @@ -540,6 +541,7 @@ def from_text_reasoning( """Create text reasoning content.""" return cls( "text_reasoning", + id=id, text=text, protected_data=protected_data, annotations=annotations, diff --git a/python/packages/core/agent_framework/openai/_responses_client.py b/python/packages/core/agent_framework/openai/_responses_client.py index d194d580f2..45f19376a0 100644 --- a/python/packages/core/agent_framework/openai/_responses_client.py +++ b/python/packages/core/agent_framework/openai/_responses_client.py @@ -967,10 +967,10 @@ def _prepare_content_for_openai( } case "text_reasoning": ret: dict[str, Any] = {"type": "reasoning", "summary": []} + if content.id: + ret["id"] = content.id props: dict[str, Any] | None = getattr(content, "additional_properties", None) if props: - if reasoning_id := props.get("reasoning_id"): - ret["id"] = reasoning_id if status := props.get("status"): ret["status"] = status if reasoning_text := props.get("reasoning_text"): @@ -1187,30 +1187,28 @@ def _parse_response_from_openai( ) ) case "reasoning": # ResponseOutputReasoning - reasoning_id = getattr(item, "id", None) added_reasoning = False - if hasattr(item, "content") and item.content: - for index, reasoning_content in enumerate(item.content): + if item_content := getattr(item, "content", None): + for index, reasoning_content in enumerate(item_content): additional_properties: dict[str, Any] = {} - if reasoning_id: - additional_properties["reasoning_id"] = reasoning_id if hasattr(item, "summary") and item.summary and index < len(item.summary): additional_properties["summary"] = item.summary[index] contents.append( Content.from_text_reasoning( + id=item.id, text=reasoning_content.text, raw_representation=reasoning_content, additional_properties=additional_properties or None, ) ) added_reasoning = True - if hasattr(item, "summary") and item.summary: - for summary in item.summary: + if item_summary := getattr(item, "summary", None): + for summary in item_summary: contents.append( Content.from_text_reasoning( + id=item.id, text=summary.text, raw_representation=summary, # type: ignore[arg-type] - additional_properties={"reasoning_id": reasoning_id} if reasoning_id else None, ) ) added_reasoning = True @@ -1219,12 +1217,11 @@ def _parse_response_from_openai( # Always emit an empty marker so co-occurrence detection in # _prepare_handoff_messages can identify reasoning-model responses. additional_properties_empty: dict[str, Any] = {} - if reasoning_id: - additional_properties_empty["reasoning_id"] = reasoning_id if encrypted := getattr(item, "encrypted_content", None): additional_properties_empty["encrypted_content"] = encrypted contents.append( Content.from_text_reasoning( + id=item.id, text="", raw_representation=item, additional_properties=additional_properties_empty or None, @@ -1444,36 +1441,36 @@ def _parse_chunk_from_openai( case "response.reasoning_text.delta": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.delta, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_text.done": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.text, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.delta": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.delta, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.done": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.text, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) @@ -1651,8 +1648,6 @@ def _parse_chunk_from_openai( if hasattr(event_item, "content") and event_item.content: for index, reasoning_content in enumerate(event_item.content): additional_properties: dict[str, Any] = {} - if reasoning_id: - additional_properties["reasoning_id"] = reasoning_id if ( hasattr(event_item, "summary") and event_item.summary @@ -1661,6 +1656,7 @@ def _parse_chunk_from_openai( additional_properties["summary"] = event_item.summary[index] contents.append( Content.from_text_reasoning( + id=reasoning_id or None, text=reasoning_content.text, raw_representation=reasoning_content, additional_properties=additional_properties or None, @@ -1672,12 +1668,11 @@ def _parse_chunk_from_openai( # Always emit an empty marker so co-occurrence detection in # _prepare_handoff_messages can identify reasoning-model responses. additional_properties_empty: dict[str, Any] = {} - if reasoning_id: - additional_properties_empty["reasoning_id"] = reasoning_id if encrypted := getattr(event_item, "encrypted_content", None): additional_properties_empty["encrypted_content"] = encrypted contents.append( Content.from_text_reasoning( + id=reasoning_id or None, text="", raw_representation=event_item, additional_properties=additional_properties_empty or None, diff --git a/python/packages/core/tests/core/test_function_invocation_logic.py b/python/packages/core/tests/core/test_function_invocation_logic.py index 296218adcf..1dfd257942 100644 --- a/python/packages/core/tests/core/test_function_invocation_logic.py +++ b/python/packages/core/tests/core/test_function_invocation_logic.py @@ -2992,8 +2992,9 @@ def search_func(query: str) -> str: ChatResponseUpdate( contents=[ Content.from_text_reasoning( + id="rs_test123", text="Let me search for that", - additional_properties={"reasoning_id": "rs_test123", "status": "completed"}, + additional_properties={"status": "completed"}, ) ], role="assistant", @@ -3035,8 +3036,7 @@ def search_func(query: str) -> str: assert "function_result" in all_content_types, "Function result must be in response messages for chaining" assert "text" in all_content_types, "Final text must be in response messages" - # Verify reasoning has the reasoning_id preserved + # Verify reasoning has the id preserved reasoning_contents = [c for msg in response.messages for c in msg.contents if c.type == "text_reasoning"] assert len(reasoning_contents) >= 1 - assert reasoning_contents[0].additional_properties is not None - assert reasoning_contents[0].additional_properties.get("reasoning_id") == "rs_test123" + assert reasoning_contents[0].id == "rs_test123" diff --git a/python/packages/core/tests/openai/test_openai_responses_client.py b/python/packages/core/tests/openai/test_openai_responses_client.py index 691e5cc6b6..6c98f3bdfa 100644 --- a/python/packages/core/tests/openai/test_openai_responses_client.py +++ b/python/packages/core/tests/openai/test_openai_responses_client.py @@ -821,8 +821,9 @@ def test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N client = OpenAIResponsesClient(model_id="test-model", api_key="test-key") reasoning = Content.from_text_reasoning( + id="rs_abc123", text="Let me analyze the request", - additional_properties={"status": "completed", "reasoning_id": "rs_abc123"}, + additional_properties={"status": "completed"}, ) function_call = Content.from_function_call( call_id="call_123", @@ -841,7 +842,7 @@ def test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N assert "function_call" in types reasoning_item = next(item for item in result if item["type"] == "reasoning") - assert reasoning_item["summary"]["text"] == "Let me analyze the request" + assert reasoning_item["summary"][0]["text"] == "Let me analyze the request" assert reasoning_item["id"] == "rs_abc123", "Reasoning id must be preserved for the API" @@ -860,8 +861,9 @@ def test_prepare_messages_for_openai_full_conversation_with_reasoning() -> None: role="assistant", contents=[ Content.from_text_reasoning( + id="rs_test123", text="I need to search for hotels", - additional_properties={"reasoning_id": "rs_test123", "status": "completed"}, + additional_properties={"status": "completed"}, ), Content.from_function_call( call_id="call_1", @@ -1895,6 +1897,7 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None: # Test TextReasoningContent with all additional properties comprehensive_reasoning = Content.from_text_reasoning( + id="rs_comprehensive", text="Comprehensive reasoning summary", additional_properties={ "status": "in_progress", @@ -1904,10 +1907,11 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None: ) result = client._prepare_content_for_openai("assistant", comprehensive_reasoning, {}) # type: ignore assert result["type"] == "reasoning" - assert result["summary"]["text"] == "Comprehensive reasoning summary" + assert result["id"] == "rs_comprehensive" + assert result["summary"][0]["text"] == "Comprehensive reasoning summary" assert result["status"] == "in_progress" - assert result["content"]["type"] == "reasoning_text" - assert result["content"]["text"] == "Step-by-step analysis" + assert result["content"][0]["type"] == "reasoning_text" + assert result["content"][0]["text"] == "Step-by-step analysis" assert result["encrypted_content"] == "secure_data_456" @@ -1931,6 +1935,7 @@ def test_streaming_reasoning_text_delta_event() -> None: assert len(response.contents) == 1 assert response.contents[0].type == "text_reasoning" + assert response.contents[0].id == "reasoning_123" assert response.contents[0].text == "reasoning delta" assert response.contents[0].raw_representation == event mock_metadata.assert_called_once_with(event) From 9ce1f45c32e22e897c4d550477e10281ba785c58 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 14:44:18 +0100 Subject: [PATCH 04/11] refactor: add reset_service_session to AgentExecutorRequest for explicit history replay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the implicit 'always clear service_session_id when should_respond=True' with an explicit opt-in field on AgentExecutorRequest. The old approach used should_respond=True as a proxy for 'full history replay', but that conflates two distinct intents: - Orchestrations group chat sends should_respond=True with an empty/single-message list (not a full replay) — unnecessarily clearing service_session_id. - HITL / feedback coordinators send the full prior conversation and truly need a fresh service session ID to avoid duplicate-item API errors. Changes: - Add AgentExecutorRequest.reset_service_session: bool = False - AgentExecutor.run only clears service_session_id when this flag is True - AgentExecutor.from_response unchanged (always clears; always full conversation) - Set reset_service_session=True in all full-history-replay call sites: agents_with_HITL.py, azure_chat_agents_tool_calls_with_feedback.py, autogen-migration round-robin coordinator, tau2 runner - Update _FullHistoryReplayCoordinator test helper to pass the flag Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../_workflows/_agent_executor.py | 20 +++++++++++++------ .../openai/_responses_client.py | 3 +-- .../tests/workflow/test_full_conversation.py | 2 +- .../tau2/agent_framework_lab_tau2/runner.py | 2 +- ...re_chat_agents_tool_calls_with_feedback.py | 4 +++- .../human-in-the-loop/agents_with_HITL.py | 4 +++- .../01_round_robin_group_chat.py | 4 +++- 7 files changed, 26 insertions(+), 13 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 45b1dd8da2..88ec842c8f 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -37,10 +37,17 @@ class AgentExecutorRequest: messages: A list of chat messages to be processed by the agent. should_respond: A flag indicating whether the agent should respond to the messages. If False, the messages will be saved to the executor's cache but not sent to the agent. + reset_service_session: When True, the executor will clear its stored ``service_session_id`` + before running the agent. Set this to ``True`` whenever the ``messages`` list represents + the **full conversation history** (a replay from scratch), so the service does not also + replay items it already stored under the previous response ID — which would cause a + "Duplicate item" API error. Leave as ``False`` (the default) when appending only new + messages to an ongoing conversation. """ messages: list[Message] should_respond: bool = True + reset_service_session: bool = False @dataclass @@ -130,11 +137,12 @@ async def run( """ self._cache.extend(request.messages) if request.should_respond: - # The caller is supplying explicit conversation history, so any stored - # previous_response_id from an earlier run would cause the API to see the - # same items twice (once via previous_response_id and once in input). - # Reset it so the agent starts a fresh context with the provided messages. - self._session.service_session_id = None + if request.reset_service_session: + # The caller is replaying full conversation history, so any stored + # previous_response_id from an earlier run would cause the API to see the + # same items twice (once via previous_response_id and once in input). + # Reset it so the agent starts a fresh context with the provided messages. + self._session.service_session_id = None await self._run_agent_and_emit(ctx) @handler @@ -319,7 +327,7 @@ async def _run_agent_and_emit( # Snapshot current conversation as cache + latest agent outputs. # Do not append to prior snapshots: callers may provide full-history messages # in request.messages, and extending would duplicate prior turns. - self._full_conversation = list(self._cache) + (list(response.messages) if response else []) + self._full_conversation = [*self._cache, *(list(response.messages) if response else [])] if response is None: # Agent did not complete (e.g., waiting for user input); do not emit response diff --git a/python/packages/core/agent_framework/openai/_responses_client.py b/python/packages/core/agent_framework/openai/_responses_client.py index 45f19376a0..64a5f5306a 100644 --- a/python/packages/core/agent_framework/openai/_responses_client.py +++ b/python/packages/core/agent_framework/openai/_responses_client.py @@ -1214,8 +1214,7 @@ def _parse_response_from_openai( added_reasoning = True if not added_reasoning: # Reasoning item with no visible text (e.g. encrypted reasoning). - # Always emit an empty marker so co-occurrence detection in - # _prepare_handoff_messages can identify reasoning-model responses. + # Always emit an empty marker so co-occurrence detection can be done additional_properties_empty: dict[str, Any] = {} if encrypted := getattr(item, "encrypted_content", None): additional_properties_empty["encrypted_content"] = encrypted diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 03f1809e10..0722aaf1e7 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -348,7 +348,7 @@ async def handle( # Simulate a prior run: the target executor has a stored previous_response_id. self._target_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] await ctx.send_message( - AgentExecutorRequest(messages=full_conv, should_respond=True), + AgentExecutorRequest(messages=full_conv, should_respond=True, reset_service_session=True), target_id=self._target_exec.id, ) diff --git a/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py b/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py index 78a9496444..4849f0cb67 100644 --- a/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py +++ b/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py @@ -267,7 +267,7 @@ async def conversation_orchestrator( # Send flipped messages to the opposite agent # Critical: Target ID must be specified to prevent broadcasting to both agents await ctx.send_message( - AgentExecutorRequest(messages=flipped, should_respond=True), + AgentExecutorRequest(messages=flipped, should_respond=True, reset_service_session=True), target_id=USER_SIMULATOR_ID if is_from_agent else ASSISTANT_AGENT_ID, ) diff --git a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index fdcc626099..46a0f3c273 100644 --- a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -155,6 +155,7 @@ async def on_human_feedback( AgentExecutorRequest( messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], should_respond=True, + reset_service_session=True, ), target_id=self.final_editor_id, ) @@ -170,7 +171,8 @@ async def on_human_feedback( ) conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_id + AgentExecutorRequest(messages=conversation, should_respond=True, reset_service_session=True), + target_id=self.writer_id, ) diff --git a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py index ed5fd77ce2..6247320d25 100644 --- a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py +++ b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py @@ -108,6 +108,7 @@ async def on_human_feedback( AgentExecutorRequest( messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], should_respond=True, + reset_service_session=True, ), target_id=self.final_editor_name, ) @@ -123,7 +124,8 @@ async def on_human_feedback( ) conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_name + AgentExecutorRequest(messages=conversation, should_respond=True, reset_service_session=True), + target_id=self.writer_name, ) diff --git a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py index 8c74a3b153..7e26789c6d 100644 --- a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py @@ -144,7 +144,9 @@ async def check_approval( if last_message and "APPROVED" in last_message.text: await context.yield_output("Content approved.") else: - await context.send_message(AgentExecutorRequest(messages=response.full_conversation, should_respond=True)) + await context.send_message( + AgentExecutorRequest(messages=response.full_conversation, should_respond=True, reset_service_session=True) + ) workflow = ( WorkflowBuilder(start_executor=researcher) From 88294f0503e37af00a7c8abefc07f1226946841c Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 14:52:34 +0100 Subject: [PATCH 05/11] comment update --- .../packages/core/agent_framework/openai/_responses_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/openai/_responses_client.py b/python/packages/core/agent_framework/openai/_responses_client.py index 64a5f5306a..fa140ee0b7 100644 --- a/python/packages/core/agent_framework/openai/_responses_client.py +++ b/python/packages/core/agent_framework/openai/_responses_client.py @@ -1664,8 +1664,7 @@ def _parse_chunk_from_openai( added_reasoning = True if not added_reasoning: # Reasoning item with no visible text (e.g. encrypted reasoning). - # Always emit an empty marker so co-occurrence detection in - # _prepare_handoff_messages can identify reasoning-model responses. + # Always emit an empty marker so co-occurrence detection can occur. additional_properties_empty: dict[str, Any] = {} if encrypted := getattr(event_item, "encrypted_content", None): additional_properties_empty["encrypted_content"] = encrypted From ab94e00fe34b0be792f67ec933648c997c50a90f Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 18:47:41 +0100 Subject: [PATCH 06/11] fixes from feedback --- .../core/agent_framework/_workflows/_agent_executor.py | 3 --- .../02-agents/conversations/redis_history_provider.py | 5 ----- 2 files changed, 8 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 88ec842c8f..bf0076b45f 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -161,9 +161,6 @@ async def from_response( prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages ) self._cache = list(source_messages) - # Reset service_session_id: the full conversation is being provided explicitly, - # so using previous_response_id would duplicate items already in the input. - self._session.service_session_id = None await self._run_agent_and_emit(ctx) @handler diff --git a/python/samples/02-agents/conversations/redis_history_provider.py b/python/samples/02-agents/conversations/redis_history_provider.py index 2b68c26076..17e1094775 100644 --- a/python/samples/02-agents/conversations/redis_history_provider.py +++ b/python/samples/02-agents/conversations/redis_history_provider.py @@ -25,11 +25,6 @@ REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") -# Default Redis URL for local Redis Stack. -# Override via the REDIS_URL environment variable for remote or authenticated instances. -REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") - - async def example_manual_memory_store() -> None: """Basic example of using Redis history provider.""" print("=== Basic Redis History Provider Example ===") From 2aaf01f5adbc879394c5163ac00e0ec54b02c2a5 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 18:54:25 +0100 Subject: [PATCH 07/11] fix test --- .../core/tests/workflow/test_full_conversation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 0722aaf1e7..2497ab4eec 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -384,10 +384,10 @@ async def test_run_request_with_full_history_clears_service_session_id() -> None assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage] -async def test_from_response_clears_service_session_id() -> None: +async def test_from_response_preserves_service_session_id() -> None: """from_response hands off a prior agent's full conversation to the next executor. - The receiving executor's service_session_id must be cleared so the API does not - see the same items twice (via previous_response_id and in the explicit input).""" + The receiving executor's service_session_id is preserved so the API can continue + the conversation using previous_response_id.""" tool_agent = _ToolHistoryAgent( id="tool_agent2", name="ToolAgent", summary_text="Done." ) @@ -407,4 +407,4 @@ async def test_from_response_clears_service_session_id() -> None: result = await wf.run("start") assert result.get_outputs() is not None - assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage] + assert spy_agent._captured_service_session_id == "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] From d9193a227263c66fa0e4e711347449a61195b297 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 20:23:02 +0100 Subject: [PATCH 08/11] reverted changes to agent executor --- .../_workflows/_agent_executor.py | 13 ---- .../azure/test_azure_responses_client.py | 66 +++++++++---------- .../tests/workflow/test_full_conversation.py | 7 +- ...re_chat_agents_tool_calls_with_feedback.py | 3 +- .../human-in-the-loop/agents_with_HITL.py | 3 +- .../01_round_robin_group_chat.py | 2 +- 6 files changed, 41 insertions(+), 53 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index bf0076b45f..257833bb6a 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -37,17 +37,10 @@ class AgentExecutorRequest: messages: A list of chat messages to be processed by the agent. should_respond: A flag indicating whether the agent should respond to the messages. If False, the messages will be saved to the executor's cache but not sent to the agent. - reset_service_session: When True, the executor will clear its stored ``service_session_id`` - before running the agent. Set this to ``True`` whenever the ``messages`` list represents - the **full conversation history** (a replay from scratch), so the service does not also - replay items it already stored under the previous response ID — which would cause a - "Duplicate item" API error. Leave as ``False`` (the default) when appending only new - messages to an ongoing conversation. """ messages: list[Message] should_respond: bool = True - reset_service_session: bool = False @dataclass @@ -137,12 +130,6 @@ async def run( """ self._cache.extend(request.messages) if request.should_respond: - if request.reset_service_session: - # The caller is replaying full conversation history, so any stored - # previous_response_id from an earlier run would cause the API to see the - # same items twice (once via previous_response_id and once in input). - # Reset it so the agent starts a fresh context with the provided messages. - self._session.service_session_id = None await self._run_agent_and_emit(ctx) @handler diff --git a/python/packages/core/tests/azure/test_azure_responses_client.py b/python/packages/core/tests/azure/test_azure_responses_client.py index 43211db7cf..498e9c3834 100644 --- a/python/packages/core/tests/azure/test_azure_responses_client.py +++ b/python/packages/core/tests/azure/test_azure_responses_client.py @@ -13,13 +13,17 @@ from agent_framework import ( Agent, + AgentExecutorRequest, + AgentExecutorResponse, AgentResponse, ChatResponse, Content, Message, SupportsChatGetResponse, WorkflowBuilder, + WorkflowContext, WorkflowEvent, + executor, tool, ) from agent_framework.azure import AzureOpenAIResponsesClient @@ -265,6 +269,12 @@ def test_serialize(azure_openai_unit_test_env: dict[str, str]) -> None: async def _run_minimal_handoff_workflow(client: AzureOpenAIResponsesClient) -> list[WorkflowEvent]: + """Run a minimal workflow that hands off from a tool-calling agent to a second agent. + + Uses AgentExecutorRequest to replay full conversation history to the second agent, + which triggers the duplicate-item error on reasoning models when service_session_id + is not cleared. + """ first_agent = client.as_agent( id="minimal-handoff-first", name="minimal-handoff-first", @@ -272,13 +282,33 @@ async def _run_minimal_handoff_workflow(client: AzureOpenAIResponsesClient) -> l tools=[get_weather], default_options={"tool_choice": {"mode": "required", "required_function_name": "get_weather"}}, ) - second_agent = client.as_agent( + + second_client = AzureOpenAIResponsesClient(credential=AzureCliCredential(), deployment_name=client.deployment_name) + second_agent = second_client.as_agent( id="minimal-handoff-second", name="minimal-handoff-second", instructions="Summarize the prior weather result in one sentence.", ) - workflow = WorkflowBuilder(start_executor=first_agent).add_edge(first_agent, second_agent).build() + @executor(id="coordinator") + async def forward_via_request( + response: AgentExecutorResponse, + ctx: WorkflowContext[AgentExecutorRequest, Any], + ) -> None: + """Forward full conversation from first agent to second via AgentExecutorRequest.""" + messages = list(response.full_conversation or response.agent_response.messages) + messages.append(Message("user", text="Now summarize the weather.")) + await ctx.send_message( + AgentExecutorRequest(messages=messages, should_respond=True), + target_id=second_agent.id, + ) + + workflow = ( + WorkflowBuilder(start_executor=first_agent, output_executors=[second_agent]) + .add_edge(first_agent, forward_via_request) + .add_edge(forward_via_request, second_agent) + .build() + ) events: list[WorkflowEvent] = [] async for event in workflow.run("Check weather for Seattle and pass result onward.", stream=True): @@ -287,38 +317,6 @@ async def _run_minimal_handoff_workflow(client: AzureOpenAIResponsesClient) -> l return events -@pytest.mark.timeout(600) -@pytest.mark.flaky -@skip_if_azure_integration_tests_disabled -@pytest.mark.parametrize( - "deployment_name", - [ - param(WORKFLOW_REASONING_DEPLOYMENT_NAME, id="reasoning_gpt_5_mini"), - param(WORKFLOW_NON_REASONING_DEPLOYMENT_NAME, id="non_reasoning_gpt_4_1_nano"), - ], -) -async def test_integration_minimal_workflow_handoff_reasoning_vs_non_reasoning(deployment_name: str) -> None: - """Smallest workflow handoff repro should pass across model classes.""" - client = AzureOpenAIResponsesClient(credential=AzureCliCredential(), deployment_name=deployment_name) - client.function_invocation_configuration["max_iterations"] = 3 - - try: - events = await _run_minimal_handoff_workflow(client) - except ServiceResponseException as ex: - error_text = str(ex).lower() - if ( - "deploymentnotfound" in error_text - or "deployment for this resource does not exist" in error_text - or ("deployment" in error_text and "not found" in error_text) - ): - pytest.skip(f"Deployment '{deployment_name}' is unavailable in this environment: {ex}") - raise AssertionError( - f"Minimal workflow handoff failed unexpectedly for deployment '{deployment_name}': {ex}" - ) from ex - - assert any(event.type == "output" for event in events), "Expected workflow output event for workflow run" - - @pytest.mark.flaky @skip_if_azure_integration_tests_disabled @pytest.mark.parametrize( diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 2497ab4eec..23861ecc69 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -3,6 +3,7 @@ from collections.abc import AsyncIterable, Awaitable, Sequence from typing import Any +import pytest from pydantic import PrivateAttr from typing_extensions import Never @@ -348,11 +349,15 @@ async def handle( # Simulate a prior run: the target executor has a stored previous_response_id. self._target_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] await ctx.send_message( - AgentExecutorRequest(messages=full_conv, should_respond=True, reset_service_session=True), + AgentExecutorRequest(messages=full_conv, should_respond=True), target_id=self._target_exec.id, ) +@pytest.mark.xfail( + reason="reset_service_session support not yet implemented — see #4047", + strict=True, +) async def test_run_request_with_full_history_clears_service_session_id() -> None: """Replaying a full conversation (including function calls) via AgentExecutorRequest must clear service_session_id so the API does not receive both previous_response_id and the diff --git a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 46a0f3c273..9cc9af290d 100644 --- a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -155,7 +155,6 @@ async def on_human_feedback( AgentExecutorRequest( messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], should_respond=True, - reset_service_session=True, ), target_id=self.final_editor_id, ) @@ -171,7 +170,7 @@ async def on_human_feedback( ) conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True, reset_service_session=True), + AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_id, ) diff --git a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py index 6247320d25..b7e6046d40 100644 --- a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py +++ b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py @@ -108,7 +108,6 @@ async def on_human_feedback( AgentExecutorRequest( messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], should_respond=True, - reset_service_session=True, ), target_id=self.final_editor_name, ) @@ -124,7 +123,7 @@ async def on_human_feedback( ) conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True, reset_service_session=True), + AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_name, ) diff --git a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py index 7e26789c6d..e5c6bd09f8 100644 --- a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py @@ -145,7 +145,7 @@ async def check_approval( await context.yield_output("Content approved.") else: await context.send_message( - AgentExecutorRequest(messages=response.full_conversation, should_respond=True, reset_service_session=True) + AgentExecutorRequest(messages=response.full_conversation, should_respond=True) ) workflow = ( From 155e7b92bab79532ea9102c5d279cf9c0c77b317 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 20:25:47 +0100 Subject: [PATCH 09/11] fix: remove reset_service_session from tau2 runner Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/lab/tau2/agent_framework_lab_tau2/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py b/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py index 4849f0cb67..78a9496444 100644 --- a/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py +++ b/python/packages/lab/tau2/agent_framework_lab_tau2/runner.py @@ -267,7 +267,7 @@ async def conversation_orchestrator( # Send flipped messages to the opposite agent # Critical: Target ID must be specified to prevent broadcasting to both agents await ctx.send_message( - AgentExecutorRequest(messages=flipped, should_respond=True, reset_service_session=True), + AgentExecutorRequest(messages=flipped, should_respond=True), target_id=USER_SIMULATOR_ID if is_from_agent else ASSISTANT_AGENT_ID, ) From 9ff3bd63eef3d399ca028c83883c42479e907093 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 20:29:12 +0100 Subject: [PATCH 10/11] two other reverts --- .../azure/test_azure_responses_client.py | 64 ------------------- .../workflow_evaluation/run_evaluation.py | 2 +- 2 files changed, 1 insertion(+), 65 deletions(-) diff --git a/python/packages/core/tests/azure/test_azure_responses_client.py b/python/packages/core/tests/azure/test_azure_responses_client.py index 498e9c3834..2a718d0ce5 100644 --- a/python/packages/core/tests/azure/test_azure_responses_client.py +++ b/python/packages/core/tests/azure/test_azure_responses_client.py @@ -13,17 +13,11 @@ from agent_framework import ( Agent, - AgentExecutorRequest, - AgentExecutorResponse, AgentResponse, ChatResponse, Content, Message, SupportsChatGetResponse, - WorkflowBuilder, - WorkflowContext, - WorkflowEvent, - executor, tool, ) from agent_framework.azure import AzureOpenAIResponsesClient @@ -258,64 +252,6 @@ def test_serialize(azure_openai_unit_test_env: dict[str, str]) -> None: # region Integration Tests -WORKFLOW_REASONING_DEPLOYMENT_NAME = os.getenv( - "AZURE_OPENAI_WORKFLOW_REASONING_DEPLOYMENT_NAME", - os.getenv("AZURE_OPENAI_REASONING_DEPLOYMENT_NAME", "gpt-5-mini"), -) -WORKFLOW_NON_REASONING_DEPLOYMENT_NAME = os.getenv( - "AZURE_OPENAI_WORKFLOW_NON_REASONING_DEPLOYMENT_NAME", - os.getenv("AZURE_OPENAI_NON_REASONING_DEPLOYMENT_NAME", "gpt-4.1-nano"), -) - - -async def _run_minimal_handoff_workflow(client: AzureOpenAIResponsesClient) -> list[WorkflowEvent]: - """Run a minimal workflow that hands off from a tool-calling agent to a second agent. - - Uses AgentExecutorRequest to replay full conversation history to the second agent, - which triggers the duplicate-item error on reasoning models when service_session_id - is not cleared. - """ - first_agent = client.as_agent( - id="minimal-handoff-first", - name="minimal-handoff-first", - instructions="Use get_weather exactly once for Seattle, then provide a short summary.", - tools=[get_weather], - default_options={"tool_choice": {"mode": "required", "required_function_name": "get_weather"}}, - ) - - second_client = AzureOpenAIResponsesClient(credential=AzureCliCredential(), deployment_name=client.deployment_name) - second_agent = second_client.as_agent( - id="minimal-handoff-second", - name="minimal-handoff-second", - instructions="Summarize the prior weather result in one sentence.", - ) - - @executor(id="coordinator") - async def forward_via_request( - response: AgentExecutorResponse, - ctx: WorkflowContext[AgentExecutorRequest, Any], - ) -> None: - """Forward full conversation from first agent to second via AgentExecutorRequest.""" - messages = list(response.full_conversation or response.agent_response.messages) - messages.append(Message("user", text="Now summarize the weather.")) - await ctx.send_message( - AgentExecutorRequest(messages=messages, should_respond=True), - target_id=second_agent.id, - ) - - workflow = ( - WorkflowBuilder(start_executor=first_agent, output_executors=[second_agent]) - .add_edge(first_agent, forward_via_request) - .add_edge(forward_via_request, second_agent) - .build() - ) - - events: list[WorkflowEvent] = [] - async for event in workflow.run("Check weather for Seattle and pass result onward.", stream=True): - events.append(event) - - return events - @pytest.mark.flaky @skip_if_azure_integration_tests_disabled diff --git a/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py b/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py index 39550eb13d..6ad3641721 100644 --- a/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py +++ b/python/samples/05-end-to-end/workflow_evaluation/run_evaluation.py @@ -199,7 +199,7 @@ async def main(): openai_client = create_openai_client() # Model configuration - workflow_agent_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_WORKFLOW", "gpt-5-mini") + workflow_agent_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_WORKFLOW", "gpt-4.1-nano") eval_model = os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME_EVAL", "gpt-5.2") # Focus on these agents, uncomment other ones you want to have evals run on From 05d336c48f09c8bc42381f5e85a2897a3b7ec192 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Thu, 19 Feb 2026 20:50:27 +0100 Subject: [PATCH 11/11] fix sample --- .../agents/azure_chat_agents_tool_calls_with_feedback.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 9cc9af290d..68c9eb5ae0 100644 --- a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -153,7 +153,7 @@ async def on_human_feedback( # Human approved the draft as-is; forward it unchanged. await ctx.send_message( AgentExecutorRequest( - messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], + messages=[*original_request.conversation, *[Message("user", text="The draft is approved as-is.")]], should_respond=True, ), target_id=self.final_editor_id, @@ -161,16 +161,14 @@ async def on_human_feedback( return # Human provided feedback; prompt the writer to revise. - conversation: list[Message] = list(original_request.conversation) instruction = ( "A human reviewer shared the following guidance:\n" f"{note or 'No specific guidance provided.'}\n\n" "Rewrite the draft from the previous assistant message into a polished final version. " "Keep the response under 120 words and reflect any requested tone adjustments." ) - conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True), + AgentExecutorRequest(messages=[Message("user", text=instruction)], should_respond=True), target_id=self.writer_id, )