From 93221fd30a56be4054c52088dcd2bd108cbfba9b Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 27 Nov 2025 19:33:09 +0900 Subject: [PATCH 1/3] Support an autonomous handoff flow. --- .../agent_framework/_workflows/_handoff.py | 158 +++++++++++++++--- .../core/tests/workflow/test_handoff.py | 93 +++++++++++ .../orchestrations/02_selector_group_chat.py | 9 +- .../getting_started/workflows/README.md | 1 + .../agents/group_chat_workflow_as_agent.py | 2 +- .../orchestration/handoff_autonomous.py | 147 ++++++++++++++++ 6 files changed, 386 insertions(+), 24 deletions(-) create mode 100644 python/samples/getting_started/workflows/orchestration/handoff_autonomous.py diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 054c53f6e3..77a0f3dd91 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -4,10 +4,13 @@ The handoff pattern models a coordinator agent that optionally routes control to specialist agents before handing the conversation back to the user. -The flow is intentionally cyclical: +The flow is intentionally cyclical by default: user input -> coordinator -> optional specialist -> request user input -> ... +An autonomous interaction mode can bypass the user input request and continue routing +responses back to agents until a handoff occurs or termination criteria are met. + Key properties: - The entire conversation is maintained and reused on every hop - The coordinator signals a handoff by invoking a tool call that names the specialist @@ -19,7 +22,7 @@ import sys from collections.abc import Awaitable, Callable, Mapping, Sequence from dataclasses import dataclass, field -from typing import Any +from typing import Any, Literal from agent_framework import ( AgentProtocol, @@ -59,8 +62,8 @@ logger = logging.getLogger(__name__) - _HANDOFF_TOOL_PATTERN = re.compile(r"(?:handoff|transfer)[_\s-]*to[_\s-]*(?P[\w-]+)", re.IGNORECASE) +_DEFAULT_AUTONOMOUS_TURN_LIMIT = 50 def _create_handoff_tool(alias: str, description: str | None = None) -> AIFunction[Any, Any]: @@ -291,6 +294,8 @@ def __init__( id: str, handoff_tool_targets: Mapping[str, str] | None = None, return_to_previous: bool = False, + interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop", + autonomous_turn_limit: int | None = None, ) -> None: """Create a coordinator that manages routing between specialists and the user.""" super().__init__(id) @@ -302,6 +307,9 @@ def __init__( self._handoff_tool_targets = {k.lower(): v for k, v in (handoff_tool_targets or {}).items()} self._return_to_previous = return_to_previous self._current_agent_id: str | None = None # Track the current agent handling conversation + self._interaction_mode = interaction_mode + self._autonomous_turn_limit = autonomous_turn_limit + self._autonomous_turns = 0 def _get_author_name(self) -> str: """Get the coordinator name for orchestrator-generated messages.""" @@ -340,6 +348,7 @@ async def handle_agent_response( if target is not None: # Update current agent when handoff occurs self._current_agent_id = target + self._autonomous_turns = 0 logger.info(f"Handoff detected: {source} -> {target}. Routing control to specialist '{target}'.") # Clean tool-related content before sending to next agent @@ -354,17 +363,38 @@ async def handle_agent_response( # Update current agent when they respond without handoff self._current_agent_id = source - logger.info( - f"Agent '{source}' responded without handoff. " - f"Requesting user input. Return-to-previous: {self._return_to_previous}" - ) - if await self._check_termination(): # Clean the output conversation for display cleaned_output = clean_conversation_for_handoff(conversation) await ctx.yield_output(cleaned_output) return + if self._interaction_mode == "autonomous": + self._autonomous_turns += 1 + if self._autonomous_turn_limit is not None and self._autonomous_turns >= self._autonomous_turn_limit: + logger.info( + f"Autonomous turn limit reached ({self._autonomous_turn_limit}). " + "Yielding conversation and stopping." + ) + cleaned_output = clean_conversation_for_handoff(conversation) + await ctx.yield_output(cleaned_output) + return + + # In autonomous mode, agents continue iterating until they invoke a handoff tool + logger.info( + f"Agent '{source}' responded without handoff (turn {self._autonomous_turns}). " + "Continuing autonomous execution." + ) + cleaned = clean_conversation_for_handoff(conversation) + request = AgentExecutorRequest(messages=cleaned, should_respond=True) + await ctx.send_message(request, target_id=source) + return + + logger.info( + f"Agent '{source}' responded without handoff. " + f"Requesting user input. Return-to-previous: {self._return_to_previous}" + ) + # Clean conversation before sending to gateway for user input request # This removes tool messages that shouldn't be shown to users cleaned_for_display = clean_conversation_for_handoff(conversation) @@ -386,6 +416,9 @@ async def handle_user_input( # Update authoritative conversation self._conversation = list(message.full_conversation) + # Reset autonomous turn counter on new user input + self._autonomous_turns = 0 + # Check termination before sending to agent if await self._check_termination(): await ctx.yield_output(list(self._conversation)) @@ -478,11 +511,12 @@ def _snapshot_pattern_metadata(self) -> dict[str, Any]: Returns: Dict containing current agent if return-to-previous is enabled """ + metadata: dict[str, Any] = {} if self._return_to_previous: - return { - "current_agent_id": self._current_agent_id, - } - return {} + metadata["current_agent_id"] = self._current_agent_id + if self._interaction_mode == "autonomous": + metadata["autonomous_turns"] = self._autonomous_turns + return metadata @override def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None: @@ -495,6 +529,8 @@ def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None: """ if self._return_to_previous and "current_agent_id" in metadata: self._current_agent_id = metadata["current_agent_id"] + if self._interaction_mode == "autonomous" and "autonomous_turns" in metadata: + self._autonomous_turns = metadata["autonomous_turns"] def _apply_response_metadata(self, conversation: list[ChatMessage], agent_response: AgentRunResponse) -> None: """Merge top-level response metadata into the latest assistant message.""" @@ -604,13 +640,17 @@ class HandoffBuilder: r"""Fluent builder for conversational handoff workflows with coordinator and specialist agents. The handoff pattern enables a coordinator agent to route requests to specialist agents. - A termination condition determines when the workflow should stop requesting input and complete. + Interaction mode controls whether the workflow requests user input after each agent response or + completes autonomously once agents finish responding. A termination condition determines when + the workflow should stop requesting input and complete. Routing Patterns: - **Single-Tier (Default):** Only the coordinator can hand off to specialists. After any specialist + **Single-Tier (Default):** Only the coordinator can hand off to specialists. By default, after any specialist responds, control returns to the user for more input. This creates a cyclical flow: user -> coordinator -> [optional specialist] -> user -> coordinator -> ... + Use `with_interaction_mode("autonomous")` to skip requesting additional user input and yield the + final conversation when an agent responds without delegating. **Multi-Tier (Advanced):** Specialists can hand off to other specialists using `.add_handoff()`. This provides more flexibility for complex workflows but is less controllable than the single-tier @@ -621,13 +661,16 @@ class HandoffBuilder: Key Features: - **Automatic handoff detection**: The coordinator invokes a handoff tool whose - arguments (for example ``{"handoff_to": "shipping_agent"}``) identify the specialist to receive control. + arguments (for example `{"handoff_to": "shipping_agent"}`) identify the specialist to receive control. - **Auto-generated tools**: By default the builder synthesizes `handoff_to_` tools for the coordinator, so you don't manually define placeholder functions. - **Full conversation history**: The entire conversation (including any `ChatMessage.additional_properties`) is preserved and passed to each agent. - **Termination control**: By default, terminates after 10 user messages. Override with `.with_termination_condition(lambda conv: ...)` for custom logic (e.g., detect "goodbye"). + - **Interaction modes**: Choose `human_in_loop` (default) to prompt users between agent turns, + or `autonomous` to continue routing back to agents without prompting for user input until a + handoff occurs or a termination/turn limit is reached (default autonomous turn limit: 50). - **Checkpointing**: Optional persistence for resumable workflows. Usage (Single-Tier): @@ -765,7 +808,7 @@ def __init__( Participants must have stable names/ids because the workflow maps the handoff tool arguments to these identifiers. Agent names should match the strings emitted by the coordinator's handoff tool (e.g., a tool that - outputs ``{\"handoff_to\": \"billing\"}`` requires an agent named ``billing``). + outputs `{\"handoff_to\": \"billing\"}` requires an agent named `billing`). """ self._name = name self._description = description @@ -781,6 +824,8 @@ def __init__( self._auto_register_handoff_tools: bool = True self._handoff_config: dict[str, list[str]] = {} # Maps agent_id -> [target_agent_ids] self._return_to_previous: bool = False + self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop" + self._autonomous_turn_limit: int | None = _DEFAULT_AUTONOMOUS_TURN_LIMIT if participants: self.participants(participants) @@ -871,8 +916,10 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil 1. Handle the request directly and respond to the user, OR 2. Hand off to a specialist agent by including handoff metadata in the response - After a specialist responds, the workflow automatically returns control to the user, - creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ... + After a specialist responds, the workflow automatically returns control to the user + (default) creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ... + Configure `with_interaction_mode("autonomous")` to continue with the responding agent + without requesting another user turn until a handoff occurs or a termination/turn limit is met. Args: agent: The agent to use as the coordinator. Can be: @@ -899,8 +946,8 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil Note: The coordinator determines routing by invoking a handoff tool call whose - arguments identify the target specialist (for example ``{\"handoff_to\": \"billing\"}``). - Decorate the tool with ``approval_mode="always_require"`` to ensure the workflow + arguments identify the target specialist (for example `{\"handoff_to\": \"billing\"}`). + Decorate the tool with `approval_mode="always_require"` to ensure the workflow intercepts the call before execution and can make the transition. """ if not self._executors: @@ -1236,6 +1283,75 @@ async def check_termination(conv: list[ChatMessage]) -> bool: self._termination_condition = condition return self + def with_interaction_mode( + self, + interaction_mode: Literal["human_in_loop", "autonomous"] | None = "human_in_loop", + ) -> "HandoffBuilder": + """Choose whether the workflow requests user input or runs autonomously after agent replies. + + In autonomous mode, agents (including specialists) continue iterating on their task + until they explicitly invoke a handoff tool or the turn limit is reached. This allows + specialists to perform long-running autonomous tasks (e.g., research, coding, analysis) + without prematurely returning control to the coordinator or user. + + Args: + interaction_mode: `"human_in_loop"` (default) requests user input after each agent response + that does not trigger a handoff. `"autonomous"` lets agents continue + working until they invoke a handoff tool or the turn limit is reached + (default: 50, configurable via `with_autonomous_turn_limit`). + `None` resets to the default. + + Returns: + Self for chaining. + + Example: + + .. code-block:: python + + workflow = ( + HandoffBuilder(participants=[coordinator, research_agent]) + .set_coordinator(coordinator) + .add_handoff(coordinator, research_agent) + .add_handoff(research_agent, coordinator) + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(20) + .build() + ) + + # Flow: User asks a question + # -> Coordinator routes to Research Agent + # -> Research Agent iterates (researches, analyzes, refines) + # -> Research Agent calls handoff_to_coordinator when done + # -> Coordinator provides final response + """ + if interaction_mode is None: + self._interaction_mode = "human_in_loop" + return self + + if interaction_mode not in ("human_in_loop", "autonomous"): + raise ValueError("interaction_mode must be either 'human_in_loop' or 'autonomous'") + self._interaction_mode = interaction_mode + return self + + def with_autonomous_turn_limit(self, limit: int | None) -> "HandoffBuilder": + """Cap agent responses in autonomous mode before yielding the conversation. + + In autonomous mode, agents (including specialists) continue iterating until they + invoke a handoff tool or this turn limit is reached. Default limit is 50 responses + across the workflow; set to None to disable. + + Args: + limit: Maximum number of agent responses before the workflow yields. + Set to None to disable the limit (use with caution). + + Returns: + Self for method chaining. + """ + if limit is not None and limit <= 0: + raise ValueError("autonomous turn limit must be positive when provided") + self._autonomous_turn_limit = limit + return self + def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder": """Enable direct return to the current agent after user input, bypassing the coordinator. @@ -1437,6 +1553,8 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: id="handoff-coordinator", handoff_tool_targets=handoff_tool_targets, return_to_previous=self._return_to_previous, + interaction_mode=self._interaction_mode, + autonomous_turn_limit=self._autonomous_turn_limit, ) wiring = _GroupChatConfig( diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 3bbed7681e..51e46374f7 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -288,6 +288,99 @@ async def test_tool_call_handoff_detection_with_text_hint(): assert len(specialist.calls[0]) >= 2 +async def test_autonomous_interaction_mode_yields_output_without_user_request(): + """Ensure autonomous interaction mode yields output without requesting user input.""" + triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator("triage") + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(1) + .build() + ) + + events = await _drain(workflow.run_stream("Package arrived broken")) + assert len(triage.calls) == 1 + assert len(specialist.calls) == 1 + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request additional user input" + + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Autonomous mode should yield a workflow output" + + final_conversation = outputs[-1].data + assert isinstance(final_conversation, list) + conversation_list = cast(list[ChatMessage], final_conversation) + assert any( + msg.role == Role.ASSISTANT and (msg.text or "").startswith("specialist reply") for msg in conversation_list + ) + + +async def test_autonomous_continues_without_handoff_until_termination(): + """Autonomous mode should keep invoking the same agent when no handoff occurs.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(3) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Autonomous mode should yield output after termination condition" + assert len(worker.calls) == 3, "Worker should be invoked multiple times without user input" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request user input" + + +async def test_autonomous_turn_limit_stops_loop(): + """Autonomous mode should stop when the configured turn limit is reached.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(2) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Turn limit should force a workflow output" + assert len(worker.calls) == 2, "Worker should stop after reaching the turn limit" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request user input" + + +async def test_autonomous_routes_back_to_coordinator_when_specialist_stops(): + """Specialist without handoff should route back to coordinator in autonomous mode.""" + triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator(triage) + .add_handoff(triage, specialist) + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(3) + .with_termination_condition(lambda conv: len(conv) >= 4) + .build() + ) + + events = await _drain(workflow.run_stream("Issue")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Workflow should complete without user input" + assert len(specialist.calls) >= 1, "Specialist should run without handoff" + + def test_build_fails_without_coordinator(): """Verify that build() raises ValueError when set_coordinator() was not called.""" triage = _RecordingAgent(name="triage") diff --git a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py index 097680a3b7..6c5425f49f 100644 --- a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py @@ -86,9 +86,12 @@ async def run_agent_framework() -> None: workflow = ( GroupChatBuilder() .participants([python_expert, javascript_expert, database_expert]) - .set_prompt_based_manager( - chat_client=client, - instructions="Based on the conversation, select the most appropriate expert to respond next.", + .set_manager( + manager=client.create_agent( + name="selector_manager", + instructions="Based on the conversation, select the most appropriate expert to respond next.", + ), + display_name="SelectorManager", ) .with_max_rounds(1) .build() diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 4dbeeb6071..710cef2cf5 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -98,6 +98,7 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | | Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | +| Handoff (Autonomous) | [orchestration/handoff_autonomous.py](./orchestration/handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_interaction_mode("autonomous")` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | diff --git a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py index ff147df453..c94b3004d8 100644 --- a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py @@ -38,7 +38,7 @@ async def main() -> None: workflow = ( GroupChatBuilder() - .set_prompt_based_manager(chat_client=OpenAIChatClient(), display_name="Coordinator") + .set_manager(manager=OpenAIChatClient().create_agent(), display_name="Coordinator") .participants(researcher=researcher, writer=writer) .build() ) diff --git a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py new file mode 100644 index 0000000000..9ea16f39c0 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py @@ -0,0 +1,147 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from collections.abc import AsyncIterable +from typing import cast + +from agent_framework import ( + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + HandoffBuilder, + WorkflowEvent, + WorkflowOutputEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +logging.basicConfig(level=logging.ERROR) + +"""Sample: Autonomous handoff workflow with agent iteration. + +This sample demonstrates `with_interaction_mode("autonomous")`, where agents continue +iterating on their task until they explicitly invoke a handoff tool. This allows +specialists to perform long-running autonomous work (research, coding, analysis) +without prematurely returning control to the coordinator or user. + +Routing Pattern: + User -> Coordinator -> Specialist (iterates N times) -> Handoff -> Final Output + +Prerequisites: + - `az login` (Azure CLI authentication) + - Environment variables for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.) + +Key Concepts: + - Autonomous interaction mode: agents iterate until they handoff + - Turn limits: use `with_autonomous_turn_limit(N)` to cap total iterations +""" + + +def create_agents( + chat_client: AzureOpenAIChatClient, +) -> tuple[ChatAgent, ChatAgent, ChatAgent]: + """Create coordinator and specialists for autonomous iteration.""" + coordinator = chat_client.create_agent( + instructions=( + "You are a coordinator. Route user requests to either research_agent or summary_agent. " + "Always call exactly one handoff tool with a short routing acknowledgement. " + "If unsure, default to research_agent. Never request information yourself. " + "After a specialist hands off back to you, provide a concise final summary and stop." + ), + name="coordinator", + ) + + research_agent = chat_client.create_agent( + instructions=( + "You are a research specialist that explores topics thoroughly. " + "When given a research task, break it down into multiple aspects and explore each one. " + "Continue your research across multiple responses - don't try to finish everything in one response. " + "After each response, think about what else needs to be explored. " + "When you have covered the topic comprehensively (at least 3-4 different aspects), " + "call the handoff tool to return to coordinator with your findings. " + "Keep each individual response focused on one aspect." + ), + name="research_agent", + ) + + summary_agent = chat_client.create_agent( + instructions=( + "You summarize research findings. Provide a concise, well-organized summary. " + "When done, hand off to coordinator." + ), + name="summary_agent", + ) + + return coordinator, research_agent, summary_agent + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + """Collect all events from an async stream.""" + return [event async for event in stream] + + +def _print_conversation(events: list[WorkflowEvent]) -> None: + """Print the final conversation snapshot from workflow output events.""" + for event in events: + if isinstance(event, AgentRunUpdateEvent): + print(event.data, flush=True, end="") + elif isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + print("\n=== Final Conversation (Autonomous with Iteration) ===") + for message in conversation: + speaker = message.author_name or message.role.value + text_preview = message.text[:200] + "..." if len(message.text) > 200 else message.text + print(f"- {speaker}: {text_preview}") + print(f"\nTotal messages: {len(conversation)}") + print("=====================================================") + + +async def main() -> None: + """Run an autonomous handoff workflow with specialist iteration enabled.""" + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + coordinator, research_agent, summary_agent = create_agents(chat_client) + + # Build the workflow with autonomous mode + # In autonomous mode, agents continue iterating until they invoke a handoff tool + workflow = ( + HandoffBuilder( + name="autonomous_iteration_handoff", + participants=[coordinator, research_agent, summary_agent], + ) + .set_coordinator(coordinator) + .add_handoff(coordinator, [research_agent, summary_agent]) + .add_handoff(research_agent, coordinator) # Research can hand back to coordinator + .add_handoff(summary_agent, coordinator) + .with_interaction_mode("autonomous") + .with_autonomous_turn_limit(15) # Limit total turns to prevent runaway + .with_termination_condition( + # Terminate after coordinator provides 5 assistant responses + lambda conv: sum(1 for msg in conv if msg.author_name == "coordinator" and msg.role.value == "assistant") + >= 5 + ) + .build() + ) + + initial_request = "Research the key benefits and challenges of renewable energy adoption." + print("Initial request:", initial_request) + print("\nExpecting multiple iterations from the research agent...\n") + + events = await _drain(workflow.run_stream(initial_request)) + _print_conversation(events) + + """ + Expected behavior: + - Coordinator routes to research_agent. + - Research agent iterates multiple times, exploring different aspects of renewable energy. + - Each iteration adds to the conversation without returning to coordinator. + - After thorough research, research_agent calls handoff to coordinator. + - Coordinator provides final summary. + + In autonomous mode, agents continue working until they invoke a handoff tool, + allowing the research_agent to perform 3-4+ responses before handing off. + """ + + +if __name__ == "__main__": + asyncio.run(main()) From c46cde8c1101e207c025213bb0c7852a63f0dda1 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 10:34:29 +0900 Subject: [PATCH 2/3] Simplify public API --- .../agent_framework/_workflows/_handoff.py | 25 +++++++++-- .../core/tests/workflow/test_handoff.py | 43 +++++++++++++++++++ .../getting_started/workflows/README.md | 2 +- .../orchestration/handoff_autonomous.py | 5 +-- 4 files changed, 67 insertions(+), 8 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 77a0f3dd91..66ea76936f 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -1286,6 +1286,8 @@ async def check_termination(conv: list[ChatMessage]) -> bool: def with_interaction_mode( self, interaction_mode: Literal["human_in_loop", "autonomous"] | None = "human_in_loop", + *, + autonomous_turn_limit: int | None = None, ) -> "HandoffBuilder": """Choose whether the workflow requests user input or runs autonomously after agent replies. @@ -1297,9 +1299,13 @@ def with_interaction_mode( Args: interaction_mode: `"human_in_loop"` (default) requests user input after each agent response that does not trigger a handoff. `"autonomous"` lets agents continue - working until they invoke a handoff tool or the turn limit is reached - (default: 50, configurable via `with_autonomous_turn_limit`). + working until they invoke a handoff tool or the turn limit is reached. `None` resets to the default. + autonomous_turn_limit: Maximum number of agent responses before the workflow yields + when in autonomous mode. Only applicable when interaction_mode + is `"autonomous"`. Default is 50. Set to `None` to disable + the limit (use with caution). Raises `ValueError` if provided + when interaction_mode is `"human_in_loop"`. Returns: Self for chaining. @@ -1313,8 +1319,7 @@ def with_interaction_mode( .set_coordinator(coordinator) .add_handoff(coordinator, research_agent) .add_handoff(research_agent, coordinator) - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(20) + .with_interaction_mode("autonomous", autonomous_turn_limit=20) .build() ) @@ -1331,11 +1336,23 @@ def with_interaction_mode( if interaction_mode not in ("human_in_loop", "autonomous"): raise ValueError("interaction_mode must be either 'human_in_loop' or 'autonomous'") self._interaction_mode = interaction_mode + + if autonomous_turn_limit is not None: + if interaction_mode != "autonomous": + raise ValueError("autonomous_turn_limit can only be set when interaction_mode is 'autonomous'") + if autonomous_turn_limit <= 0: + raise ValueError("autonomous_turn_limit must be positive when provided") + self._autonomous_turn_limit = autonomous_turn_limit + return self def with_autonomous_turn_limit(self, limit: int | None) -> "HandoffBuilder": """Cap agent responses in autonomous mode before yielding the conversation. + Note: + This method is provided for backward compatibility. Prefer using + `with_interaction_mode("autonomous", autonomous_turn_limit=N)` instead. + In autonomous mode, agents (including specialists) continue iterating until they invoke a handoff tool or this turn limit is reached. Default limit is 50 responses across the workflow; set to None to disable. diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 51e46374f7..542231c452 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -381,6 +381,49 @@ async def test_autonomous_routes_back_to_coordinator_when_specialist_stops(): assert len(specialist.calls) >= 1, "Specialist should run without handoff" +async def test_autonomous_mode_with_inline_turn_limit(): + """Autonomous mode should respect turn limit passed via with_interaction_mode.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous", autonomous_turn_limit=2) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Turn limit should force a workflow output" + assert len(worker.calls) == 2, "Worker should stop after reaching the inline turn limit" + + +def test_autonomous_turn_limit_requires_autonomous_mode(): + """Verify that autonomous_turn_limit raises an error when mode is human_in_loop.""" + worker = _RecordingAgent(name="worker") + + with pytest.raises(ValueError, match="autonomous_turn_limit can only be set when interaction_mode is 'autonomous'"): + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "human_in_loop", autonomous_turn_limit=10 + ) + + +def test_autonomous_turn_limit_must_be_positive(): + """Verify that autonomous_turn_limit raises an error when <= 0.""" + worker = _RecordingAgent(name="worker") + + with pytest.raises(ValueError, match="autonomous_turn_limit must be positive"): + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "autonomous", autonomous_turn_limit=0 + ) + + with pytest.raises(ValueError, match="autonomous_turn_limit must be positive"): + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "autonomous", autonomous_turn_limit=-5 + ) + + def test_build_fails_without_coordinator(): """Verify that build() raises ValueError when set_coordinator() was not called.""" triage = _RecordingAgent(name="triage") diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 710cef2cf5..2dece5ba76 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -98,7 +98,7 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | | Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | -| Handoff (Autonomous) | [orchestration/handoff_autonomous.py](./orchestration/handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_interaction_mode("autonomous")` | +| Handoff (Autonomous) | [orchestration/handoff_autonomous.py](./orchestration/handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_interaction_mode("autonomous", autonomous_turn_limit=N)` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | diff --git a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py index 9ea16f39c0..68bf789083 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py @@ -34,7 +34,7 @@ Key Concepts: - Autonomous interaction mode: agents iterate until they handoff - - Turn limits: use `with_autonomous_turn_limit(N)` to cap total iterations + - Turn limits: use `with_interaction_mode("autonomous", autonomous_turn_limit=N)` to cap total iterations """ @@ -113,8 +113,7 @@ async def main() -> None: .add_handoff(coordinator, [research_agent, summary_agent]) .add_handoff(research_agent, coordinator) # Research can hand back to coordinator .add_handoff(summary_agent, coordinator) - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(15) # Limit total turns to prevent runaway + .with_interaction_mode("autonomous", autonomous_turn_limit=15) .with_termination_condition( # Terminate after coordinator provides 5 assistant responses lambda conv: sum(1 for msg in conv if msg.author_name == "coordinator" and msg.role.value == "assistant") From 5d22f74f61d6e63989a13f8a58784c37ae3fe25c Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 4 Dec 2025 10:20:31 +0900 Subject: [PATCH 3/3] Address feedback --- .../agent_framework/_workflows/_handoff.py | 50 ++++++------------- .../core/tests/workflow/test_handoff.py | 26 +++++----- 2 files changed, 26 insertions(+), 50 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 66ea76936f..0a3608d2e6 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -1285,7 +1285,7 @@ async def check_termination(conv: list[ChatMessage]) -> bool: def with_interaction_mode( self, - interaction_mode: Literal["human_in_loop", "autonomous"] | None = "human_in_loop", + interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop", *, autonomous_turn_limit: int | None = None, ) -> "HandoffBuilder": @@ -1300,12 +1300,13 @@ def with_interaction_mode( interaction_mode: `"human_in_loop"` (default) requests user input after each agent response that does not trigger a handoff. `"autonomous"` lets agents continue working until they invoke a handoff tool or the turn limit is reached. - `None` resets to the default. + + Keyword Args: autonomous_turn_limit: Maximum number of agent responses before the workflow yields - when in autonomous mode. Only applicable when interaction_mode - is `"autonomous"`. Default is 50. Set to `None` to disable - the limit (use with caution). Raises `ValueError` if provided - when interaction_mode is `"human_in_loop"`. + when in autonomous mode. Only applicable when interaction_mode + is `"autonomous"`. Default is 50. Set to `None` to disable + the limit (use with caution). Ignored with a warning if provided + when interaction_mode is `"human_in_loop"`. Returns: Self for chaining. @@ -1329,44 +1330,21 @@ def with_interaction_mode( # -> Research Agent calls handoff_to_coordinator when done # -> Coordinator provides final response """ - if interaction_mode is None: - self._interaction_mode = "human_in_loop" - return self - if interaction_mode not in ("human_in_loop", "autonomous"): raise ValueError("interaction_mode must be either 'human_in_loop' or 'autonomous'") self._interaction_mode = interaction_mode if autonomous_turn_limit is not None: if interaction_mode != "autonomous": - raise ValueError("autonomous_turn_limit can only be set when interaction_mode is 'autonomous'") - if autonomous_turn_limit <= 0: + logger.warning( + f"autonomous_turn_limit={autonomous_turn_limit} was provided but interaction_mode is " + f"'{interaction_mode}'; ignoring." + ) + elif autonomous_turn_limit <= 0: raise ValueError("autonomous_turn_limit must be positive when provided") - self._autonomous_turn_limit = autonomous_turn_limit - - return self - - def with_autonomous_turn_limit(self, limit: int | None) -> "HandoffBuilder": - """Cap agent responses in autonomous mode before yielding the conversation. - - Note: - This method is provided for backward compatibility. Prefer using - `with_interaction_mode("autonomous", autonomous_turn_limit=N)` instead. - - In autonomous mode, agents (including specialists) continue iterating until they - invoke a handoff tool or this turn limit is reached. Default limit is 50 responses - across the workflow; set to None to disable. - - Args: - limit: Maximum number of agent responses before the workflow yields. - Set to None to disable the limit (use with caution). + else: + self._autonomous_turn_limit = autonomous_turn_limit - Returns: - Self for method chaining. - """ - if limit is not None and limit <= 0: - raise ValueError("autonomous turn limit must be positive when provided") - self._autonomous_turn_limit = limit return self def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder": diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 542231c452..771abdf325 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -296,8 +296,7 @@ async def test_autonomous_interaction_mode_yields_output_without_user_request(): workflow = ( HandoffBuilder(participants=[triage, specialist]) .set_coordinator("triage") - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(1) + .with_interaction_mode("autonomous", autonomous_turn_limit=1) .build() ) @@ -325,8 +324,7 @@ async def test_autonomous_continues_without_handoff_until_termination(): workflow = ( HandoffBuilder(participants=[worker]) .set_coordinator(worker) - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(3) + .with_interaction_mode("autonomous", autonomous_turn_limit=3) .with_termination_condition(lambda conv: False) .build() ) @@ -346,8 +344,7 @@ async def test_autonomous_turn_limit_stops_loop(): workflow = ( HandoffBuilder(participants=[worker]) .set_coordinator(worker) - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(2) + .with_interaction_mode("autonomous", autonomous_turn_limit=2) .with_termination_condition(lambda conv: False) .build() ) @@ -369,8 +366,7 @@ async def test_autonomous_routes_back_to_coordinator_when_specialist_stops(): HandoffBuilder(participants=[triage, specialist]) .set_coordinator(triage) .add_handoff(triage, specialist) - .with_interaction_mode("autonomous") - .with_autonomous_turn_limit(3) + .with_interaction_mode("autonomous", autonomous_turn_limit=3) .with_termination_condition(lambda conv: len(conv) >= 4) .build() ) @@ -399,14 +395,16 @@ async def test_autonomous_mode_with_inline_turn_limit(): assert len(worker.calls) == 2, "Worker should stop after reaching the inline turn limit" -def test_autonomous_turn_limit_requires_autonomous_mode(): - """Verify that autonomous_turn_limit raises an error when mode is human_in_loop.""" +def test_autonomous_turn_limit_ignored_in_human_in_loop_mode(caplog): + """Verify that autonomous_turn_limit logs a warning when mode is human_in_loop.""" worker = _RecordingAgent(name="worker") - with pytest.raises(ValueError, match="autonomous_turn_limit can only be set when interaction_mode is 'autonomous'"): - HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( - "human_in_loop", autonomous_turn_limit=10 - ) + # Should not raise, but should log a warning + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "human_in_loop", autonomous_turn_limit=10 + ) + + assert "autonomous_turn_limit=10 was provided but interaction_mode is 'human_in_loop'; ignoring." in caplog.text def test_autonomous_turn_limit_must_be_positive():