diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 054c53f6e3..0a3608d2e6 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -4,10 +4,13 @@ The handoff pattern models a coordinator agent that optionally routes control to specialist agents before handing the conversation back to the user. -The flow is intentionally cyclical: +The flow is intentionally cyclical by default: user input -> coordinator -> optional specialist -> request user input -> ... +An autonomous interaction mode can bypass the user input request and continue routing +responses back to agents until a handoff occurs or termination criteria are met. + Key properties: - The entire conversation is maintained and reused on every hop - The coordinator signals a handoff by invoking a tool call that names the specialist @@ -19,7 +22,7 @@ import sys from collections.abc import Awaitable, Callable, Mapping, Sequence from dataclasses import dataclass, field -from typing import Any +from typing import Any, Literal from agent_framework import ( AgentProtocol, @@ -59,8 +62,8 @@ logger = logging.getLogger(__name__) - _HANDOFF_TOOL_PATTERN = re.compile(r"(?:handoff|transfer)[_\s-]*to[_\s-]*(?P[\w-]+)", re.IGNORECASE) +_DEFAULT_AUTONOMOUS_TURN_LIMIT = 50 def _create_handoff_tool(alias: str, description: str | None = None) -> AIFunction[Any, Any]: @@ -291,6 +294,8 @@ def __init__( id: str, handoff_tool_targets: Mapping[str, str] | None = None, return_to_previous: bool = False, + interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop", + autonomous_turn_limit: int | None = None, ) -> None: """Create a coordinator that manages routing between specialists and the user.""" super().__init__(id) @@ -302,6 +307,9 @@ def __init__( self._handoff_tool_targets = {k.lower(): v for k, v in (handoff_tool_targets or {}).items()} 
self._return_to_previous = return_to_previous self._current_agent_id: str | None = None # Track the current agent handling conversation + self._interaction_mode = interaction_mode + self._autonomous_turn_limit = autonomous_turn_limit + self._autonomous_turns = 0 def _get_author_name(self) -> str: """Get the coordinator name for orchestrator-generated messages.""" @@ -340,6 +348,7 @@ async def handle_agent_response( if target is not None: # Update current agent when handoff occurs self._current_agent_id = target + self._autonomous_turns = 0 logger.info(f"Handoff detected: {source} -> {target}. Routing control to specialist '{target}'.") # Clean tool-related content before sending to next agent @@ -354,17 +363,38 @@ async def handle_agent_response( # Update current agent when they respond without handoff self._current_agent_id = source - logger.info( - f"Agent '{source}' responded without handoff. " - f"Requesting user input. Return-to-previous: {self._return_to_previous}" - ) - if await self._check_termination(): # Clean the output conversation for display cleaned_output = clean_conversation_for_handoff(conversation) await ctx.yield_output(cleaned_output) return + if self._interaction_mode == "autonomous": + self._autonomous_turns += 1 + if self._autonomous_turn_limit is not None and self._autonomous_turns >= self._autonomous_turn_limit: + logger.info( + f"Autonomous turn limit reached ({self._autonomous_turn_limit}). " + "Yielding conversation and stopping." + ) + cleaned_output = clean_conversation_for_handoff(conversation) + await ctx.yield_output(cleaned_output) + return + + # In autonomous mode, agents continue iterating until they invoke a handoff tool + logger.info( + f"Agent '{source}' responded without handoff (turn {self._autonomous_turns}). " + "Continuing autonomous execution." 
+ ) + cleaned = clean_conversation_for_handoff(conversation) + request = AgentExecutorRequest(messages=cleaned, should_respond=True) + await ctx.send_message(request, target_id=source) + return + + logger.info( + f"Agent '{source}' responded without handoff. " + f"Requesting user input. Return-to-previous: {self._return_to_previous}" + ) + # Clean conversation before sending to gateway for user input request # This removes tool messages that shouldn't be shown to users cleaned_for_display = clean_conversation_for_handoff(conversation) @@ -386,6 +416,9 @@ async def handle_user_input( # Update authoritative conversation self._conversation = list(message.full_conversation) + # Reset autonomous turn counter on new user input + self._autonomous_turns = 0 + # Check termination before sending to agent if await self._check_termination(): await ctx.yield_output(list(self._conversation)) @@ -478,11 +511,12 @@ def _snapshot_pattern_metadata(self) -> dict[str, Any]: Returns: Dict containing current agent if return-to-previous is enabled """ + metadata: dict[str, Any] = {} if self._return_to_previous: - return { - "current_agent_id": self._current_agent_id, - } - return {} + metadata["current_agent_id"] = self._current_agent_id + if self._interaction_mode == "autonomous": + metadata["autonomous_turns"] = self._autonomous_turns + return metadata @override def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None: @@ -495,6 +529,8 @@ def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None: """ if self._return_to_previous and "current_agent_id" in metadata: self._current_agent_id = metadata["current_agent_id"] + if self._interaction_mode == "autonomous" and "autonomous_turns" in metadata: + self._autonomous_turns = metadata["autonomous_turns"] def _apply_response_metadata(self, conversation: list[ChatMessage], agent_response: AgentRunResponse) -> None: """Merge top-level response metadata into the latest assistant message.""" @@ -604,13 +640,17 @@ class 
HandoffBuilder: r"""Fluent builder for conversational handoff workflows with coordinator and specialist agents. The handoff pattern enables a coordinator agent to route requests to specialist agents. - A termination condition determines when the workflow should stop requesting input and complete. + Interaction mode controls whether the workflow requests user input after each agent response or + completes autonomously once agents finish responding. A termination condition determines when + the workflow should stop requesting input and complete. Routing Patterns: - **Single-Tier (Default):** Only the coordinator can hand off to specialists. After any specialist + **Single-Tier (Default):** Only the coordinator can hand off to specialists. By default, after any specialist responds, control returns to the user for more input. This creates a cyclical flow: user -> coordinator -> [optional specialist] -> user -> coordinator -> ... + Use `with_interaction_mode("autonomous")` to skip requesting additional user input and yield the + final conversation when an agent responds without delegating. **Multi-Tier (Advanced):** Specialists can hand off to other specialists using `.add_handoff()`. This provides more flexibility for complex workflows but is less controllable than the single-tier @@ -621,13 +661,16 @@ class HandoffBuilder: Key Features: - **Automatic handoff detection**: The coordinator invokes a handoff tool whose - arguments (for example ``{"handoff_to": "shipping_agent"}``) identify the specialist to receive control. + arguments (for example `{"handoff_to": "shipping_agent"}`) identify the specialist to receive control. - **Auto-generated tools**: By default the builder synthesizes `handoff_to_` tools for the coordinator, so you don't manually define placeholder functions. - **Full conversation history**: The entire conversation (including any `ChatMessage.additional_properties`) is preserved and passed to each agent. 
- **Termination control**: By default, terminates after 10 user messages. Override with `.with_termination_condition(lambda conv: ...)` for custom logic (e.g., detect "goodbye"). + - **Interaction modes**: Choose `human_in_loop` (default) to prompt users between agent turns, + or `autonomous` to continue routing back to agents without prompting for user input until a + handoff occurs or a termination/turn limit is reached (default autonomous turn limit: 50). - **Checkpointing**: Optional persistence for resumable workflows. Usage (Single-Tier): @@ -765,7 +808,7 @@ def __init__( Participants must have stable names/ids because the workflow maps the handoff tool arguments to these identifiers. Agent names should match the strings emitted by the coordinator's handoff tool (e.g., a tool that - outputs ``{\"handoff_to\": \"billing\"}`` requires an agent named ``billing``). + outputs `{\"handoff_to\": \"billing\"}` requires an agent named `billing`). """ self._name = name self._description = description @@ -781,6 +824,8 @@ def __init__( self._auto_register_handoff_tools: bool = True self._handoff_config: dict[str, list[str]] = {} # Maps agent_id -> [target_agent_ids] self._return_to_previous: bool = False + self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop" + self._autonomous_turn_limit: int | None = _DEFAULT_AUTONOMOUS_TURN_LIMIT if participants: self.participants(participants) @@ -871,8 +916,10 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil 1. Handle the request directly and respond to the user, OR 2. Hand off to a specialist agent by including handoff metadata in the response - After a specialist responds, the workflow automatically returns control to the user, - creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ... 
+ After a specialist responds, the workflow automatically returns control to the user + (default) creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ... + Configure `with_interaction_mode("autonomous")` to continue with the responding agent + without requesting another user turn until a handoff occurs or a termination/turn limit is met. Args: agent: The agent to use as the coordinator. Can be: @@ -899,8 +946,8 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil Note: The coordinator determines routing by invoking a handoff tool call whose - arguments identify the target specialist (for example ``{\"handoff_to\": \"billing\"}``). - Decorate the tool with ``approval_mode="always_require"`` to ensure the workflow + arguments identify the target specialist (for example `{\"handoff_to\": \"billing\"}`). + Decorate the tool with `approval_mode="always_require"` to ensure the workflow intercepts the call before execution and can make the transition. """ if not self._executors: @@ -1236,6 +1283,70 @@ async def check_termination(conv: list[ChatMessage]) -> bool: self._termination_condition = condition return self + def with_interaction_mode( + self, + interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop", + *, + autonomous_turn_limit: int | None = None, + ) -> "HandoffBuilder": + """Choose whether the workflow requests user input or runs autonomously after agent replies. + + In autonomous mode, agents (including specialists) continue iterating on their task + until they explicitly invoke a handoff tool or the turn limit is reached. This allows + specialists to perform long-running autonomous tasks (e.g., research, coding, analysis) + without prematurely returning control to the coordinator or user. + + Args: + interaction_mode: `"human_in_loop"` (default) requests user input after each agent response + that does not trigger a handoff. 
`"autonomous"` lets agents continue + working until they invoke a handoff tool or the turn limit is reached. + + Keyword Args: + autonomous_turn_limit: Maximum number of consecutive agent responses without a handoff + before the workflow yields in autonomous mode; the counter resets on each + handoff and on new user input. Only applicable when interaction_mode + is `"autonomous"`. Default is 50. Passing `None` (the parameter default) + leaves the current limit in place rather than disabling it. Ignored with + a warning if provided when interaction_mode is `"human_in_loop"`. + + Returns: + Self for chaining. + + Example: + + .. code-block:: python + + workflow = ( + HandoffBuilder(participants=[coordinator, research_agent]) + .set_coordinator(coordinator) + .add_handoff(coordinator, research_agent) + .add_handoff(research_agent, coordinator) + .with_interaction_mode("autonomous", autonomous_turn_limit=20) + .build() + ) + + # Flow: User asks a question + # -> Coordinator routes to Research Agent + # -> Research Agent iterates (researches, analyzes, refines) + # -> Research Agent calls handoff_to_coordinator when done + # -> Coordinator provides final response + """ + if interaction_mode not in ("human_in_loop", "autonomous"): + raise ValueError("interaction_mode must be either 'human_in_loop' or 'autonomous'") + self._interaction_mode = interaction_mode + + if autonomous_turn_limit is not None: + if interaction_mode != "autonomous": + logger.warning( + f"autonomous_turn_limit={autonomous_turn_limit} was provided but interaction_mode is " + f"'{interaction_mode}'; ignoring." + ) + elif autonomous_turn_limit <= 0: + raise ValueError("autonomous_turn_limit must be positive when provided") + else: + self._autonomous_turn_limit = autonomous_turn_limit + + return self + def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder": """Enable direct return to the current agent after user input, bypassing the coordinator. 
@@ -1437,6 +1548,8 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: id="handoff-coordinator", handoff_tool_targets=handoff_tool_targets, return_to_previous=self._return_to_previous, + interaction_mode=self._interaction_mode, + autonomous_turn_limit=self._autonomous_turn_limit, ) wiring = _GroupChatConfig( diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 3bbed7681e..771abdf325 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -288,6 +288,140 @@ async def test_tool_call_handoff_detection_with_text_hint(): assert len(specialist.calls[0]) >= 2 +async def test_autonomous_interaction_mode_yields_output_without_user_request(): + """Ensure autonomous interaction mode yields output without requesting user input.""" + triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator("triage") + .with_interaction_mode("autonomous", autonomous_turn_limit=1) + .build() + ) + + events = await _drain(workflow.run_stream("Package arrived broken")) + assert len(triage.calls) == 1 + assert len(specialist.calls) == 1 + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request additional user input" + + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Autonomous mode should yield a workflow output" + + final_conversation = outputs[-1].data + assert isinstance(final_conversation, list) + conversation_list = cast(list[ChatMessage], final_conversation) + assert any( + msg.role == Role.ASSISTANT and (msg.text or "").startswith("specialist reply") for msg in conversation_list + ) + + +async def test_autonomous_continues_without_handoff_until_termination(): + """Autonomous 
mode should keep invoking the same agent when no handoff occurs.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous", autonomous_turn_limit=3) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Autonomous mode should yield output after termination condition" + assert len(worker.calls) == 3, "Worker should be invoked multiple times without user input" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request user input" + + +async def test_autonomous_turn_limit_stops_loop(): + """Autonomous mode should stop when the configured turn limit is reached.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous", autonomous_turn_limit=2) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Turn limit should force a workflow output" + assert len(worker.calls) == 2, "Worker should stop after reaching the turn limit" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request user input" + + +async def test_autonomous_routes_back_to_coordinator_when_specialist_stops(): + """Specialist without handoff should route back to coordinator in autonomous mode.""" + triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator(triage) + .add_handoff(triage, specialist) + 
.with_interaction_mode("autonomous", autonomous_turn_limit=3) + .with_termination_condition(lambda conv: len(conv) >= 4) + .build() + ) + + events = await _drain(workflow.run_stream("Issue")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Workflow should complete without user input" + assert len(specialist.calls) >= 1, "Specialist should run without handoff" + + +async def test_autonomous_mode_with_inline_turn_limit(): + """Autonomous mode should respect turn limit passed via with_interaction_mode.""" + worker = _RecordingAgent(name="worker") + + workflow = ( + HandoffBuilder(participants=[worker]) + .set_coordinator(worker) + .with_interaction_mode("autonomous", autonomous_turn_limit=2) + .with_termination_condition(lambda conv: False) + .build() + ) + + events = await _drain(workflow.run_stream("Start")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Turn limit should force a workflow output" + assert len(worker.calls) == 2, "Worker should stop after reaching the inline turn limit" + + +def test_autonomous_turn_limit_ignored_in_human_in_loop_mode(caplog): + """Verify that autonomous_turn_limit logs a warning when mode is human_in_loop.""" + worker = _RecordingAgent(name="worker") + + # Should not raise, but should log a warning + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "human_in_loop", autonomous_turn_limit=10 + ) + + assert "autonomous_turn_limit=10 was provided but interaction_mode is 'human_in_loop'; ignoring." 
in caplog.text + + +def test_autonomous_turn_limit_must_be_positive(): + """Verify that autonomous_turn_limit raises an error when <= 0.""" + worker = _RecordingAgent(name="worker") + + with pytest.raises(ValueError, match="autonomous_turn_limit must be positive"): + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "autonomous", autonomous_turn_limit=0 + ) + + with pytest.raises(ValueError, match="autonomous_turn_limit must be positive"): + HandoffBuilder(participants=[worker]).set_coordinator(worker).with_interaction_mode( + "autonomous", autonomous_turn_limit=-5 + ) + + def test_build_fails_without_coordinator(): """Verify that build() raises ValueError when set_coordinator() was not called.""" triage = _RecordingAgent(name="triage") diff --git a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py index 097680a3b7..6c5425f49f 100644 --- a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py @@ -86,9 +86,12 @@ async def run_agent_framework() -> None: workflow = ( GroupChatBuilder() .participants([python_expert, javascript_expert, database_expert]) - .set_prompt_based_manager( - chat_client=client, - instructions="Based on the conversation, select the most appropriate expert to respond next.", + .set_manager( + manager=client.create_agent( + name="selector_manager", + instructions="Based on the conversation, select the most appropriate expert to respond next.", + ), + display_name="SelectorManager", ) .with_max_rounds(1) .build() diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 3146f3f38b..5997e4891b 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -99,6 +99,7 @@ For observability 
samples in Agent Framework, see the [observability getting sta | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | | Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | +| Handoff (Autonomous) | [orchestration/handoff_autonomous.py](./orchestration/handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_interaction_mode("autonomous", autonomous_turn_limit=N)` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | diff --git a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py index ff147df453..c94b3004d8 100644 --- a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py @@ -38,7 +38,7 @@ async def main() -> None: workflow = ( 
GroupChatBuilder() - .set_prompt_based_manager(chat_client=OpenAIChatClient(), display_name="Coordinator") + .set_manager(manager=OpenAIChatClient().create_agent(), display_name="Coordinator") .participants(researcher=researcher, writer=writer) .build() ) diff --git a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py new file mode 100644 index 0000000000..68bf789083 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py @@ -0,0 +1,146 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from collections.abc import AsyncIterable +from typing import cast + +from agent_framework import ( + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + HandoffBuilder, + WorkflowEvent, + WorkflowOutputEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +logging.basicConfig(level=logging.ERROR) + +"""Sample: Autonomous handoff workflow with agent iteration. + +This sample demonstrates `with_interaction_mode("autonomous")`, where agents continue +iterating on their task until they explicitly invoke a handoff tool. This allows +specialists to perform long-running autonomous work (research, coding, analysis) +without prematurely returning control to the coordinator or user. + +Routing Pattern: + User -> Coordinator -> Specialist (iterates N times) -> Handoff -> Final Output + +Prerequisites: + - `az login` (Azure CLI authentication) + - Environment variables for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.) 
+ +Key Concepts: + - Autonomous interaction mode: agents iterate until they handoff + - Turn limits: use `with_interaction_mode("autonomous", autonomous_turn_limit=N)` to cap total iterations +""" + + +def create_agents( + chat_client: AzureOpenAIChatClient, +) -> tuple[ChatAgent, ChatAgent, ChatAgent]: + """Create coordinator and specialists for autonomous iteration.""" + coordinator = chat_client.create_agent( + instructions=( + "You are a coordinator. Route user requests to either research_agent or summary_agent. " + "Always call exactly one handoff tool with a short routing acknowledgement. " + "If unsure, default to research_agent. Never request information yourself. " + "After a specialist hands off back to you, provide a concise final summary and stop." + ), + name="coordinator", + ) + + research_agent = chat_client.create_agent( + instructions=( + "You are a research specialist that explores topics thoroughly. " + "When given a research task, break it down into multiple aspects and explore each one. " + "Continue your research across multiple responses - don't try to finish everything in one response. " + "After each response, think about what else needs to be explored. " + "When you have covered the topic comprehensively (at least 3-4 different aspects), " + "call the handoff tool to return to coordinator with your findings. " + "Keep each individual response focused on one aspect." + ), + name="research_agent", + ) + + summary_agent = chat_client.create_agent( + instructions=( + "You summarize research findings. Provide a concise, well-organized summary. " + "When done, hand off to coordinator." 
+ ), + name="summary_agent", + ) + + return coordinator, research_agent, summary_agent + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + """Collect all events from an async stream.""" + return [event async for event in stream] + + +def _print_conversation(events: list[WorkflowEvent]) -> None: + """Print the final conversation snapshot from workflow output events.""" + for event in events: + if isinstance(event, AgentRunUpdateEvent): + print(event.data, flush=True, end="") + elif isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + print("\n=== Final Conversation (Autonomous with Iteration) ===") + for message in conversation: + speaker = message.author_name or message.role.value + text_preview = message.text[:200] + "..." if len(message.text) > 200 else message.text + print(f"- {speaker}: {text_preview}") + print(f"\nTotal messages: {len(conversation)}") + print("=====================================================") + + +async def main() -> None: + """Run an autonomous handoff workflow with specialist iteration enabled.""" + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + coordinator, research_agent, summary_agent = create_agents(chat_client) + + # Build the workflow with autonomous mode + # In autonomous mode, agents continue iterating until they invoke a handoff tool + workflow = ( + HandoffBuilder( + name="autonomous_iteration_handoff", + participants=[coordinator, research_agent, summary_agent], + ) + .set_coordinator(coordinator) + .add_handoff(coordinator, [research_agent, summary_agent]) + .add_handoff(research_agent, coordinator) # Research can hand back to coordinator + .add_handoff(summary_agent, coordinator) + .with_interaction_mode("autonomous", autonomous_turn_limit=15) + .with_termination_condition( + # Terminate after coordinator provides 5 assistant responses + lambda conv: sum(1 for msg in conv if msg.author_name == "coordinator" and 
msg.role.value == "assistant") + >= 5 + ) + .build() + ) + + initial_request = "Research the key benefits and challenges of renewable energy adoption." + print("Initial request:", initial_request) + print("\nExpecting multiple iterations from the research agent...\n") + + events = await _drain(workflow.run_stream(initial_request)) + _print_conversation(events) + + """ + Expected behavior: + - Coordinator routes to research_agent. + - Research agent iterates multiple times, exploring different aspects of renewable energy. + - Each iteration adds to the conversation without returning to coordinator. + - After thorough research, research_agent calls handoff to coordinator. + - Coordinator provides final summary. + + In autonomous mode, agents continue working until they invoke a handoff tool, + allowing the research_agent to perform 3-4+ responses before handing off. + """ + + +if __name__ == "__main__": + asyncio.run(main())