diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index 4d0b5789e4..64b52c0a12 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -1761,10 +1761,26 @@ def _get_result_hooks_from_stream(stream: Any) -> list[Callable[[Any], Any]]: def _extract_function_calls(response: ChatResponse) -> list[Content]: - function_results = {it.call_id for it in response.messages[0].contents if it.type == "function_result"} - return [ - it for it in response.messages[0].contents if it.type == "function_call" and it.call_id not in function_results - ] + function_results = { + item.call_id + for message in response.messages + for item in message.contents + if item.type == "function_result" and item.call_id + } + seen_call_ids: set[str] = set() + function_calls: list[Content] = [] + for message in response.messages: + for item in message.contents: + if item.type != "function_call": + continue + if item.call_id and item.call_id in function_results: + continue + if item.call_id and item.call_id in seen_call_ids: + continue + if item.call_id: + seen_call_ids.add(item.call_id) + function_calls.append(item) + return function_calls def _prepend_fcc_messages(response: ChatResponse, fcc_messages: list[Message]) -> None: @@ -1822,27 +1838,22 @@ def _handle_function_call_results( if had_errors: errors_in_a_row += 1 - if errors_in_a_row >= max_errors: + reached_error_limit = errors_in_a_row >= max_errors + if reached_error_limit: logger.warning( "Maximum consecutive function call errors reached (%d). 
" "Stopping further function calls for this request.", max_errors, ) - return { - "action": "stop", - "errors_in_a_row": errors_in_a_row, - "result_message": None, - "update_role": None, - "function_call_results": None, - } else: errors_in_a_row = 0 + reached_error_limit = False result_message = Message(role="tool", contents=function_call_results) response.messages.append(result_message) fcc_messages.extend(response.messages) return { - "action": "continue", + "action": "stop" if reached_error_limit else "continue", "errors_in_a_row": errors_in_a_row, "result_message": result_message, "update_role": "tool", @@ -2025,6 +2036,7 @@ def get_response( middleware_pipeline=function_middleware_pipeline, ) filtered_kwargs = {k: v for k, v in kwargs.items() if k != "session"} + # Make options mutable so we can update conversation_id during function invocation loop mutable_options: dict[str, Any] = dict(options) if options else {} # Remove additional_function_arguments from options passed to underlying chat client @@ -2090,7 +2102,9 @@ async def _get_response() -> ChatResponse: if result["action"] == "return": return response if result["action"] == "stop": - break + # Error threshold reached: force a final non-tool turn so + # function_call_output items are submitted before exit. 
+ mutable_options["tool_choice"] = "none" errors_in_a_row = result["errors_in_a_row"] # When tool_choice is 'required', reset tool_choice after one iteration to avoid infinite loops @@ -2157,6 +2171,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: ) errors_in_a_row = approval_result["errors_in_a_row"] if approval_result["action"] == "stop": + mutable_options["tool_choice"] = "none" return inner_stream = await _ensure_response_stream( @@ -2205,7 +2220,11 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]: contents=result["function_call_results"] or [], role=role, ) - if result["action"] != "continue": + if result["action"] == "stop": + # Error threshold reached: submit collected function_call_output + # items once more with tools disabled. + mutable_options["tool_choice"] = "none" + elif result["action"] != "continue": return # When tool_choice is 'required', reset the tool_choice after one iteration to avoid infinite loops diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 8446e3ec53..a699a30f5f 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -531,6 +531,7 @@ def from_text( def from_text_reasoning( cls: type[ContentT], *, + id: str | None = None, text: str | None = None, protected_data: str | None = None, annotations: Sequence[Annotation] | None = None, @@ -540,6 +541,7 @@ def from_text_reasoning( """Create text reasoning content.""" return cls( "text_reasoning", + id=id, text=text, protected_data=protected_data, annotations=annotations, diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index d78ee1cfbc..257833bb6a 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -144,10 +144,10 @@ async def from_response( 
immediately run the agent to produce a new response. """ # Replace cache with full conversation if available, else fall back to agent_response messages. - if prior.full_conversation is not None: - self._cache = list(prior.full_conversation) - else: - self._cache = list(prior.agent_response.messages) + source_messages = ( + prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages + ) + self._cache = list(source_messages) await self._run_agent_and_emit(ctx) @handler @@ -311,7 +311,7 @@ async def _run_agent_and_emit( # Snapshot current conversation as cache + latest agent outputs. # Do not append to prior snapshots: callers may provide full-history messages # in request.messages, and extending would duplicate prior turns. - self._full_conversation = list(self._cache) + (list(response.messages) if response else []) + self._full_conversation = [*self._cache, *(response.messages if response else [])] if response is None: # Agent did not complete (e.g., waiting for user input); do not emit response diff --git a/python/packages/core/agent_framework/openai/_responses_client.py b/python/packages/core/agent_framework/openai/_responses_client.py index 694c918233..fa140ee0b7 100644 --- a/python/packages/core/agent_framework/openai/_responses_client.py +++ b/python/packages/core/agent_framework/openai/_responses_client.py @@ -908,11 +908,16 @@ def _prepare_message_for_openai( "type": "message", "role": message.role, } + # Reasoning items are only valid in input when they directly preceded a function_call + # in the same response. Including a reasoning item that preceded a text response + # (i.e. no function_call in the same message) causes an API error: + # "reasoning was provided without its required following item." 
+ has_function_call = any(c.type == "function_call" for c in message.contents) for content in message.contents: match content.type: case "text_reasoning": - # Reasoning items must be sent back as top-level input items - # for reasoning models that require them alongside function_calls + if not has_function_call: + continue # reasoning not followed by a function_call is invalid in input reasoning = self._prepare_content_for_openai(message.role, content, call_id_to_id) # type: ignore[arg-type] if reasoning: all_messages.append(reasoning) @@ -961,26 +966,19 @@ def _prepare_content_for_openai( "text": content.text, } case "text_reasoning": - ret: dict[str, Any] = { - "type": "reasoning", - "summary": { - "type": "summary_text", - "text": content.text, - }, - } + ret: dict[str, Any] = {"type": "reasoning", "summary": []} + if content.id: + ret["id"] = content.id props: dict[str, Any] | None = getattr(content, "additional_properties", None) if props: - if reasoning_id := props.get("reasoning_id"): - ret["id"] = reasoning_id if status := props.get("status"): ret["status"] = status if reasoning_text := props.get("reasoning_text"): - ret["content"] = { - "type": "reasoning_text", - "text": reasoning_text, - } + ret["content"] = [{"type": "reasoning_text", "text": reasoning_text}] if encrypted_content := props.get("encrypted_content"): ret["encrypted_content"] = encrypted_content + if content.text: + ret["summary"].append({"type": "summary_text", "text": content.text}) return ret case "data" | "uri": if content.has_top_level_media_type("image"): @@ -1189,30 +1187,45 @@ def _parse_response_from_openai( ) ) case "reasoning": # ResponseOutputReasoning - reasoning_id = getattr(item, "id", None) - if hasattr(item, "content") and item.content: - for index, reasoning_content in enumerate(item.content): + added_reasoning = False + if item_content := getattr(item, "content", None): + for index, reasoning_content in enumerate(item_content): additional_properties: dict[str, Any] = {} - 
if reasoning_id: - additional_properties["reasoning_id"] = reasoning_id if hasattr(item, "summary") and item.summary and index < len(item.summary): additional_properties["summary"] = item.summary[index] contents.append( Content.from_text_reasoning( + id=item.id, text=reasoning_content.text, raw_representation=reasoning_content, additional_properties=additional_properties or None, ) ) - if hasattr(item, "summary") and item.summary: - for summary in item.summary: + added_reasoning = True + if item_summary := getattr(item, "summary", None): + for summary in item_summary: contents.append( Content.from_text_reasoning( + id=item.id, text=summary.text, raw_representation=summary, # type: ignore[arg-type] - additional_properties={"reasoning_id": reasoning_id} if reasoning_id else None, ) ) + added_reasoning = True + if not added_reasoning: + # Reasoning item with no visible text (e.g. encrypted reasoning). + # Always emit an empty marker so co-occurrence detection can be done + additional_properties_empty: dict[str, Any] = {} + if encrypted := getattr(item, "encrypted_content", None): + additional_properties_empty["encrypted_content"] = encrypted + contents.append( + Content.from_text_reasoning( + id=item.id, + text="", + raw_representation=item, + additional_properties=additional_properties_empty or None, + ) + ) case "code_interpreter_call": # ResponseOutputCodeInterpreterCall call_id = getattr(item, "call_id", None) or getattr(item, "id", None) outputs: list[Content] = [] @@ -1427,36 +1440,36 @@ def _parse_chunk_from_openai( case "response.reasoning_text.delta": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.delta, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_text.done": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.text, raw_representation=event, - additional_properties={"reasoning_id": 
event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.delta": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.delta, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) case "response.reasoning_summary_text.done": contents.append( Content.from_text_reasoning( + id=event.item_id, text=event.text, raw_representation=event, - additional_properties={"reasoning_id": event.item_id}, ) ) metadata.update(self._get_metadata_from_response(event)) @@ -1630,11 +1643,10 @@ def _parse_chunk_from_openai( ) case "reasoning": # ResponseOutputReasoning reasoning_id = getattr(event_item, "id", None) + added_reasoning = False if hasattr(event_item, "content") and event_item.content: for index, reasoning_content in enumerate(event_item.content): additional_properties: dict[str, Any] = {} - if reasoning_id: - additional_properties["reasoning_id"] = reasoning_id if ( hasattr(event_item, "summary") and event_item.summary @@ -1643,11 +1655,27 @@ def _parse_chunk_from_openai( additional_properties["summary"] = event_item.summary[index] contents.append( Content.from_text_reasoning( + id=reasoning_id or None, text=reasoning_content.text, raw_representation=reasoning_content, additional_properties=additional_properties or None, ) ) + added_reasoning = True + if not added_reasoning: + # Reasoning item with no visible text (e.g. encrypted reasoning). + # Always emit an empty marker so co-occurrence detection can occur. 
+ additional_properties_empty: dict[str, Any] = {} + if encrypted := getattr(event_item, "encrypted_content", None): + additional_properties_empty["encrypted_content"] = encrypted + contents.append( + Content.from_text_reasoning( + id=reasoning_id or None, + text="", + raw_representation=event_item, + additional_properties=additional_properties_empty or None, + ) + ) case _: logger.debug("Unparsed event of type: %s: %s", event.type, event) case "response.function_call_arguments.delta": diff --git a/python/packages/core/tests/core/test_function_invocation_logic.py b/python/packages/core/tests/core/test_function_invocation_logic.py index d87c2ab0d1..1dfd257942 100644 --- a/python/packages/core/tests/core/test_function_invocation_logic.py +++ b/python/packages/core/tests/core/test_function_invocation_logic.py @@ -171,6 +171,62 @@ def ai_func(arg1: str) -> str: assert exec_counter == 1 +async def test_base_client_executes_function_calls_across_multiple_response_messages( + chat_client_base: SupportsChatGetResponse, +): + exec_counter = 0 + + @tool(name="test_function", approval_mode="never_require") + def ai_func(arg1: str) -> str: + nonlocal exec_counter + exec_counter += 1 + return f"Processed {arg1}" + + chat_client_base.run_responses = [ + ChatResponse( + messages=[ + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="1", + name="test_function", + arguments='{"arg1": "v1"}', + ) + ], + ), + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="2", + name="test_function", + arguments='{"arg1": "v2"}', + ) + ], + ), + ], + conversation_id="conv_after_first_call", + ), + ChatResponse( + messages=Message(role="assistant", text="done"), + conversation_id="conv_after_second_call", + ), + ] + + response = await chat_client_base.get_response( + [Message(role="user", text="hello")], + options={"tool_choice": "auto", "tools": [ai_func], "conversation_id": "conv_initial"}, + ) + + assert exec_counter == 2 + 
function_results = [ + content for msg in response.messages for content in msg.contents if content.type == "function_result" + ] + assert len(function_results) == 2 + assert {result.call_id for result in function_results} == {"1", "2"} + + async def test_function_invocation_inside_aiohttp_server(chat_client_base: SupportsChatGetResponse): import aiohttp from aiohttp import web @@ -921,6 +977,36 @@ def error_func(arg1: str) -> str: assert len(function_calls) <= 2 +async def test_function_invocation_stop_clears_conversation_id_non_stream(chat_client_base: SupportsChatGetResponse): + """Stop-path responses should not carry a continuation conversation_id.""" + + @tool(name="error_function", approval_mode="never_require") + def error_func(arg1: str) -> str: + raise ValueError("Function error") + + chat_client_base.run_responses = [ + ChatResponse( + messages=Message( + role="assistant", + contents=[ + Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}') + ], + ), + conversation_id="resp_1", + ) + ] + chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1 + session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})() + + response = await chat_client_base.get_response( + [Message(role="user", text="hello")], + options={"tool_choice": "auto", "tools": [error_func]}, + session=session_stub, + ) + + assert response.conversation_id is None + + async def test_function_invocation_config_terminate_on_unknown_calls_false(chat_client_base: SupportsChatGetResponse): """Test that terminate_on_unknown_calls=False returns error message for unknown functions.""" exec_counter = 0 @@ -2140,6 +2226,43 @@ def error_func(arg1: str) -> str: assert len(function_calls) <= 2 +async def test_streaming_function_invocation_stop_clears_conversation_id(chat_client_base: SupportsChatGetResponse): + """Streaming stop-path responses should not carry a continuation conversation_id.""" + + 
@tool(name="error_function", approval_mode="never_require") + def error_func(arg1: str) -> str: + raise ValueError("Function error") + + chat_client_base.streaming_responses = [ + [ + ChatResponseUpdate( + contents=[ + Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}') + ], + role="assistant", + conversation_id="resp_1", + ) + ] + ] + chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1 + session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})() + + stream = chat_client_base.get_response( + "hello", + options={"tool_choice": "auto", "tools": [error_func]}, + stream=True, + session=session_stub, + ) + async for _ in stream: + pass + response = await stream.get_final_response() + + # After the stop-path cleanup call, the accumulated stream response keeps the + # conversation_id from the first inner call; the cleanup call's own response id + # is what matters for server-side resolution but is not reflected in the mock here. 
+ assert response is not None + + async def test_streaming_function_invocation_config_terminate_on_unknown_calls_false( chat_client_base: SupportsChatGetResponse, ): @@ -2869,8 +2992,9 @@ def search_func(query: str) -> str: ChatResponseUpdate( contents=[ Content.from_text_reasoning( + id="rs_test123", text="Let me search for that", - additional_properties={"reasoning_id": "rs_test123", "status": "completed"}, + additional_properties={"status": "completed"}, ) ], role="assistant", @@ -2912,8 +3036,7 @@ def search_func(query: str) -> str: assert "function_result" in all_content_types, "Function result must be in response messages for chaining" assert "text" in all_content_types, "Final text must be in response messages" - # Verify reasoning has the reasoning_id preserved + # Verify reasoning has the id preserved reasoning_contents = [c for msg in response.messages for c in msg.contents if c.type == "text_reasoning"] assert len(reasoning_contents) >= 1 - assert reasoning_contents[0].additional_properties is not None - assert reasoning_contents[0].additional_properties.get("reasoning_id") == "rs_test123" + assert reasoning_contents[0].id == "rs_test123" diff --git a/python/packages/core/tests/openai/test_openai_responses_client.py b/python/packages/core/tests/openai/test_openai_responses_client.py index 691e5cc6b6..6c98f3bdfa 100644 --- a/python/packages/core/tests/openai/test_openai_responses_client.py +++ b/python/packages/core/tests/openai/test_openai_responses_client.py @@ -821,8 +821,9 @@ def test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N client = OpenAIResponsesClient(model_id="test-model", api_key="test-key") reasoning = Content.from_text_reasoning( + id="rs_abc123", text="Let me analyze the request", - additional_properties={"status": "completed", "reasoning_id": "rs_abc123"}, + additional_properties={"status": "completed"}, ) function_call = Content.from_function_call( call_id="call_123", @@ -841,7 +842,7 @@ def 
test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N assert "function_call" in types reasoning_item = next(item for item in result if item["type"] == "reasoning") - assert reasoning_item["summary"]["text"] == "Let me analyze the request" + assert reasoning_item["summary"][0]["text"] == "Let me analyze the request" assert reasoning_item["id"] == "rs_abc123", "Reasoning id must be preserved for the API" @@ -860,8 +861,9 @@ def test_prepare_messages_for_openai_full_conversation_with_reasoning() -> None: role="assistant", contents=[ Content.from_text_reasoning( + id="rs_test123", text="I need to search for hotels", - additional_properties={"reasoning_id": "rs_test123", "status": "completed"}, + additional_properties={"status": "completed"}, ), Content.from_function_call( call_id="call_1", @@ -1895,6 +1897,7 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None: # Test TextReasoningContent with all additional properties comprehensive_reasoning = Content.from_text_reasoning( + id="rs_comprehensive", text="Comprehensive reasoning summary", additional_properties={ "status": "in_progress", @@ -1904,10 +1907,11 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None: ) result = client._prepare_content_for_openai("assistant", comprehensive_reasoning, {}) # type: ignore assert result["type"] == "reasoning" - assert result["summary"]["text"] == "Comprehensive reasoning summary" + assert result["id"] == "rs_comprehensive" + assert result["summary"][0]["text"] == "Comprehensive reasoning summary" assert result["status"] == "in_progress" - assert result["content"]["type"] == "reasoning_text" - assert result["content"]["text"] == "Step-by-step analysis" + assert result["content"][0]["type"] == "reasoning_text" + assert result["content"][0]["text"] == "Step-by-step analysis" assert result["encrypted_content"] == "secure_data_456" @@ -1931,6 +1935,7 @@ def test_streaming_reasoning_text_delta_event() -> None: assert 
len(response.contents) == 1 assert response.contents[0].type == "text_reasoning" + assert response.contents[0].id == "reasoning_123" assert response.contents[0].text == "reasoning delta" assert response.contents[0].raw_representation == event mock_metadata.assert_called_once_with(event) diff --git a/python/packages/core/tests/workflow/test_full_conversation.py b/python/packages/core/tests/workflow/test_full_conversation.py index 74fe419e71..23861ecc69 100644 --- a/python/packages/core/tests/workflow/test_full_conversation.py +++ b/python/packages/core/tests/workflow/test_full_conversation.py @@ -3,6 +3,7 @@ from collections.abc import AsyncIterable, Awaitable, Sequence from typing import Any +import pytest from pydantic import PrivateAttr from typing_extensions import Never @@ -54,6 +55,67 @@ async def _run() -> AgentResponse: return _run() +class _ToolHistoryAgent(BaseAgent): + """Agent that emits tool-call internals plus a final assistant summary.""" + + def __init__(self, *, summary_text: str, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._summary_text = summary_text + + def _messages(self) -> list[Message]: + return [ + Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id="call_weather_1", + name="get_weather", + arguments='{"location":"Seattle"}', + ) + ], + ), + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")], + ), + Message(role="assistant", contents=[Content.from_text(text=self._summary_text)]), + ] + + def run( + self, + messages: str | Content | Message | Sequence[str | Content | Message] | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: + if stream: + + async def _stream() -> AsyncIterable[AgentResponseUpdate]: + yield AgentResponseUpdate( + contents=[ + Content.from_function_call( + call_id="call_weather_1", + 
name="get_weather", + arguments='{"location":"Seattle"}', + ) + ], + role="assistant", + ) + yield AgentResponseUpdate( + contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")], + role="tool", + ) + yield AgentResponseUpdate(contents=[Content.from_text(text=self._summary_text)], role="assistant") + + return ResponseStream(_stream(), finalizer=AgentResponse.from_updates) + + async def _run() -> AgentResponse: + return AgentResponse(messages=self._messages()) + + return _run() + + class _CaptureFullConversation(Executor): """Captures AgentExecutorResponse.full_conversation and completes the workflow.""" @@ -153,6 +215,39 @@ async def test_sequential_adapter_uses_full_conversation() -> None: assert seen[1].role == "assistant" and "A1 reply" in (seen[1].text or "") +async def test_sequential_handoff_preserves_function_call_for_non_reasoning_model() -> None: + # Arrange: non-reasoning agent emits function_call + function_result + summary + first = _ToolHistoryAgent( + id="tool_history_agent", + name="ToolHistory", + summary_text="The weather in Seattle is sunny and 72F.", + ) + second = _CaptureAgent(id="capture_agent", name="Capture", reply_text="Captured") + wf = SequentialBuilder(participants=[first, second]).build() + + # Act + result = await wf.run("Check weather and continue") + + # Assert workflow completed + outputs = result.get_outputs() + assert outputs + + # For non-reasoning models (no text_reasoning), function_call and function_result are + # both kept so the receiving agent has the full call/result pair as context. 
+ seen = second._last_messages # pyright: ignore[reportPrivateUsage] + assert len(seen) == 4 # user, assistant(function_call), tool(function_result), assistant(summary) + assert seen[0].role == "user" + assert "Check weather and continue" in (seen[0].text or "") + assert seen[1].role == "assistant" + assert any(content.type == "function_call" for content in seen[1].contents) + assert seen[2].role == "tool" + assert any(content.type == "function_result" for content in seen[2].contents) + assert seen[3].role == "assistant" + assert "Seattle is sunny" in (seen[3].text or "") + # No text_reasoning should appear (non-reasoning model) + assert all(content.type != "text_reasoning" for msg in seen for content in msg.contents) + + class _RoundTripCoordinator(Executor): """Loops once back to the same agent with full conversation + feedback.""" @@ -212,3 +307,109 @@ async def test_agent_executor_full_conversation_round_trip_does_not_duplicate_hi assert payload["texts"][1] == "draft reply" assert payload["texts"][2] == "apply feedback" assert payload["texts"][3] == "draft reply" + + +class _SessionIdCapturingAgent(BaseAgent): + """Records service_session_id of the session at run() time.""" + + _captured_service_session_id: str | None = PrivateAttr(default="NOT_CAPTURED") + + def run( + self, + messages: str | Content | Message | Sequence[str | Content | Message] | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]: + self._captured_service_session_id = session.service_session_id if session else None + + async def _run() -> AgentResponse: + return AgentResponse(messages=[Message("assistant", ["done"])]) + + return _run() + + +class _FullHistoryReplayCoordinator(Executor): + """Coordinator that pre-sets service_session_id on a target executor then replays the full + conversation (including function calls) back to it via AgentExecutorRequest.""" + + 
def __init__(self, *, target_exec: AgentExecutor, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._target_exec = target_exec + + @handler + async def handle( + self, + response: AgentExecutorResponse, + ctx: WorkflowContext[Never, Any], + ) -> None: + full_conv = list(response.full_conversation or response.agent_response.messages) + full_conv.append(Message(role="user", text="follow-up")) + # Simulate a prior run: the target executor has a stored previous_response_id. + self._target_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] + await ctx.send_message( + AgentExecutorRequest(messages=full_conv, should_respond=True), + target_id=self._target_exec.id, + ) + + +@pytest.mark.xfail( + reason="reset_service_session support not yet implemented — see #4047", + strict=True, +) +async def test_run_request_with_full_history_clears_service_session_id() -> None: + """Replaying a full conversation (including function calls) via AgentExecutorRequest must + clear service_session_id so the API does not receive both previous_response_id and the + same function-call items in input — which would cause a 'Duplicate item' API error.""" + tool_agent = _ToolHistoryAgent( + id="tool_agent", name="ToolAgent", summary_text="Done." + ) + tool_exec = AgentExecutor(tool_agent, id="tool_agent") + + spy_agent = _SessionIdCapturingAgent(id="spy_agent", name="SpyAgent") + spy_exec = AgentExecutor(spy_agent, id="spy_agent") + + coordinator = _FullHistoryReplayCoordinator(id="coord", target_exec=spy_exec) + + wf = ( + WorkflowBuilder(start_executor=tool_exec, output_executors=[coordinator]) + .add_edge(tool_exec, coordinator) + .add_edge(coordinator, spy_exec) + .build() + ) + + result = await wf.run("initial prompt") + assert result.get_outputs() is not None + + # The spy agent must have seen service_session_id=None (cleared before run). 
+ # Without the fix, it would see "resp_PREVIOUS_RUN" and the API would raise + # "Duplicate item found" because the same function-call IDs appear in both + # previous_response_id (server-stored) and the explicit input messages. + assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage] + + +async def test_from_response_preserves_service_session_id() -> None: + """from_response hands off a prior agent's full conversation to the next executor. + The receiving executor's service_session_id is preserved so the API can continue + the conversation using previous_response_id.""" + tool_agent = _ToolHistoryAgent( + id="tool_agent2", name="ToolAgent", summary_text="Done." + ) + tool_exec = AgentExecutor(tool_agent, id="tool_agent2") + + spy_agent = _SessionIdCapturingAgent(id="spy_agent2", name="SpyAgent") + spy_exec = AgentExecutor(spy_agent, id="spy_agent2") + # Simulate a prior run on the spy executor. + spy_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] + + wf = ( + WorkflowBuilder(start_executor=tool_exec, output_executors=[spy_exec]) + .add_edge(tool_exec, spy_exec) + .build() + ) + + result = await wf.run("start") + assert result.get_outputs() is not None + + assert spy_agent._captured_service_session_id == "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage] diff --git a/python/samples/02-agents/conversations/redis_history_provider.py b/python/samples/02-agents/conversations/redis_history_provider.py index 10b5265c41..17e1094775 100644 --- a/python/samples/02-agents/conversations/redis_history_provider.py +++ b/python/samples/02-agents/conversations/redis_history_provider.py @@ -20,7 +20,6 @@ with Redis as the backend data store. """ - # Default Redis URL for local Redis Stack. # Override via the REDIS_URL environment variable for remote or authenticated instances. 
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") diff --git a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index fdcc626099..68c9eb5ae0 100644 --- a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -153,7 +153,7 @@ async def on_human_feedback( # Human approved the draft as-is; forward it unchanged. await ctx.send_message( AgentExecutorRequest( - messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")], + messages=[*original_request.conversation, Message("user", text="The draft is approved as-is.")], should_respond=True, ), target_id=self.final_editor_id, @@ -161,16 +161,15 @@ async def on_human_feedback( return # Human provided feedback; prompt the writer to revise. - conversation: list[Message] = list(original_request.conversation) instruction = ( "A human reviewer shared the following guidance:\n" f"{note or 'No specific guidance provided.'}\n\n" "Rewrite the draft from the previous assistant message into a polished final version. " "Keep the response under 120 words and reflect any requested tone adjustments." 
) - conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_id + AgentExecutorRequest(messages=[Message("user", text=instruction)], should_respond=True), + target_id=self.writer_id, ) diff --git a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py index ed5fd77ce2..b7e6046d40 100644 --- a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py +++ b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py @@ -123,7 +123,8 @@ async def on_human_feedback( ) conversation.append(Message("user", text=instruction)) await ctx.send_message( - AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_name + AgentExecutorRequest(messages=conversation, should_respond=True), + target_id=self.writer_name, ) diff --git a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py index 8c74a3b153..e5c6bd09f8 100644 --- a/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/01_round_robin_group_chat.py @@ -144,7 +144,9 @@ async def check_approval( if last_message and "APPROVED" in last_message.text: await context.yield_output("Content approved.") else: - await context.send_message(AgentExecutorRequest(messages=response.full_conversation, should_respond=True)) + await context.send_message( + AgentExecutorRequest(messages=response.full_conversation, should_respond=True) + ) workflow = ( WorkflowBuilder(start_executor=researcher)