Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 133 additions & 20 deletions python/packages/core/agent_framework/_workflows/_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

The handoff pattern models a coordinator agent that optionally routes
control to specialist agents before handing the conversation back to the user.
The flow is intentionally cyclical:
The flow is intentionally cyclical by default:

user input -> coordinator -> optional specialist -> request user input -> ...

An autonomous interaction mode can bypass the user input request and continue routing
responses back to agents until a handoff occurs or termination criteria are met.

Key properties:
- The entire conversation is maintained and reused on every hop
- The coordinator signals a handoff by invoking a tool call that names the specialist
Expand All @@ -19,7 +22,7 @@
import sys
from collections.abc import Awaitable, Callable, Mapping, Sequence
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Literal

from agent_framework import (
AgentProtocol,
Expand Down Expand Up @@ -59,8 +62,8 @@

logger = logging.getLogger(__name__)


_HANDOFF_TOOL_PATTERN = re.compile(r"(?:handoff|transfer)[_\s-]*to[_\s-]*(?P<target>[\w-]+)", re.IGNORECASE)
_DEFAULT_AUTONOMOUS_TURN_LIMIT = 50


def _create_handoff_tool(alias: str, description: str | None = None) -> AIFunction[Any, Any]:
Expand Down Expand Up @@ -291,6 +294,8 @@ def __init__(
id: str,
handoff_tool_targets: Mapping[str, str] | None = None,
return_to_previous: bool = False,
interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop",
autonomous_turn_limit: int | None = None,
) -> None:
"""Create a coordinator that manages routing between specialists and the user."""
super().__init__(id)
Expand All @@ -302,6 +307,9 @@ def __init__(
self._handoff_tool_targets = {k.lower(): v for k, v in (handoff_tool_targets or {}).items()}
self._return_to_previous = return_to_previous
self._current_agent_id: str | None = None # Track the current agent handling conversation
self._interaction_mode = interaction_mode
self._autonomous_turn_limit = autonomous_turn_limit
self._autonomous_turns = 0

def _get_author_name(self) -> str:
"""Get the coordinator name for orchestrator-generated messages."""
Expand Down Expand Up @@ -340,6 +348,7 @@ async def handle_agent_response(
if target is not None:
# Update current agent when handoff occurs
self._current_agent_id = target
self._autonomous_turns = 0
logger.info(f"Handoff detected: {source} -> {target}. Routing control to specialist '{target}'.")

# Clean tool-related content before sending to next agent
Expand All @@ -354,17 +363,38 @@ async def handle_agent_response(

# Update current agent when they respond without handoff
self._current_agent_id = source
logger.info(
f"Agent '{source}' responded without handoff. "
f"Requesting user input. Return-to-previous: {self._return_to_previous}"
)

if await self._check_termination():
# Clean the output conversation for display
cleaned_output = clean_conversation_for_handoff(conversation)
await ctx.yield_output(cleaned_output)
return

if self._interaction_mode == "autonomous":
self._autonomous_turns += 1
if self._autonomous_turn_limit is not None and self._autonomous_turns >= self._autonomous_turn_limit:
logger.info(
f"Autonomous turn limit reached ({self._autonomous_turn_limit}). "
"Yielding conversation and stopping."
)
cleaned_output = clean_conversation_for_handoff(conversation)
await ctx.yield_output(cleaned_output)
return

# In autonomous mode, agents continue iterating until they invoke a handoff tool
logger.info(
f"Agent '{source}' responded without handoff (turn {self._autonomous_turns}). "
"Continuing autonomous execution."
)
cleaned = clean_conversation_for_handoff(conversation)
request = AgentExecutorRequest(messages=cleaned, should_respond=True)
await ctx.send_message(request, target_id=source)
return

logger.info(
f"Agent '{source}' responded without handoff. "
f"Requesting user input. Return-to-previous: {self._return_to_previous}"
)

# Clean conversation before sending to gateway for user input request
# This removes tool messages that shouldn't be shown to users
cleaned_for_display = clean_conversation_for_handoff(conversation)
Expand All @@ -386,6 +416,9 @@ async def handle_user_input(
# Update authoritative conversation
self._conversation = list(message.full_conversation)

# Reset autonomous turn counter on new user input
self._autonomous_turns = 0

# Check termination before sending to agent
if await self._check_termination():
await ctx.yield_output(list(self._conversation))
Expand Down Expand Up @@ -478,11 +511,12 @@ def _snapshot_pattern_metadata(self) -> dict[str, Any]:
Returns:
Dict containing current agent if return-to-previous is enabled
"""
metadata: dict[str, Any] = {}
if self._return_to_previous:
return {
"current_agent_id": self._current_agent_id,
}
return {}
metadata["current_agent_id"] = self._current_agent_id
if self._interaction_mode == "autonomous":
metadata["autonomous_turns"] = self._autonomous_turns
return metadata

@override
def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None:
Expand All @@ -495,6 +529,8 @@ def _restore_pattern_metadata(self, metadata: dict[str, Any]) -> None:
"""
if self._return_to_previous and "current_agent_id" in metadata:
self._current_agent_id = metadata["current_agent_id"]
if self._interaction_mode == "autonomous" and "autonomous_turns" in metadata:
self._autonomous_turns = metadata["autonomous_turns"]

def _apply_response_metadata(self, conversation: list[ChatMessage], agent_response: AgentRunResponse) -> None:
"""Merge top-level response metadata into the latest assistant message."""
Expand Down Expand Up @@ -604,13 +640,17 @@ class HandoffBuilder:
r"""Fluent builder for conversational handoff workflows with coordinator and specialist agents.

The handoff pattern enables a coordinator agent to route requests to specialist agents.
A termination condition determines when the workflow should stop requesting input and complete.
Interaction mode controls whether the workflow requests user input after each agent response or
completes autonomously once agents finish responding. A termination condition determines when
the workflow should stop requesting input and complete.

Routing Patterns:

**Single-Tier (Default):** Only the coordinator can hand off to specialists. After any specialist
**Single-Tier (Default):** Only the coordinator can hand off to specialists. By default, after any specialist
responds, control returns to the user for more input. This creates a cyclical flow:
user -> coordinator -> [optional specialist] -> user -> coordinator -> ...
Use `with_interaction_mode("autonomous")` to skip requesting additional user input and yield the
final conversation when an agent responds without delegating.

**Multi-Tier (Advanced):** Specialists can hand off to other specialists using `.add_handoff()`.
This provides more flexibility for complex workflows but is less controllable than the single-tier
Expand All @@ -621,13 +661,16 @@ class HandoffBuilder:

Key Features:
- **Automatic handoff detection**: The coordinator invokes a handoff tool whose
arguments (for example ``{"handoff_to": "shipping_agent"}``) identify the specialist to receive control.
arguments (for example `{"handoff_to": "shipping_agent"}`) identify the specialist to receive control.
- **Auto-generated tools**: By default the builder synthesizes `handoff_to_<agent>` tools for the coordinator,
so you don't manually define placeholder functions.
- **Full conversation history**: The entire conversation (including any
`ChatMessage.additional_properties`) is preserved and passed to each agent.
- **Termination control**: By default, terminates after 10 user messages. Override with
`.with_termination_condition(lambda conv: ...)` for custom logic (e.g., detect "goodbye").
- **Interaction modes**: Choose `human_in_loop` (default) to prompt users between agent turns,
or `autonomous` to continue routing back to agents without prompting for user input until a
handoff occurs or a termination/turn limit is reached (default autonomous turn limit: 50).
- **Checkpointing**: Optional persistence for resumable workflows.

Usage (Single-Tier):
Expand Down Expand Up @@ -765,7 +808,7 @@ def __init__(
Participants must have stable names/ids because the workflow maps the
handoff tool arguments to these identifiers. Agent names should match
the strings emitted by the coordinator's handoff tool (e.g., a tool that
outputs ``{\"handoff_to\": \"billing\"}`` requires an agent named ``billing``).
outputs `{\"handoff_to\": \"billing\"}` requires an agent named `billing`).
"""
self._name = name
self._description = description
Expand All @@ -781,6 +824,8 @@ def __init__(
self._auto_register_handoff_tools: bool = True
self._handoff_config: dict[str, list[str]] = {} # Maps agent_id -> [target_agent_ids]
self._return_to_previous: bool = False
self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop"
self._autonomous_turn_limit: int | None = _DEFAULT_AUTONOMOUS_TURN_LIMIT

if participants:
self.participants(participants)
Expand Down Expand Up @@ -871,8 +916,10 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil
1. Handle the request directly and respond to the user, OR
2. Hand off to a specialist agent by including handoff metadata in the response

After a specialist responds, the workflow automatically returns control to the user,
creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ...
After a specialist responds, the workflow automatically returns control to the user
(default) creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ...
Configure `with_interaction_mode("autonomous")` to continue with the responding agent
without requesting another user turn until a handoff occurs or a termination/turn limit is met.

Args:
agent: The agent to use as the coordinator. Can be:
Expand All @@ -899,8 +946,8 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil

Note:
The coordinator determines routing by invoking a handoff tool call whose
arguments identify the target specialist (for example ``{\"handoff_to\": \"billing\"}``).
Decorate the tool with ``approval_mode="always_require"`` to ensure the workflow
arguments identify the target specialist (for example `{\"handoff_to\": \"billing\"}`).
Decorate the tool with `approval_mode="always_require"` to ensure the workflow
intercepts the call before execution and can make the transition.
"""
if not self._executors:
Expand Down Expand Up @@ -1236,6 +1283,70 @@ async def check_termination(conv: list[ChatMessage]) -> bool:
self._termination_condition = condition
return self

def with_interaction_mode(
self,
interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop",
*,
autonomous_turn_limit: int | None = None,
) -> "HandoffBuilder":
"""Choose whether the workflow requests user input or runs autonomously after agent replies.

In autonomous mode, agents (including specialists) continue iterating on their task
until they explicitly invoke a handoff tool or the turn limit is reached. This allows
specialists to perform long-running autonomous tasks (e.g., research, coding, analysis)
without prematurely returning control to the coordinator or user.

Args:
interaction_mode: `"human_in_loop"` (default) requests user input after each agent response
that does not trigger a handoff. `"autonomous"` lets agents continue
working until they invoke a handoff tool or the turn limit is reached.

Keyword Args:
autonomous_turn_limit: Maximum number of agent responses before the workflow yields
when in autonomous mode. Only applicable when interaction_mode
is `"autonomous"`. Default is 50. Set to `None` to disable
the limit (use with caution). Ignored with a warning if provided
when interaction_mode is `"human_in_loop"`.

Returns:
Self for chaining.

Example:

.. code-block:: python

workflow = (
HandoffBuilder(participants=[coordinator, research_agent])
.set_coordinator(coordinator)
.add_handoff(coordinator, research_agent)
.add_handoff(research_agent, coordinator)
.with_interaction_mode("autonomous", autonomous_turn_limit=20)
.build()
)

# Flow: User asks a question
# -> Coordinator routes to Research Agent
# -> Research Agent iterates (researches, analyzes, refines)
# -> Research Agent calls handoff_to_coordinator when done
# -> Coordinator provides final response
"""
if interaction_mode not in ("human_in_loop", "autonomous"):
raise ValueError("interaction_mode must be either 'human_in_loop' or 'autonomous'")
self._interaction_mode = interaction_mode

if autonomous_turn_limit is not None:
if interaction_mode != "autonomous":
logger.warning(
f"autonomous_turn_limit={autonomous_turn_limit} was provided but interaction_mode is "
f"'{interaction_mode}'; ignoring."
)
elif autonomous_turn_limit <= 0:
raise ValueError("autonomous_turn_limit must be positive when provided")
else:
self._autonomous_turn_limit = autonomous_turn_limit

return self

def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder":
"""Enable direct return to the current agent after user input, bypassing the coordinator.

Expand Down Expand Up @@ -1437,6 +1548,8 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor:
id="handoff-coordinator",
handoff_tool_targets=handoff_tool_targets,
return_to_previous=self._return_to_previous,
interaction_mode=self._interaction_mode,
autonomous_turn_limit=self._autonomous_turn_limit,
)

wiring = _GroupChatConfig(
Expand Down
Loading
Loading