diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 04623c87d9..42e48c50cf 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -85,6 +85,7 @@ MagenticStallInterventionRequest, StandardMagenticManager, ) +from ._orchestration_request_info import AgentInputRequest, AgentResponseReviewRequest, RequestInfoInterceptor from ._orchestration_state import OrchestrationState from ._request_info_mixin import response_handler from ._runner import Runner @@ -122,6 +123,8 @@ "AgentExecutor", "AgentExecutorRequest", "AgentExecutorResponse", + "AgentInputRequest", + "AgentResponseReviewRequest", "AgentRunEvent", "AgentRunUpdateEvent", "Case", @@ -164,6 +167,7 @@ "Message", "OrchestrationState", "RequestInfoEvent", + "RequestInfoInterceptor", "Runner", "RunnerContext", "SequentialBuilder", diff --git a/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py b/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py index 8b49de740c..5576246a8e 100644 --- a/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py +++ b/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py @@ -47,7 +47,9 @@ def __init__(self, executor_id: str) -> None: self._max_rounds: int | None = None self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None - def register_participant_entry(self, name: str, *, entry_id: str, is_agent: bool) -> None: + def register_participant_entry( + self, name: str, *, entry_id: str, is_agent: bool, exit_id: str | None = None + ) -> None: """Record routing details for a participant's entry executor. 
This method provides a unified interface for registering participants @@ -57,8 +59,10 @@ def register_participant_entry(self, name: str, *, entry_id: str, is_agent: bool name: Participant name (used for selection and tracking) entry_id: Executor ID for this participant's entry point is_agent: Whether this is an AgentExecutor (True) or custom Executor (False) + exit_id: Executor ID for this participant's exit point (where responses come from). + If None, defaults to entry_id. """ - self._registry.register(name, entry_id=entry_id, is_agent=is_agent) + self._registry.register(name, entry_id=entry_id, is_agent=is_agent, exit_id=exit_id) # Conversation state management (shared across all patterns) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 6b3e1ac05e..f6a7b09e60 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -14,6 +14,7 @@ from ._checkpoint import CheckpointStorage from ._executor import Executor, handler from ._message_utils import normalize_messages_input +from ._orchestration_request_info import RequestInfoInterceptor from ._workflow import Workflow from ._workflow_builder import WorkflowBuilder from ._workflow_context import WorkflowContext @@ -209,15 +210,18 @@ def summarize(results): workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build() - # Enable checkpoint persistence so runs can resume workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build() + + # Enable request info before aggregation + workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build() """ def __init__(self) -> None: self._participants: list[AgentProtocol | Executor] = [] self._aggregator: Executor | None = None self._checkpoint_storage: CheckpointStorage | None = 
None + self._request_info_enabled: bool = False def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder": r"""Define the parallel participants for this concurrent workflow. @@ -296,12 +300,33 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Concurre self._checkpoint_storage = checkpoint_storage return self + def with_request_info(self) -> "ConcurrentBuilder": + """Enable request info before aggregation in the workflow. + + When enabled, the workflow pauses after all parallel agents complete, + emitting a RequestInfoEvent that allows the caller to review and optionally + modify the combined results before aggregation. The caller provides feedback + via the standard response_handler/request_info pattern. + + Note: + Unlike SequentialBuilder and GroupChatBuilder, ConcurrentBuilder does not + support per-agent filtering since all agents run in parallel and results + are collected together. The pause occurs once with all agent outputs received. + + Returns: + self: The builder instance for fluent chaining. + """ + self._request_info_enabled = True + return self + def build(self) -> Workflow: r"""Build and validate the concurrent workflow. Wiring pattern: - Dispatcher (internal) fans out the input to all `participants` - - Fan-in aggregator collects `AgentExecutorResponse` objects + - Fan-in collects `AgentExecutorResponse` objects from all participants + - If request info is enabled, the orchestration emits a request info event with outputs from all participants + before sending the outputs to the aggregator - Aggregator yields output and the workflow becomes idle. 
The output is either: - list[ChatMessage] (default aggregator: one user + one assistant per agent) - custom payload from the provided callback/executor @@ -327,7 +352,16 @@ def build(self) -> Workflow: builder = WorkflowBuilder() builder.set_start_executor(dispatcher) builder.add_fan_out_edges(dispatcher, list(self._participants)) - builder.add_fan_in_edges(list(self._participants), aggregator) + + if self._request_info_enabled: + # Insert interceptor between fan-in and aggregator + # participants -> fan-in -> interceptor -> aggregator + request_info_interceptor = RequestInfoInterceptor(executor_id="request_info") + builder.add_fan_in_edges(list(self._participants), request_info_interceptor) + builder.add_edge(request_info_interceptor, aggregator) + else: + # Direct fan-in to aggregator + builder.add_fan_in_edges(list(self._participants), aggregator) if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) diff --git a/python/packages/core/agent_framework/_workflows/_group_chat.py b/python/packages/core/agent_framework/_workflows/_group_chat.py index 78ddb5c2eb..eab720a4b1 100644 --- a/python/packages/core/agent_framework/_workflows/_group_chat.py +++ b/python/packages/core/agent_framework/_workflows/_group_chat.py @@ -36,6 +36,7 @@ from ._checkpoint import CheckpointStorage from ._conversation_history import ensure_author, latest_user_message from ._executor import Executor, handler +from ._orchestration_request_info import RequestInfoInterceptor from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, wrap_participant from ._workflow import Workflow from ._workflow_builder import WorkflowBuilder @@ -562,14 +563,36 @@ async def _ingest_participant_message( participant_name: str, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]], + trailing_messages: list[ChatMessage] | None = None, ) -> None: - """Common response ingestion 
logic shared by agent and custom participants.""" + """Common response ingestion logic shared by agent and custom participants. + + Args: + participant_name: Name of the participant who sent the message + message: The participant's response message + ctx: Workflow context for routing and output + trailing_messages: Optional list of messages to inject after the participant's + message (e.g., additional input from the RequestInfoInterceptor) + """ if participant_name not in self._participants: raise ValueError(f"Received response from unknown participant '{participant_name}'.") message = ensure_author(message, participant_name) self._conversation.extend((message,)) self._history.append(_GroupChatTurn(participant_name, "agent", message)) + + # Inject any trailing messages (e.g., human input) into the conversation + if trailing_messages: + for trailing_msg in trailing_messages: + self._conversation.extend((trailing_msg,)) + # Record as user input in history + author = trailing_msg.author_name or "human" + self._history.append(_GroupChatTurn(author, "user", trailing_msg)) + logger.debug( + f"Injected human input into group chat conversation: " + f"{trailing_msg.text[:50] if trailing_msg.text else '(empty)'}..." + ) + self._pending_agent = None if await self._complete_on_termination(ctx): @@ -685,14 +708,18 @@ async def _handle_manager_response( to the selected participant. This method implements the core orchestration logic for agent-based managers. + Also handles any human input that was injected into the response's full_conversation + by the human input hook interceptor. 
+ Args: response: AgentExecutor response from manager agent ctx: Workflow context for routing and output Behavior: + - Extracts any human input from the response - Parses manager selection from response - If finish=True: yields final message and completes workflow - - If participant selected: routes request to that participant + - If participant selected: routes request to that participant with human input included - Validates selected participant exists - Enforces round limits if configured @@ -700,6 +727,9 @@ async def _handle_manager_response( ValueError: If manager selects invalid/unknown participant RuntimeError: If manager response cannot be parsed """ + # Extract any human input that was injected by the human input hook + trailing_user_messages = self._extract_trailing_user_messages(response) + selection = self._parse_manager_selection(response) if self._pending_finalization: @@ -753,6 +783,19 @@ async def _handle_manager_response( self._conversation.append(manager_message) self._history.append(_GroupChatTurn(self._manager_name, "manager", manager_message)) + # Inject any human input that was attached to the manager's response + # This ensures the next participant sees the human's guidance + if trailing_user_messages: + for human_msg in trailing_user_messages: + conversation.append(human_msg) + self._conversation.append(human_msg) + author = human_msg.author_name or "human" + self._history.append(_GroupChatTurn(author, "user", human_msg)) + logger.debug( + f"Injected human input after manager selection: " + f"{human_msg.text[:50] if human_msg.text else '(empty)'}..." 
+ ) + if await self._complete_on_termination(ctx): return @@ -808,6 +851,41 @@ def _extract_agent_message(response: AgentExecutorResponse, participant_name: st ) return ensure_author(final_message, participant_name) + @staticmethod + def _extract_trailing_user_messages(response: AgentExecutorResponse) -> list[ChatMessage]: + """Extract any user messages that appear after the last assistant message. + + This is used to capture human input that was injected by the human input hook + interceptor. The hook adds user messages to full_conversation after the agent's + response, so they appear at the end of the sequence. + + Args: + response: AgentExecutor response that may contain trailing user messages + + Returns: + List of user messages that appear after the last assistant message, + or empty list if none found + """ + if not response.full_conversation: + return [] + + # Find index of last assistant message + last_assistant_idx = -1 + for i, msg in enumerate(response.full_conversation): + if msg.role == Role.ASSISTANT: + last_assistant_idx = i + + if last_assistant_idx < 0: + return [] + + # Collect any user messages after the last assistant message + trailing_user: list[ChatMessage] = [] + for msg in response.full_conversation[last_assistant_idx + 1 :]: + if msg.role == Role.USER: + trailing_user.append(msg) + + return trailing_user + async def _handle_task_message( self, task_message: ChatMessage, @@ -979,6 +1057,9 @@ async def handle_agent_executor_response( Routes responses based on whether they come from the manager or a participant: - Manager responses: parsed for speaker selection decisions - Participant responses: ingested as conversation messages + + Also handles any human input that was injected into the response's full_conversation + by the human input hook interceptor. 
""" participant_name = self._registry.get_participant_name(response.executor_id) if participant_name is None: @@ -994,7 +1075,13 @@ async def handle_agent_executor_response( else: # Regular participant response message = self._extract_agent_message(response, participant_name) - await self._ingest_participant_message(participant_name, message, ctx) + + # Check for human input injected by human input hook + # Human input appears as user messages at the end of full_conversation + # after the agent's assistant message + trailing_user_messages = self._extract_trailing_user_messages(response) + + await self._ingest_participant_message(participant_name, message, ctx, trailing_user_messages) def _default_orchestrator_factory(wiring: _GroupChatConfig) -> Executor: @@ -1089,13 +1176,14 @@ def assemble_group_chat_workflow( manager_entry = manager_pipeline[0] manager_exit = manager_pipeline[-1] - # Register manager with orchestrator + # Register manager with orchestrator (with entry and exit IDs for pipeline routing) register_entry = getattr(orchestrator, "register_participant_entry", None) if callable(register_entry): register_entry( wiring.manager_name, entry_id=manager_entry.id, is_agent=not isinstance(wiring.manager_participant, Executor), + exit_id=manager_exit.id if manager_exit is not manager_entry else None, ) # Wire manager edges: Orchestrator ↔ Manager @@ -1118,10 +1206,13 @@ def assemble_group_chat_workflow( register_entry = getattr(orchestrator, "register_participant_entry", None) if callable(register_entry): + # Register both entry and exit IDs so responses can be routed correctly + # when interceptors are prepended to the pipeline register_entry( name, entry_id=entry_executor.id, is_agent=not isinstance(spec.participant, Executor), + exit_id=exit_executor.id if exit_executor is not entry_executor else None, ) workflow_builder = workflow_builder.add_edge(orchestrator, entry_executor) @@ -1213,6 +1304,30 @@ def select_next_speaker(state: GroupChatStateSnapshot) -> 
str | None: .build() ) + *Pattern 3: Request info for mid-conversation feedback* + + .. code-block:: python + + from agent_framework import GroupChatBuilder + + # Pause before all participants + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(select_next_speaker) + .participants([researcher, writer]) + .with_request_info() + .build() + ) + + # Pause only before specific participants + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(select_next_speaker) + .participants([researcher, writer, editor]) + .with_request_info(agents=[editor]) # Only pause before editor responds + .build() + ) + **Participant Specification:** Two ways to specify participants: @@ -1262,6 +1377,8 @@ def __init__( self._interceptors: list[_InterceptorSpec] = [] self._orchestrator_factory = group_chat_orchestrator(_orchestrator_factory) self._participant_factory = _participant_factory or _default_participant_factory + self._request_info_enabled: bool = False + self._request_info_filter: set[str] | None = None def _set_manager_function( self, @@ -1338,6 +1455,12 @@ def set_manager( Note: The manager agent's response_format must be ManagerSelectionResponse for structured output. Custom response formats raise ValueError instead of being overridden. + + The manager can be included in :py:meth:`with_request_info` to pause before the manager + runs, allowing human steering of orchestration decisions. If no filter is specified, + the manager is included automatically. 
To filter explicitly:: + + .with_request_info(agents=[manager, writer]) # Pause before manager and writer """ if self._manager is not None or self._manager_participant is not None: raise ValueError( @@ -1668,6 +1791,54 @@ def with_max_rounds(self, max_rounds: int | None) -> "GroupChatBuilder": self._max_rounds = max_rounds return self + def with_request_info( + self, + *, + agents: Sequence[str | AgentProtocol | Executor] | None = None, + ) -> "GroupChatBuilder": + """Enable request info before participants run in the workflow. + + When enabled, the workflow pauses before each participant runs, emitting + a RequestInfoEvent that allows the caller to review the conversation and + optionally inject guidance before the participant responds. The caller provides + input via the standard response_handler/request_info pattern. + + Args: + agents: Optional filter - only pause before these specific agents/executors. + Accepts agent names (str), agent instances, or executor instances. + If None (default), pauses before every participant. + + Returns: + self: The builder instance for fluent chaining. + + Example: + + .. 
code-block:: python + + # Pause before all participants + workflow = ( + GroupChatBuilder() + .set_manager(manager) + .participants([optimist, pragmatist, creative]) + .with_request_info() + .build() + ) + + # Pause only before specific participants + workflow = ( + GroupChatBuilder() + .set_manager(manager) + .participants([optimist, pragmatist, creative]) + .with_request_info(agents=[pragmatist]) # Only pause before pragmatist + .build() + ) + """ + from ._orchestration_request_info import resolve_request_info_filter + + self._request_info_enabled = True + self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None) + return self + def _get_participant_metadata(self) -> dict[str, Any]: if self._participant_metadata is None: self._participant_metadata = prepare_participant_metadata( @@ -1754,9 +1925,32 @@ def build(self) -> Workflow: participant_executors=metadata["executors"], ) + # Determine participant factory - wrap if request info is enabled + participant_factory = self._participant_factory + if self._request_info_enabled: + # Create a wrapper factory that adds request info interceptor before each participant + base_factory = participant_factory + agent_filter = self._request_info_filter + + def _factory_with_request_info( + spec: GroupChatParticipantSpec, + config: _GroupChatConfig, + ) -> _GroupChatParticipantPipeline: + pipeline = list(base_factory(spec, config)) + if pipeline: + # Add interceptor executor BEFORE the participant (prepend) + interceptor = RequestInfoInterceptor( + executor_id=f"request_info:{spec.name}", + agent_filter=agent_filter, + ) + pipeline.insert(0, interceptor) + return tuple(pipeline) + + participant_factory = _factory_with_request_info + result = assemble_group_chat_workflow( wiring=wiring, - participant_factory=self._participant_factory, + participant_factory=participant_factory, orchestrator_factory=self._orchestrator_factory, interceptors=self._interceptors, 
checkpoint_storage=self._checkpoint_storage, diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 0a3608d2e6..8e0a7aec1e 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -45,8 +45,10 @@ from ._group_chat import ( _default_participant_factory, # type: ignore[reportPrivateUsage] _GroupChatConfig, # type: ignore[reportPrivateUsage] + _GroupChatParticipantPipeline, # type: ignore[reportPrivateUsage] assemble_group_chat_workflow, ) +from ._orchestration_request_info import RequestInfoInterceptor from ._orchestrator_helpers import clean_conversation_for_handoff from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, sanitize_identifier from ._request_info_mixin import response_handler @@ -315,6 +317,30 @@ def _get_author_name(self) -> str: """Get the coordinator name for orchestrator-generated messages.""" return "handoff_coordinator" + def _extract_agent_id_from_source(self, source: str | None) -> str | None: + """Extract the original agent ID from the source executor ID. + + When a request info interceptor is in the pipeline, the source will be + like 'request_info:agent_name'. This method extracts the + actual agent ID. 
+ + Args: + source: The source executor ID from the workflow context + + Returns: + The actual agent ID, or the original source if not an interceptor + """ + if source is None: + return None + if source.startswith("request_info:"): + return source[len("request_info:") :] + # TODO(@moonbox3): Remove legacy prefix support in a separate PR (GA cleanup) + if source.startswith("human_review:"): + return source[len("human_review:") :] + if source.startswith("human_input_interceptor:"): + return source[len("human_input_interceptor:") :] + return source + @handler async def handle_agent_response( self, @@ -322,7 +348,8 @@ async def handle_agent_response( ctx: WorkflowContext[AgentExecutorRequest | list[ChatMessage], list[ChatMessage] | _ConversationForUserInput], ) -> None: """Process an agent's response and determine whether to route, request input, or terminate.""" - source = ctx.get_source_executor_id() + raw_source = ctx.get_source_executor_id() + source = self._extract_agent_id_from_source(raw_source) is_starting_agent = source == self._starting_agent_id # On first turn of a run, conversation is empty @@ -400,8 +427,8 @@ async def handle_agent_response( cleaned_for_display = clean_conversation_for_handoff(conversation) # The awaiting_agent_id is the agent that just responded and is awaiting user input - # This is the source of the current response - next_agent_id = source + # This is the source of the current response (fallback to starting agent if source is unknown) + next_agent_id = source or self._starting_agent_id message_to_gateway = _ConversationForUserInput(conversation=cleaned_for_display, next_agent_id=next_agent_id) await ctx.send_message(message_to_gateway, target_id=self._input_gateway_id) # type: ignore[arg-type] @@ -826,6 +853,8 @@ def __init__( self._return_to_previous: bool = False self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop" self._autonomous_turn_limit: int | None = _DEFAULT_AUTONOMOUS_TURN_LIMIT + 
self._request_info_enabled: bool = False + self._request_info_filter: set[str] | None = None if participants: self.participants(participants) @@ -1418,6 +1447,52 @@ def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder": self._return_to_previous = enabled return self + def with_request_info( + self, + *, + agents: Sequence[str | AgentProtocol | Executor] | None = None, + ) -> "HandoffBuilder": + """Enable request info before participants run in the workflow. + + When enabled, the workflow pauses before each participant runs, emitting + a RequestInfoEvent that allows the caller to review the conversation and + optionally inject guidance before the participant responds. The caller provides + input via the standard response_handler/request_info pattern. + + Args: + agents: Optional filter - only pause before these specific agents/executors. + Accepts agent names (str), agent instances, or executor instances. + If None (default), pauses before every participant. + + Returns: + self: The builder instance for fluent chaining. + + Example: + + .. code-block:: python + + # Pause before all participants + workflow = ( + HandoffBuilder(participants=[coordinator, refund, shipping]) + .set_coordinator("coordinator_agent") + .with_request_info() + .build() + ) + + # Pause only before specialist agents (not coordinator) + workflow = ( + HandoffBuilder(participants=[coordinator, refund, shipping]) + .set_coordinator("coordinator_agent") + .with_request_info(agents=[refund, shipping]) + .build() + ) + """ + from ._orchestration_request_info import resolve_request_info_filter + + self._request_info_enabled = True + self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None) + return self + def build(self) -> Workflow: """Construct the final Workflow instance from the configured builder. 
@@ -1562,9 +1637,33 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: participant_executors=self._executors, ) + # Determine participant factory - wrap with request info interceptor if enabled + participant_factory: Callable[[GroupChatParticipantSpec, _GroupChatConfig], _GroupChatParticipantPipeline] = ( + _default_participant_factory + ) + if self._request_info_enabled: + base_factory = _default_participant_factory + agent_filter = self._request_info_filter + + def _factory_with_request_info( + spec: GroupChatParticipantSpec, + config: _GroupChatConfig, + ) -> _GroupChatParticipantPipeline: + pipeline = list(base_factory(spec, config)) + if pipeline: + # Add interceptor executor BEFORE the participant (prepend) + interceptor = RequestInfoInterceptor( + executor_id=f"request_info:{spec.name}", + agent_filter=agent_filter, + ) + pipeline.insert(0, interceptor) + return tuple(pipeline) + + participant_factory = _factory_with_request_info + result = assemble_group_chat_workflow( wiring=wiring, - participant_factory=_default_participant_factory, + participant_factory=participant_factory, orchestrator_factory=_handoff_orchestrator_factory, interceptors=(), checkpoint_storage=self._checkpoint_storage, @@ -1575,7 +1674,18 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: raise TypeError("Expected tuple from assemble_group_chat_workflow with return_builder=True") builder, coordinator = result - builder = builder.add_edge(input_node, starting_executor) + # When request_info is enabled, the input should go through the interceptor first + if self._request_info_enabled: + # Get the entry executor from the builder's registered executors + starting_entry_id = f"request_info:{self._starting_agent_id}" + starting_entry_executor = builder._executors.get(starting_entry_id) # type: ignore + if starting_entry_executor: + builder = builder.add_edge(input_node, starting_entry_executor) + else: + # Fallback to direct connection if interceptor not 
found + builder = builder.add_edge(input_node, starting_executor) + else: + builder = builder.add_edge(input_node, starting_executor) builder = builder.add_edge(coordinator, user_gateway) builder = builder.add_edge(user_gateway, coordinator) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index d91cf2a3b8..a24fd77b16 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -2089,6 +2089,17 @@ class MagenticBuilder: The builder provides a fluent API for configuring participants, the manager, optional plan review, checkpointing, and event callbacks. + Human-in-the-loop Support: + Magentic provides specialized HITL mechanisms via: + + - `.with_plan_review()` - Review and approve/revise plans before execution + - `.with_human_input_on_stall()` - Intervene when workflow stalls + - Tool approval via `FunctionApprovalRequestContent` - Approve individual tool calls + + These emit `MagenticHumanInterventionRequest` events that provide structured + decision options (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) appropriate + for Magentic's planning-based orchestration. + Usage: .. code-block:: python diff --git a/python/packages/core/agent_framework/_workflows/_orchestration_request_info.py b/python/packages/core/agent_framework/_workflows/_orchestration_request_info.py new file mode 100644 index 0000000000..91c9ec799a --- /dev/null +++ b/python/packages/core/agent_framework/_workflows/_orchestration_request_info.py @@ -0,0 +1,329 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Request info support for high-level builder APIs. + +This module provides a mechanism for pausing workflows to request external input +before agent turns in `SequentialBuilder`, `ConcurrentBuilder`, `GroupChatBuilder`, +and `HandoffBuilder`. 
+ +The design follows the standard `request_info` pattern used throughout the +workflow system, keeping the API consistent and predictable. + +Key components: +- AgentInputRequest: Request type emitted via RequestInfoEvent for pre-agent steering +- RequestInfoInterceptor: Internal executor that pauses workflow before agent runs +""" + +import logging +import uuid +from dataclasses import dataclass, field +from typing import Any + +from .._agents import AgentProtocol +from .._types import ChatMessage, Role +from ._agent_executor import AgentExecutorRequest +from ._executor import Executor, handler +from ._request_info_mixin import response_handler +from ._workflow_context import WorkflowContext + +logger = logging.getLogger(__name__) + + +def resolve_request_info_filter( + agents: list[str | AgentProtocol | Executor] | None, +) -> set[str] | None: + """Resolve a list of agent/executor references to a set of IDs for filtering. + + Args: + agents: List of agent names (str), AgentProtocol instances, or Executor instances. + If None, returns None (meaning no filtering - pause for all). + + Returns: + Set of executor/agent IDs to filter on, or None if no filtering. + """ + if agents is None: + return None + + result: set[str] = set() + for agent in agents: + if isinstance(agent, str): + result.add(agent) + elif isinstance(agent, Executor): + result.add(agent.id) + elif isinstance(agent, AgentProtocol): + name = getattr(agent, "name", None) + if name: + result.add(name) + else: + logger.warning("AgentProtocol without name cannot be used for request_info filtering") + else: + logger.warning(f"Unsupported type for request_info filter: {type(agent).__name__}") + + return result if result else None + + +@dataclass +class AgentInputRequest: + """Request for human input before an agent runs in high-level builder workflows. + + Emitted via RequestInfoEvent when a workflow pauses before an agent executes. 
+ The response is injected into the conversation as a user message to steer + the agent's behavior. + + This is the standard request type used by `.with_request_info()` on + SequentialBuilder, ConcurrentBuilder, GroupChatBuilder, and HandoffBuilder. + + Attributes: + target_agent_id: ID of the agent that is about to run + conversation: Current conversation history the agent will receive + instruction: Optional instruction from the orchestrator (e.g., manager in GroupChat) + metadata: Builder-specific context (stores internal state for resume) + """ + + target_agent_id: str | None + conversation: list[ChatMessage] = field(default_factory=lambda: []) + instruction: str | None = None + metadata: dict[str, Any] = field(default_factory=lambda: {}) + + +# Keep legacy name as alias for backward compatibility +AgentResponseReviewRequest = AgentInputRequest + + +DEFAULT_REQUEST_INFO_ID = "request_info_interceptor" + + +class RequestInfoInterceptor(Executor): + """Internal executor that pauses workflow for human input before agent runs. + + This executor is inserted into the workflow graph by builders when + `.with_request_info()` is called. It intercepts AgentExecutorRequest messages + BEFORE the agent runs and pauses the workflow via `ctx.request_info()` with + an AgentInputRequest. + + When a response is received, the response handler injects the input + as a user message into the conversation and forwards the request to the agent. + + The optional `agent_filter` parameter allows limiting which agents trigger the pause. + If the target agent's ID is not in the filter set, the request is forwarded + without pausing. + """ + + def __init__( + self, + executor_id: str | None = None, + agent_filter: set[str] | None = None, + ) -> None: + """Initialize the request info interceptor executor. + + Args: + executor_id: ID for this executor. If None, generates a unique ID + using the format "request_info_interceptor-". 
+ agent_filter: Optional set of agent/executor IDs to filter on. + If provided, only requests to these agents trigger a pause. + If None (default), all requests trigger a pause. + """ + if executor_id is None: + executor_id = f"{DEFAULT_REQUEST_INFO_ID}-{uuid.uuid4().hex[:8]}" + super().__init__(executor_id) + self._agent_filter = agent_filter + + def _should_pause_for_agent(self, agent_id: str | None) -> bool: + """Check if we should pause for the given agent ID.""" + if self._agent_filter is None: + return True + if agent_id is None: + return False + # Check both the full ID and any name portion after a prefix + # e.g., "groupchat_agent:writer" should match filter "writer" + if agent_id in self._agent_filter: + return True + # Extract name from prefixed IDs like "groupchat_agent:writer" or "request_info:writer" + if ":" in agent_id: + name_part = agent_id.split(":", 1)[1] + if name_part in self._agent_filter: + return True + return False + + def _extract_agent_name_from_executor_id(self) -> str | None: + """Extract the agent name from this interceptor's executor ID. + + The interceptor ID is typically "request_info:", so we + extract the agent name to determine which agent we're intercepting for. + """ + if ":" in self.id: + return self.id.split(":", 1)[1] + return None + + @handler + async def intercept_agent_request( + self, + request: AgentExecutorRequest, + ctx: WorkflowContext[AgentExecutorRequest, Any], + ) -> None: + """Intercept request before agent runs and pause for human input. + + Pauses the workflow and emits a RequestInfoEvent with the current + conversation for steering. If an agent filter is configured and this + agent is not in the filter, the request is forwarded without pausing. 
+ + Args: + request: The request about to be sent to the agent + ctx: Workflow context for requesting info + """ + # Determine the target agent from our executor ID + target_agent = self._extract_agent_name_from_executor_id() + + # Check if we should pause for this agent + if not self._should_pause_for_agent(target_agent): + logger.debug(f"Skipping request_info pause for agent {target_agent} (not in filter)") + await ctx.send_message(request) + return + + conversation = list(request.messages or []) + + input_request = AgentInputRequest( + target_agent_id=target_agent, + conversation=conversation, + instruction=None, # Could be extended to include manager instruction + metadata={"_original_request": request, "_input_type": "AgentExecutorRequest"}, + ) + await ctx.request_info(input_request, str) + + @handler + async def intercept_conversation( + self, + messages: list[ChatMessage], + ctx: WorkflowContext[list[ChatMessage], Any], + ) -> None: + """Intercept conversation before agent runs (used by SequentialBuilder). + + SequentialBuilder passes list[ChatMessage] directly to agents. This handler + intercepts that flow and pauses for human input. 
+ + Args: + messages: The conversation about to be sent to the agent + ctx: Workflow context for requesting info + """ + # Determine the target agent from our executor ID + target_agent = self._extract_agent_name_from_executor_id() + + # Check if we should pause for this agent + if not self._should_pause_for_agent(target_agent): + logger.debug(f"Skipping request_info pause for agent {target_agent} (not in filter)") + await ctx.send_message(messages) + return + + input_request = AgentInputRequest( + target_agent_id=target_agent, + conversation=list(messages), + instruction=None, + metadata={"_original_messages": messages, "_input_type": "list[ChatMessage]"}, + ) + await ctx.request_info(input_request, str) + + @handler + async def intercept_concurrent_requests( + self, + requests: list[AgentExecutorRequest], + ctx: WorkflowContext[list[AgentExecutorRequest], Any], + ) -> None: + """Intercept requests before concurrent agents run. + + This handler is used by ConcurrentBuilder to get human input before + all parallel agents execute. + + Args: + requests: List of requests for all concurrent agents + ctx: Workflow context for requesting info + """ + # Combine conversations for display + combined_conversation: list[ChatMessage] = [] + if requests: + combined_conversation = list(requests[0].messages or []) + + input_request = AgentInputRequest( + target_agent_id=None, # Multiple agents + conversation=combined_conversation, + instruction=None, + metadata={"_original_requests": requests}, + ) + await ctx.request_info(input_request, str) + + @response_handler + async def handle_input_response( + self, + original_request: AgentInputRequest, + # TODO(@moonbox3): Extend to support other content types + response: str, + ctx: WorkflowContext[AgentExecutorRequest | list[ChatMessage], Any], + ) -> None: + """Handle the human input and forward the modified request to the agent. 
+ + Injects the response as a user message into the conversation + and forwards the modified request to the agent. + + Args: + original_request: The AgentInputRequest that triggered the pause + response: The human input text + ctx: Workflow context for continuing the workflow + + TODO: Consider having each orchestration implement its own response handler + for more specialized behavior. + """ + human_message = ChatMessage(role=Role.USER, text=response) + + # Handle concurrent case (list of AgentExecutorRequest) + original_requests: list[AgentExecutorRequest] | None = original_request.metadata.get("_original_requests") + if original_requests is not None: + updated_requests: list[AgentExecutorRequest] = [] + for orig_req in original_requests: + messages = list(orig_req.messages or []) + messages.append(human_message) + updated_requests.append( + AgentExecutorRequest( + messages=messages, + should_respond=orig_req.should_respond, + ) + ) + + logger.debug( + f"Human input received for concurrent workflow, " + f"continuing with {len(updated_requests)} updated requests" + ) + await ctx.send_message(updated_requests) # type: ignore[arg-type] + return + + # Handle list[ChatMessage] case (SequentialBuilder) + original_messages: list[ChatMessage] | None = original_request.metadata.get("_original_messages") + if original_messages is not None: + messages = list(original_messages) + messages.append(human_message) + + logger.debug( + f"Human input received for agent {original_request.target_agent_id}, " + f"forwarding conversation with steering context" + ) + await ctx.send_message(messages) + return + + # Handle AgentExecutorRequest case (GroupChatBuilder) + orig_request: AgentExecutorRequest | None = original_request.metadata.get("_original_request") + if orig_request is not None: + messages = list(orig_request.messages or []) + messages.append(human_message) + + updated_request = AgentExecutorRequest( + messages=messages, + should_respond=orig_request.should_respond, + ) + + 
logger.debug( + f"Human input received for agent {original_request.target_agent_id}, " + f"forwarding request with steering context" + ) + await ctx.send_message(updated_request) + return + + logger.error("Input response handler missing original request/messages in metadata") + raise RuntimeError("Missing original request or messages in AgentInputRequest metadata") diff --git a/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py b/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py index 9da726faf4..14fd68fa46 100644 --- a/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py +++ b/python/packages/core/agent_framework/_workflows/_orchestrator_helpers.py @@ -140,6 +140,9 @@ class ParticipantRegistry: Provides a clean interface for the common pattern of mapping participant names to executor IDs and tracking which are agents vs custom executors. + + Tracks both entry IDs (where to send requests) and exit IDs (where responses + come from) to support pipeline configurations where these differ. """ def __init__(self) -> None: @@ -154,19 +157,26 @@ def register( *, entry_id: str, is_agent: bool, + exit_id: str | None = None, ) -> None: """Register a participant's routing information. Args: name: Participant name - entry_id: Executor ID for this participant's entry point + entry_id: Executor ID for this participant's entry point (where to send) is_agent: Whether this is an AgentExecutor (True) or custom Executor (False) + exit_id: Executor ID for this participant's exit point (where responses come from). + If None, defaults to entry_id (single-executor pipeline). 
""" self._participant_entry_ids[name] = entry_id + actual_exit_id = exit_id if exit_id is not None else entry_id if is_agent: self._agent_executor_ids[name] = entry_id + # Map both entry and exit IDs to participant name for response routing self._executor_id_to_participant[entry_id] = name + if actual_exit_id != entry_id: + self._executor_id_to_participant[actual_exit_id] = name else: self._non_agent_participants.add(name) diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 38fbc53c04..93fc5e991c 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -52,6 +52,7 @@ handler, ) from ._message_utils import normalize_messages_input +from ._orchestration_request_info import RequestInfoInterceptor from ._workflow import Workflow from ._workflow_builder import WorkflowBuilder from ._workflow_context import WorkflowContext @@ -76,9 +77,7 @@ async def from_messages( messages: list[str | ChatMessage], ctx: WorkflowContext[list[ChatMessage]], ) -> None: - # Make a copy to avoid mutation downstream - normalized = normalize_messages_input(messages) - await ctx.send_message(list(normalized)) + await ctx.send_message(normalize_messages_input(messages)) class _ResponseToConversation(Executor): @@ -119,11 +118,24 @@ class SequentialBuilder: # Enable checkpoint persistence workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build() + + # Enable request info for mid-workflow feedback (pauses before each agent) + workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build() + + # Enable request info only for specific agents + workflow = ( + SequentialBuilder() + .participants([agent1, agent2, agent3]) + .with_request_info(agents=[agent2]) # Only pause before agent2 + .build() + ) """ def __init__(self) -> None: self._participants: 
list[AgentProtocol | Executor] = [] self._checkpoint_storage: CheckpointStorage | None = None + self._request_info_enabled: bool = False + self._request_info_filter: set[str] | None = None def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder": """Define the ordered participants for this sequential workflow. @@ -157,14 +169,56 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Sequenti self._checkpoint_storage = checkpoint_storage return self + def with_request_info( + self, + *, + agents: Sequence[str | AgentProtocol | Executor] | None = None, + ) -> "SequentialBuilder": + """Enable request info before agents run in the workflow. + + When enabled, the workflow pauses before each agent runs, emitting + a RequestInfoEvent that allows the caller to review the conversation and + optionally inject guidance before the agent responds. The caller provides + input via the standard response_handler/request_info pattern. + + Args: + agents: Optional filter - only pause before these specific agents/executors. + Accepts agent names (str), agent instances, or executor instances. + If None (default), pauses before every agent. + + Returns: + self: The builder instance for fluent chaining. + + Example: + + .. code-block:: python + + # Pause before all agents + workflow = SequentialBuilder().participants([a1, a2]).with_request_info().build() + + # Pause only before specific agents + workflow = ( + SequentialBuilder() + .participants([drafter, reviewer, finalizer]) + .with_request_info(agents=[reviewer]) # Only pause before reviewer + .build() + ) + """ + from ._orchestration_request_info import resolve_request_info_filter + + self._request_info_enabled = True + self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None) + return self + def build(self) -> Workflow: """Build and validate the sequential workflow. 
Wiring pattern: - _InputToConversation normalizes the initial input into list[ChatMessage] - For each participant in order: - - If Agent (or AgentExecutor): pass conversation to the agent, then convert response - to conversation via _ResponseToConversation + - If Agent (or AgentExecutor): pass conversation to the agent, then optionally + route through human input interceptor, then convert response to conversation + via _ResponseToConversation - Else (custom Executor): pass conversation directly to the executor - _EndWithConversation yields the final conversation and the workflow becomes idle """ @@ -184,12 +238,22 @@ def build(self) -> Workflow: for p in self._participants: # Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)): - # input conversation -> (agent) -> response -> conversation - builder.add_edge(prior, p) - # Give the adapter a deterministic, self-describing id + # input conversation -> [human_input_interceptor] -> (agent) -> response -> conversation label: str label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__ resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}") + + if self._request_info_enabled: + # Insert request info interceptor BEFORE the agent + interceptor = RequestInfoInterceptor( + executor_id=f"request_info:{label}", + agent_filter=self._request_info_filter, + ) + builder.add_edge(prior, interceptor) + builder.add_edge(interceptor, p) + else: + builder.add_edge(prior, p) + builder.add_edge(p, resp_to_conv) prior = resp_to_conv elif isinstance(p, Executor): diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index b86ee3eb4a..f3e1d9bd68 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -521,9 +521,9 @@ def _configure_providers(self, 
exporters: list["LogRecordExporter | MetricExport logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) should_add_console_exporter = False if should_add_console_exporter: - from opentelemetry.sdk._logs.export import ConsoleLogExporter + from opentelemetry.sdk._logs.export import ConsoleLogRecordExporter - logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter())) + logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogRecordExporter())) # Attach a handler with the provider to the root logger logger = logging.getLogger() diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index 5d11e64c79..a99af64102 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -1082,3 +1082,106 @@ def test_set_manager_builds_with_agent_manager() -> None: assert isinstance(orchestrator, GroupChatOrchestratorExecutor) assert orchestrator._is_manager_agent() + + +async def test_group_chat_with_request_info_filtering(): + """Test that with_request_info(agents=[...]) only pauses before specified agents run.""" + from agent_framework import AgentInputRequest, RequestInfoEvent + + # Create agents - we want to verify only beta triggers pause + alpha = StubAgent("alpha", "response from alpha") + beta = StubAgent("beta", "response from beta") + + # Manager that selects alpha first, then beta, then finishes + call_count = 0 + + async def selector(state: GroupChatStateSnapshot) -> str | None: + nonlocal call_count + call_count += 1 + if call_count == 1: + return "alpha" + if call_count == 2: + return "beta" + return None + + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(selector, display_name="manager", final_message="done") + .participants(alpha=alpha, beta=beta) + .with_request_info(agents=["beta"]) # Only pause before beta runs + .build() + ) + + # Run until we 
get a request info event (should be before beta, not alpha) + request_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest): + request_events.append(event) + # Don't break - let stream complete naturally when paused + + # Should have exactly one request event before beta + assert len(request_events) == 1 + request_event = request_events[0] + + # The target agent should be beta's executor ID (groupchat_agent:beta) + assert request_event.data.target_agent_id is not None + assert "beta" in request_event.data.target_agent_id + + # Continue the workflow with a response + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.send_responses_streaming({request_event.request_id: "continue please"}): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + # Workflow should complete + assert len(outputs) == 1 + + +async def test_group_chat_with_request_info_no_filter_pauses_all(): + """Test that with_request_info() without agents pauses before all participants.""" + from agent_framework import AgentInputRequest, RequestInfoEvent + + # Create agents + alpha = StubAgent("alpha", "response from alpha") + + # Manager selects alpha then finishes + call_count = 0 + + async def selector(state: GroupChatStateSnapshot) -> str | None: + nonlocal call_count + call_count += 1 + if call_count == 1: + return "alpha" + return None + + workflow = ( + GroupChatBuilder() + .set_select_speakers_func(selector, display_name="manager", final_message="done") + .participants(alpha=alpha) + .with_request_info() # No filter - pause for all + .build() + ) + + # Run until we get a request info event + request_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest): + request_events.append(event) + break + + # Should pause before 
alpha + assert len(request_events) == 1 + assert request_events[0].data.target_agent_id is not None + assert "alpha" in request_events[0].data.target_agent_id + + +def test_group_chat_builder_with_request_info_returns_self(): + """Test that with_request_info() returns self for method chaining.""" + builder = GroupChatBuilder() + result = builder.with_request_info() + assert result is builder + + # Also test with agents parameter + builder2 = GroupChatBuilder() + result2 = builder2.with_request_info(agents=["test"]) + assert result2 is builder2 diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 722885a6cb..0ceccfaf15 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -687,6 +687,54 @@ async def mock_get_response(messages: Any, **kwargs: Any) -> ChatResponse: assert str(last_tool_choice) == "required", f"Expected 'required', got {last_tool_choice}" +async def test_handoff_builder_with_request_info(): + """Test that HandoffBuilder supports request info via with_request_info().""" + from agent_framework import AgentInputRequest, RequestInfoEvent + + # Create test agents + coordinator = _RecordingAgent(name="coordinator") + specialist = _RecordingAgent(name="specialist") + + # Build workflow with request info enabled + workflow = ( + HandoffBuilder(participants=[coordinator, specialist]) + .set_coordinator("coordinator") + .with_termination_condition(lambda conv: len([m for m in conv if m.role == Role.USER]) >= 1) + .with_request_info() + .build() + ) + + # Run workflow until it pauses for request info + request_event: RequestInfoEvent | None = None + async for event in workflow.run_stream("Hello"): + if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest): + request_event = event + + # Verify request info was emitted + assert request_event is not None, "Request info should have been emitted" + 
assert isinstance(request_event.data, AgentInputRequest) + + # Provide response and continue + output_events: list[WorkflowOutputEvent] = [] + async for event in workflow.send_responses_streaming({request_event.request_id: "approved"}): + if isinstance(event, WorkflowOutputEvent): + output_events.append(event) + + # Verify we got output events + assert len(output_events) > 0, "Should produce output events after response" + + +async def test_handoff_builder_with_request_info_method_chaining(): + """Test that with_request_info returns self for method chaining.""" + coordinator = _RecordingAgent(name="coordinator") + + builder = HandoffBuilder(participants=[coordinator]) + result = builder.with_request_info() + + assert result is builder, "with_request_info should return self for chaining" + assert builder._request_info_enabled is True # type: ignore + + async def test_return_to_previous_state_serialization(): """Test that return_to_previous state is properly serialized/deserialized for checkpointing.""" from agent_framework._workflows._handoff import _HandoffCoordinator # type: ignore[reportPrivateUsage] diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index e9f5dcf70d..71cfc6752a 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -857,3 +857,22 @@ async def test_magentic_checkpoint_runtime_overrides_buildtime() -> None: assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" + + +def test_magentic_builder_does_not_have_human_input_hook(): + """Test that MagenticBuilder does not expose with_human_input_hook (uses specialized HITL instead). 
+ + Magentic uses specialized human intervention mechanisms: + - with_plan_review() for plan approval + - with_human_input_on_stall() for stall intervention + - Tool approval via FunctionApprovalRequestContent + + These emit MagenticHumanInterventionRequest events with structured decision options. + """ + builder = MagenticBuilder() + + # MagenticBuilder should NOT have the generic human input hook mixin + assert not hasattr(builder, "with_human_input_hook"), ( + "MagenticBuilder should not have with_human_input_hook - " + "use with_plan_review() or with_human_input_on_stall() instead" + ) diff --git a/python/packages/core/tests/workflow/test_orchestration_request_info.py b/python/packages/core/tests/workflow/test_orchestration_request_info.py new file mode 100644 index 0000000000..e5f4d7a11f --- /dev/null +++ b/python/packages/core/tests/workflow/test_orchestration_request_info.py @@ -0,0 +1,168 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Unit tests for request info support in high-level builders.""" + +from typing import Any +from unittest.mock import MagicMock + +from agent_framework import ( + AgentInputRequest, + AgentProtocol, + AgentResponseReviewRequest, + ChatMessage, + RequestInfoInterceptor, + Role, +) +from agent_framework._workflows._executor import Executor, handler +from agent_framework._workflows._orchestration_request_info import resolve_request_info_filter +from agent_framework._workflows._workflow_context import WorkflowContext + + +class DummyExecutor(Executor): + """Dummy executor with a handler for testing.""" + + @handler + async def handle(self, data: str, ctx: WorkflowContext[Any, Any]) -> None: + pass + + +class TestResolveRequestInfoFilter: + """Tests for resolve_request_info_filter function.""" + + def test_returns_none_for_none_input(self): + """Test that None input returns None (no filtering).""" + result = resolve_request_info_filter(None) + assert result is None + + def test_returns_none_for_empty_list(self): + """Test 
that empty list returns None.""" + result = resolve_request_info_filter([]) + assert result is None + + def test_resolves_string_names(self): + """Test resolving string agent names.""" + result = resolve_request_info_filter(["agent1", "agent2"]) + assert result == {"agent1", "agent2"} + + def test_resolves_executor_ids(self): + """Test resolving Executor instances by ID.""" + exec1 = DummyExecutor(id="executor1") + exec2 = DummyExecutor(id="executor2") + + result = resolve_request_info_filter([exec1, exec2]) + assert result == {"executor1", "executor2"} + + def test_resolves_agent_names(self): + """Test resolving AgentProtocol-like objects by name attribute.""" + agent1 = MagicMock(spec=AgentProtocol) + agent1.name = "writer" + agent2 = MagicMock(spec=AgentProtocol) + agent2.name = "reviewer" + + result = resolve_request_info_filter([agent1, agent2]) + assert result == {"writer", "reviewer"} + + def test_mixed_types(self): + """Test resolving a mix of strings, agents, and executors.""" + agent = MagicMock(spec=AgentProtocol) + agent.name = "writer" + executor = DummyExecutor(id="custom_exec") + + result = resolve_request_info_filter(["manual_name", agent, executor]) + assert result == {"manual_name", "writer", "custom_exec"} + + def test_skips_agent_without_name(self): + """Test that agents without names are skipped.""" + agent_with_name = MagicMock(spec=AgentProtocol) + agent_with_name.name = "valid" + agent_without_name = MagicMock(spec=AgentProtocol) + agent_without_name.name = None + + result = resolve_request_info_filter([agent_with_name, agent_without_name]) + assert result == {"valid"} + + +class TestAgentInputRequest: + """Tests for AgentInputRequest dataclass (formerly AgentResponseReviewRequest).""" + + def test_create_request(self): + """Test creating an AgentInputRequest with all fields.""" + conversation = [ChatMessage(role=Role.USER, text="Hello")] + request = AgentInputRequest( + target_agent_id="test_agent", + conversation=conversation, + 
instruction="Review this", + metadata={"key": "value"}, + ) + + assert request.target_agent_id == "test_agent" + assert request.conversation == conversation + assert request.instruction == "Review this" + assert request.metadata == {"key": "value"} + + def test_create_request_defaults(self): + """Test creating an AgentInputRequest with default values.""" + request = AgentInputRequest(target_agent_id="test_agent") + + assert request.target_agent_id == "test_agent" + assert request.conversation == [] + assert request.instruction is None + assert request.metadata == {} + + def test_backward_compatibility_alias(self): + """Test that AgentResponseReviewRequest is an alias for AgentInputRequest.""" + assert AgentResponseReviewRequest is AgentInputRequest + + +class TestRequestInfoInterceptor: + """Tests for RequestInfoInterceptor executor.""" + + def test_interceptor_creation_generates_unique_id(self): + """Test creating a RequestInfoInterceptor generates unique IDs.""" + interceptor1 = RequestInfoInterceptor() + interceptor2 = RequestInfoInterceptor() + assert interceptor1.id.startswith("request_info_interceptor-") + assert interceptor2.id.startswith("request_info_interceptor-") + assert interceptor1.id != interceptor2.id + + def test_interceptor_with_custom_id(self): + """Test creating a RequestInfoInterceptor with custom ID.""" + interceptor = RequestInfoInterceptor(executor_id="custom_review") + assert interceptor.id == "custom_review" + + def test_interceptor_with_agent_filter(self): + """Test creating a RequestInfoInterceptor with agent filter.""" + agent_filter = {"agent1", "agent2"} + interceptor = RequestInfoInterceptor( + executor_id="filtered_review", + agent_filter=agent_filter, + ) + assert interceptor.id == "filtered_review" + assert interceptor._agent_filter == agent_filter + + def test_should_pause_for_agent_no_filter(self): + """Test that interceptor pauses for all agents when no filter is set.""" + interceptor = RequestInfoInterceptor() + assert 
interceptor._should_pause_for_agent("any_agent") is True + assert interceptor._should_pause_for_agent("another_agent") is True + assert interceptor._should_pause_for_agent(None) is True + + def test_should_pause_for_agent_with_filter(self): + """Test that interceptor only pauses for agents in the filter.""" + agent_filter = {"writer", "reviewer"} + interceptor = RequestInfoInterceptor(agent_filter=agent_filter) + + assert interceptor._should_pause_for_agent("writer") is True + assert interceptor._should_pause_for_agent("reviewer") is True + assert interceptor._should_pause_for_agent("drafter") is False + assert interceptor._should_pause_for_agent(None) is False + + def test_should_pause_for_agent_with_prefixed_id(self): + """Test that filter matches agent names in prefixed executor IDs.""" + agent_filter = {"writer"} + interceptor = RequestInfoInterceptor(agent_filter=agent_filter) + + # Should match the name portion after the colon + assert interceptor._should_pause_for_agent("groupchat_agent:writer") is True + assert interceptor._should_pause_for_agent("request_info:writer") is True + assert interceptor._should_pause_for_agent("groupchat_agent:editor") is False diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index b85ca5d787..a037bf51b6 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -111,7 +111,8 @@ def test_add_agent_with_custom_parameters(): builder = WorkflowBuilder() # Add agent with custom parameters - result = builder.add_agent(agent, output_response=True, id="my_custom_id") + with pytest.deprecated_call(): + result = builder.add_agent(agent, output_response=True, id="my_custom_id") # Verify that add_agent returns the builder for chaining assert result is builder @@ -133,7 +134,8 @@ def test_add_agent_reuses_same_wrapper(): builder = WorkflowBuilder() # Add agent with 
specific parameters - builder.add_agent(agent, output_response=True, id="agent_exec") + with pytest.deprecated_call(): + builder.add_agent(agent, output_response=True, id="agent_exec") # Use the same agent instance in add_edge - should reuse the same wrapper builder.set_start_executor(agent) @@ -158,8 +160,9 @@ def test_add_agent_then_use_in_edges(): builder = WorkflowBuilder() # Add agents with specific settings - builder.add_agent(agent1, output_response=False, id="exec1") - builder.add_agent(agent2, output_response=True, id="exec2") + with pytest.deprecated_call(): + builder.add_agent(agent1, output_response=False, id="exec1") + builder.add_agent(agent2, output_response=True, id="exec2") # Use the same agent instances to create edges workflow = builder.set_start_executor(agent1).add_edge(agent1, agent2).build() @@ -183,7 +186,8 @@ def test_add_agent_without_explicit_id_uses_agent_name(): agent = DummyAgent(id="agent_x", name="named_agent") builder = WorkflowBuilder() - result = builder.add_agent(agent) + with pytest.deprecated_call(): + result = builder.add_agent(agent) # Verify that add_agent returns the builder for chaining assert result is builder @@ -203,10 +207,11 @@ def test_add_agent_duplicate_id_raises_error(): builder = WorkflowBuilder() # Add first agent - builder.add_agent(agent1) + with pytest.deprecated_call(): + builder.add_agent(agent1) # Adding second agent with same name should raise ValueError - with pytest.raises(ValueError, match="Duplicate executor ID"): + with pytest.deprecated_call(), pytest.raises(ValueError, match="Duplicate executor ID"): builder.add_agent(agent2) diff --git a/python/samples/SAMPLE_GUIDELINES.md b/python/samples/SAMPLE_GUIDELINES.md index 05c567a008..e8c1589ef0 100644 --- a/python/samples/SAMPLE_GUIDELINES.md +++ b/python/samples/SAMPLE_GUIDELINES.md @@ -33,7 +33,7 @@ Try to over-document the samples. 
This includes comments in the code, README.md For the getting started samples and the concept samples, we should have the following: 1. A README.md file is included in each set of samples that explains the purpose of the samples and the setup required to run them. -2. A summary should be included at the top of the file that explains the purpose of the sample and required components/concepts to understand the sample. For example: +2. A summary should be included underneath the imports that explains the purpose of the sample and required components/concepts to understand the sample. For example: ```python ''' diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index f22d993669..e1e18eab91 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -78,9 +78,22 @@ Once comfortable with these, explore the rest of the samples below. | Sample | File | Concepts | |---|---|---| -| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human | -| Azure Agents Tool Feedback Loop | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Two-agent workflow that streams tool calls and pauses for human guidance between passes | +| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human via `ctx.request_info()` | | Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed | +| SequentialBuilder Request Info | 
[human-in-the-loop/sequential_request_info.py](./human-in-the-loop/sequential_request_info.py) | Request info for agent responses mid-workflow using `.with_request_info()` on SequentialBuilder | +| ConcurrentBuilder Request Info | [human-in-the-loop/concurrent_request_info.py](./human-in-the-loop/concurrent_request_info.py) | Review concurrent agent outputs before aggregation using `.with_request_info()` on ConcurrentBuilder | +| GroupChatBuilder Request Info | [human-in-the-loop/group_chat_request_info.py](./human-in-the-loop/group_chat_request_info.py) | Steer group discussions with periodic guidance using `.with_request_info()` on GroupChatBuilder | + + +### tool-approval + +Tool approval samples demonstrate using `@ai_function(approval_mode="always_require")` to gate sensitive tool executions with human approval. These work with the high-level builder APIs. + +| Sample | File | Concepts | +|---|---|---| +| SequentialBuilder Tool Approval | [tool-approval/sequential_builder_tool_approval.py](./tool-approval/sequential_builder_tool_approval.py) | Sequential workflow with tool approval gates for sensitive operations | +| ConcurrentBuilder Tool Approval | [tool-approval/concurrent_builder_tool_approval.py](./tool-approval/concurrent_builder_tool_approval.py) | Concurrent workflow with tool approvals across parallel agents | +| GroupChatBuilder Tool Approval | [tool-approval/group_chat_builder_tool_approval.py](./tool-approval/group_chat_builder_tool_approval.py) | Group chat workflow with tool approval for multi-agent collaboration | ### observability diff --git a/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py new file mode 100644 index 0000000000..c8c4f40e41 --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py @@ -0,0 +1,198 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +""" +Sample: Request Info with ConcurrentBuilder + +This sample demonstrates using the `.with_request_info()` method to pause a +ConcurrentBuilder workflow AFTER all parallel agents complete but BEFORE +aggregation, allowing human review and modification of the combined results. + +Purpose: +Show how to use the request info API that pauses after concurrent agents run, +allowing review and steering of results before they are aggregated. + +Demonstrate: +- Configuring request info with `.with_request_info()` +- Reviewing outputs from multiple concurrent agents +- Injecting human guidance after agents execute but before aggregation + +Prerequisites: +- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables +- Authentication via azure-identity (run az login before executing) +""" + +import asyncio +from typing import Any + +from agent_framework import ( + AgentInputRequest, + ChatMessage, + ConcurrentBuilder, + RequestInfoEvent, + Role, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, +) +from agent_framework._workflows._agent_executor import AgentExecutorResponse +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +# Store chat client at module level for aggregator access +_chat_client: AzureOpenAIChatClient | None = None + + +async def aggregate_with_synthesis(results: list[AgentExecutorResponse]) -> Any: + """Custom aggregator that synthesizes concurrent agent outputs using an LLM. + + This aggregator extracts the outputs from each parallel agent and uses the + chat client to create a unified summary, incorporating any human feedback + that was injected into the conversation. 
+ + Args: + results: List of responses from all concurrent agents + + Returns: + The synthesized summary text + """ + if not _chat_client: + return "Error: Chat client not initialized" + + # Extract each agent's final output + expert_sections: list[str] = [] + human_guidance = "" + + for r in results: + try: + messages = getattr(r.agent_run_response, "messages", []) + final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)" + expert_sections.append(f"{getattr(r, 'executor_id', 'analyst')}:\n{final_text}") + + # Check for human feedback in the conversation (will be last user message if present) + if r.full_conversation: + for msg in reversed(r.full_conversation): + if msg.role == Role.USER and msg.text and "perspectives" not in msg.text.lower(): + human_guidance = msg.text + break + except Exception: + expert_sections.append(f"{getattr(r, 'executor_id', 'analyst')}: (error extracting output)") + + # Build prompt with human guidance if provided + guidance_text = f"\n\nHuman guidance: {human_guidance}" if human_guidance else "" + + system_msg = ChatMessage( + Role.SYSTEM, + text=( + "You are a synthesis expert. Consolidate the following analyst perspectives " + "into one cohesive, balanced summary (3-4 sentences). If human guidance is provided, " + "prioritize aspects as directed." + ), + ) + user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections) + guidance_text) + + response = await _chat_client.get_response([system_msg, user_msg]) + return response.messages[-1].text if response.messages else "" + + +async def main() -> None: + global _chat_client + _chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + # Create agents that analyze from different perspectives + technical_analyst = _chat_client.create_agent( + name="technical_analyst", + instructions=( + "You are a technical analyst. 
When given a topic, provide a technical " + "perspective focusing on implementation details, performance, and architecture. " + "Keep your analysis to 2-3 sentences." + ), + ) + + business_analyst = _chat_client.create_agent( + name="business_analyst", + instructions=( + "You are a business analyst. When given a topic, provide a business " + "perspective focusing on ROI, market impact, and strategic value. " + "Keep your analysis to 2-3 sentences." + ), + ) + + user_experience_analyst = _chat_client.create_agent( + name="ux_analyst", + instructions=( + "You are a UX analyst. When given a topic, provide a user experience " + "perspective focusing on usability, accessibility, and user satisfaction. " + "Keep your analysis to 2-3 sentences." + ), + ) + + # Build workflow with request info enabled and custom aggregator + workflow = ( + ConcurrentBuilder() + .participants([technical_analyst, business_analyst, user_experience_analyst]) + .with_aggregator(aggregate_with_synthesis) + .with_request_info() + .build() + ) + + # Run the workflow with human-in-the-loop + pending_responses: dict[str, str] | None = None + workflow_complete = False + + print("Starting multi-perspective analysis workflow...") + print("=" * 60) + + while not workflow_complete: + # Run or continue the workflow + stream = ( + workflow.send_responses_streaming(pending_responses) + if pending_responses + else workflow.run_stream("Analyze the impact of large language models on software development.") + ) + + pending_responses = None + + # Process events + async for event in stream: + if isinstance(event, RequestInfoEvent): + if isinstance(event.data, AgentInputRequest): + # Display pre-execution context for steering concurrent agents + print("\n" + "-" * 40) + print("INPUT REQUESTED (BEFORE CONCURRENT AGENTS)") + print("-" * 40) + print(f"About to call agents: {event.data.target_agent_id}") + print("Conversation context:") + recent = ( + event.data.conversation[-2:] if len(event.data.conversation) > 2 
else event.data.conversation + ) + for msg in recent: + role = msg.role.value if msg.role else "unknown" + text = (msg.text or "")[:150] + print(f" [{role}]: {text}...") + print("-" * 40) + + # Get human input to steer all agents + user_input = input("Your guidance for the analysts (or 'skip' to continue): ") # noqa: ASYNC250 + if user_input.lower() == "skip": + user_input = "Please analyze objectively from your unique perspective." + + pending_responses = {event.request_id: user_input} + print("(Resuming workflow...)") + + elif isinstance(event, WorkflowOutputEvent): + print("\n" + "=" * 60) + print("WORKFLOW COMPLETE") + print("=" * 60) + print("Aggregated output:") + # Custom aggregator returns a string + if event.data: + print(event.data) + workflow_complete = True + + elif isinstance(event, WorkflowStatusEvent): + if event.state == WorkflowRunState.IDLE: + workflow_complete = True + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py new file mode 100644 index 0000000000..c3a193a6a8 --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py @@ -0,0 +1,175 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample: Request Info with GroupChatBuilder + +This sample demonstrates using the `.with_request_info()` method to pause a +GroupChatBuilder workflow BEFORE specific participants speak. By using the +`agents=` filter parameter, you can target only certain participants rather +than pausing before every turn. + +Purpose: +Show how to use the request info API with selective filtering to pause before +specific participants speak, allowing human input to steer their response. 
+ +Demonstrate: +- Configuring request info with `.with_request_info(agents=[...])` +- Using agent filtering to reduce interruptions +- Steering agent behavior with pre-agent human input + +Prerequisites: +- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables +- Authentication via azure-identity (run az login before executing) +""" + +import asyncio + +from agent_framework import ( + AgentInputRequest, + AgentRunUpdateEvent, + ChatMessage, + GroupChatBuilder, + RequestInfoEvent, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + + +async def main() -> None: + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + # Create agents for a group discussion + optimist = chat_client.create_agent( + name="optimist", + instructions=( + "You are an optimistic team member. You see opportunities and potential " + "in ideas. Engage constructively with the discussion, building on others' " + "points while maintaining a positive outlook. Keep responses to 2-3 sentences." + ), + ) + + pragmatist = chat_client.create_agent( + name="pragmatist", + instructions=( + "You are a pragmatic team member. You focus on practical implementation " + "and realistic timelines. Sometimes you disagree with overly optimistic views. " + "Keep responses to 2-3 sentences." + ), + ) + + creative = chat_client.create_agent( + name="creative", + instructions=( + "You are a creative team member. You propose innovative solutions and " + "think outside the box. You may suggest alternatives to conventional approaches. " + "Keep responses to 2-3 sentences." + ), + ) + + # Manager orchestrates the discussion + manager = chat_client.create_agent( + name="manager", + instructions=( + "You are a discussion manager coordinating a team conversation between optimist, " + "pragmatist, and creative. 
Your job is to select who speaks next.\n\n" + "RULES:\n" + "1. Rotate through ALL participants - do not favor any single participant\n" + "2. Each participant should speak at least once before any participant speaks twice\n" + "3. If human feedback redirects the topic, acknowledge it and continue rotating\n" + "4. Continue for at least 5 participant turns before concluding\n" + "5. Do NOT select the same participant twice in a row" + ), + ) + + # Build workflow with request info enabled + # Using agents= filter to only pause before pragmatist speaks (not every turn) + workflow = ( + GroupChatBuilder() + .set_manager(manager=manager, display_name="Discussion Manager") + .participants([optimist, pragmatist, creative]) + .with_max_rounds(6) + .with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks + .build() + ) + + # Run the workflow with human-in-the-loop + pending_responses: dict[str, str] | None = None + workflow_complete = False + current_agent: str | None = None # Track current streaming agent + + print("Starting group discussion workflow...") + print("=" * 60) + + while not workflow_complete: + # Run or continue the workflow + stream = ( + workflow.send_responses_streaming(pending_responses) + if pending_responses + else workflow.run_stream( + "Discuss how our team should approach adopting AI tools for productivity. " + "Consider benefits, risks, and implementation strategies." 
+ ) + ) + + pending_responses = None + + # Process events + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + # Show all agent responses as they stream + if event.data and event.data.text: + agent_name = event.data.author_name or "unknown" + # Print agent name header only when agent changes + if agent_name != current_agent: + current_agent = agent_name + print(f"\n[{agent_name}]: ", end="", flush=True) + print(event.data.text, end="", flush=True) + + elif isinstance(event, RequestInfoEvent): + current_agent = None # Reset for next agent + if isinstance(event.data, AgentInputRequest): + # Display pre-agent context for human input + print("\n" + "-" * 40) + print("INPUT REQUESTED") + print(f"About to call agent: {event.data.target_agent_id}") + print("-" * 40) + print("Conversation context:") + recent = ( + event.data.conversation[-3:] if len(event.data.conversation) > 3 else event.data.conversation + ) + for msg in recent: + role = msg.role.value if msg.role else "unknown" + text = (msg.text or "")[:100] + print(f" [{role}]: {text}...") + print("-" * 40) + + # Get human input to steer the agent + user_input = input("Steer the discussion (or 'skip' to continue): ") # noqa: ASYNC250 + if user_input.lower() == "skip": + user_input = "Please continue the discussion naturally." 
+ + pending_responses = {event.request_id: user_input} + print("(Resuming discussion...)") + + elif isinstance(event, WorkflowOutputEvent): + print("\n" + "=" * 60) + print("DISCUSSION COMPLETE") + print("=" * 60) + print("Final conversation:") + if event.data: + messages: list[ChatMessage] = event.data[-4:] + for msg in messages: + role = msg.role.value if msg.role else "unknown" + text = (msg.text or "")[:200] + print(f"[{role}]: {text}...") + workflow_complete = True + + elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: + workflow_complete = True + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py new file mode 100644 index 0000000000..55c8652984 --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py @@ -0,0 +1,128 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample: Request Info with SequentialBuilder + +This sample demonstrates using the `.with_request_info()` method to pause a +SequentialBuilder workflow BEFORE each agent runs, allowing external input +(e.g., human steering) before the agent responds. + +Purpose: +Show how to use the request info API that pauses before every agent response, +using the standard request_info pattern for consistency. 
+ +Demonstrate: +- Configuring request info with `.with_request_info()` +- Handling RequestInfoEvent with AgentInputRequest data +- Injecting responses back into the workflow via send_responses_streaming + +Prerequisites: +- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables +- Authentication via azure-identity (run az login before executing) +""" + +import asyncio + +from agent_framework import ( + AgentInputRequest, + ChatMessage, + RequestInfoEvent, + SequentialBuilder, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + + +async def main() -> None: + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + # Create agents for a sequential document review workflow + drafter = chat_client.create_agent( + name="drafter", + instructions=("You are a document drafter. When given a topic, create a brief draft (2-3 sentences)."), + ) + + editor = chat_client.create_agent( + name="editor", + instructions=( + "You are an editor. Review the draft and suggest improvements. " + "Incorporate any human feedback that was provided." + ), + ) + + finalizer = chat_client.create_agent( + name="finalizer", + instructions=( + "You are a finalizer. Take the edited content and create a polished final version. " + "Incorporate any additional feedback provided." 
+ ), + ) + + # Build workflow with request info enabled (pauses before each agent) + workflow = SequentialBuilder().participants([drafter, editor, finalizer]).with_request_info().build() + + # Run the workflow with request info handling + pending_responses: dict[str, str] | None = None + workflow_complete = False + + print("Starting document review workflow...") + print("=" * 60) + + while not workflow_complete: + # Run or continue the workflow + stream = ( + workflow.send_responses_streaming(pending_responses) + if pending_responses + else workflow.run_stream("Write a brief introduction to artificial intelligence.") + ) + + pending_responses = None + + # Process events + async for event in stream: + if isinstance(event, RequestInfoEvent): + if isinstance(event.data, AgentInputRequest): + # Display pre-agent context for steering + print("\n" + "-" * 40) + print("REQUEST INFO: INPUT REQUESTED") + print(f"About to call agent: {event.data.target_agent_id}") + print("-" * 40) + print("Conversation context:") + recent = ( + event.data.conversation[-2:] if len(event.data.conversation) > 2 else event.data.conversation + ) + for msg in recent: + role = msg.role.value if msg.role else "unknown" + text = (msg.text or "")[:150] + print(f" [{role}]: {text}...") + print("-" * 40) + + # Get input to steer the agent + user_input = input("Your guidance (or 'skip' to continue): ") # noqa: ASYNC250 + if user_input.lower() == "skip": + user_input = "Please continue naturally." 
+ + pending_responses = {event.request_id: user_input} + print("(Resuming workflow...)") + + elif isinstance(event, WorkflowOutputEvent): + print("\n" + "=" * 60) + print("WORKFLOW COMPLETE") + print("=" * 60) + print("Final output:") + if event.data: + messages: list[ChatMessage] = event.data[-3:] + for msg in messages: + role = msg.role.value if msg.role else "unknown" + print(f"[{role}]: {msg.text}") + workflow_complete = True + + elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: + workflow_complete = True + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py new file mode 100644 index 0000000000..e4092414fc --- /dev/null +++ b/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py @@ -0,0 +1,183 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Annotated + +from agent_framework import ( + ChatMessage, + ConcurrentBuilder, + FunctionApprovalRequestContent, + FunctionApprovalResponseContent, + RequestInfoEvent, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, + ai_function, +) +from agent_framework.openai import OpenAIChatClient + +""" +Sample: Concurrent Workflow with Tool Approval Requests + +This sample demonstrates how to use ConcurrentBuilder with tools that require human +approval before execution. Multiple agents run in parallel, and any tool requiring +approval will pause the workflow until the human responds. + +This sample works as follows: +1. A ConcurrentBuilder workflow is created with two agents running in parallel. +2. One agent has a tool requiring approval (financial transaction). +3. The other agent has only non-approval tools (market data lookup). +4. Both agents receive the same task and work concurrently. +5. 
When the financial agent tries to execute a trade, it triggers an approval request. +6. The sample simulates human approval and the workflow completes. +7. Results from both agents are aggregated and output. + +Purpose: +Show how tool call approvals work in parallel execution scenarios where only some +agents have sensitive tools. + +Demonstrate: +- Combining agents with and without approval-required tools in concurrent workflows. +- Handling RequestInfoEvent during concurrent agent execution. +- Understanding that approval pauses only the agent that triggered it, not all agents. + +Prerequisites: +- OpenAI or Azure OpenAI configured with the required environment variables. +- Basic familiarity with ConcurrentBuilder and streaming workflow events. +""" + + +# 1. Define tools for the research agent (no approval required) +@ai_function +def get_stock_price(symbol: Annotated[str, "The stock ticker symbol"]) -> str: + """Get the current stock price for a given symbol.""" + # Mock data for demonstration + prices = {"AAPL": 175.50, "GOOGL": 140.25, "MSFT": 378.90, "AMZN": 178.75} + price = prices.get(symbol.upper(), 100.00) + return f"{symbol.upper()}: ${price:.2f}" + + +@ai_function +def get_market_sentiment(symbol: Annotated[str, "The stock ticker symbol"]) -> str: + """Get market sentiment analysis for a stock.""" + # Mock sentiment data + return f"Market sentiment for {symbol.upper()}: Bullish (72% positive mentions in last 24h)" + + +# 2. Define tools for the trading agent (approval required for trades) +@ai_function(approval_mode="always_require") +def execute_trade( + symbol: Annotated[str, "The stock ticker symbol"], + action: Annotated[str, "Either 'buy' or 'sell'"], + quantity: Annotated[int, "Number of shares to trade"], +) -> str: + """Execute a stock trade. 
Requires human approval due to financial impact.""" + return f"Trade executed: {action.upper()} {quantity} shares of {symbol.upper()}" + + +@ai_function +def get_portfolio_balance() -> str: + """Get current portfolio balance and available funds.""" + return "Portfolio: $50,000 invested, $10,000 cash available" + + +async def main() -> None: + # 3. Create two agents with different tool sets + chat_client = OpenAIChatClient() + + research_agent = chat_client.create_agent( + name="ResearchAgent", + instructions=( + "You are a market research analyst. Analyze stock data and provide " + "recommendations based on price and sentiment. Do not execute trades." + ), + tools=[get_stock_price, get_market_sentiment], + ) + + trading_agent = chat_client.create_agent( + name="TradingAgent", + instructions=( + "You are a trading assistant. When asked to buy or sell shares, you MUST " + "call the execute_trade function to complete the transaction. Check portfolio " + "balance first, then execute the requested trade." + ), + tools=[get_portfolio_balance, execute_trade], + ) + + # 4. Build a concurrent workflow with both agents + # ConcurrentBuilder requires at least 2 participants for fan-out + workflow = ConcurrentBuilder().participants([research_agent, trading_agent]).build() + + # 5. 
Start the workflow - both agents will process the same task in parallel + print("Starting concurrent workflow with tool approval...") + print("Two agents will analyze MSFT - one for research, one for trading.") + print("-" * 60) + + # Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS) + request_info_events: list[RequestInfoEvent] = [] + workflow_completed_without_approvals = False + async for event in workflow.run_stream("Analyze MSFT stock and if sentiment is positive, buy 10 shares."): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + if isinstance(event.data, FunctionApprovalRequestContent): + print(f"\nApproval requested for tool: {event.data.function_call.name}") + print(f" Arguments: {event.data.function_call.arguments}") + elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: + workflow_completed_without_approvals = True + + # 6. Handle approval requests (if any) + if request_info_events: + responses: dict[str, FunctionApprovalResponseContent] = {} + for request_event in request_info_events: + if isinstance(request_event.data, FunctionApprovalRequestContent): + print(f"\nSimulating human approval for: {request_event.data.function_call.name}") + # Create approval response + responses[request_event.request_id] = request_event.data.create_response(approved=True) + + if responses: + # Phase 2: Send all approvals and continue workflow + output: list[ChatMessage] | None = None + async for event in workflow.send_responses_streaming(responses): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + if output: + print("\n" + "-" * 60) + print("Workflow completed. Aggregated results from both agents:") + for msg in output: + if hasattr(msg, "author_name") and msg.author_name: + print(f"\n[{msg.author_name}]:") + text = msg.text[:300] + "..." 
if len(msg.text) > 300 else msg.text + if text: + print(f" {text}") + elif workflow_completed_without_approvals: + print("\nWorkflow completed without requiring approvals.") + print("(The trading agent may have only checked balance without executing a trade)") + + """ + Sample Output: + Starting concurrent workflow with tool approval... + Two agents will analyze MSFT - one for research, one for trading. + ------------------------------------------------------------ + + Approval requested for tool: execute_trade + Arguments: {"symbol": "MSFT", "action": "buy", "quantity": 10} + Simulating human approval for: execute_trade + + ------------------------------------------------------------ + Workflow completed. Aggregated results from both agents: + + [ResearchAgent]: + MSFT is currently trading at $378.90 with bullish market sentiment + (72% positive mentions). Based on the positive sentiment, this could + be a good opportunity to consider buying. + + [TradingAgent]: + I've checked your portfolio balance ($10,000 cash available) and + executed the trade: BUY 10 shares of MSFT at approximately $378.90 + per share, totaling ~$3,789. + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py new file mode 100644 index 0000000000..565002c794 --- /dev/null +++ b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py @@ -0,0 +1,206 @@ +# Copyright (c) Microsoft. All rights reserved.
+ +import asyncio +from typing import Annotated + +from agent_framework import ( + FunctionApprovalRequestContent, + GroupChatBuilder, + GroupChatStateSnapshot, + RequestInfoEvent, + ai_function, +) +from agent_framework.openai import OpenAIChatClient + +""" +Sample: Group Chat Workflow with Tool Approval Requests + +This sample demonstrates how to use GroupChatBuilder with tools that require human +approval before execution. A group of specialized agents collaborate on a task, and +sensitive tool calls trigger human-in-the-loop approval. + +This sample works as follows: +1. A GroupChatBuilder workflow is created with multiple specialized agents. +2. A selector function determines which agent speaks next based on conversation state. +3. Agents collaborate on a software deployment task. +4. When the deployment agent tries to deploy to production, it triggers an approval request. +5. The sample simulates human approval and the workflow completes. + +Purpose: +Show how tool call approvals integrate with multi-agent group chat workflows where +different agents have different levels of tool access. + +Demonstrate: +- Using set_select_speakers_func with agents that have approval-required tools. +- Handling RequestInfoEvent in group chat scenarios. +- Multi-round group chat with tool approval interruption and resumption. + +Prerequisites: +- OpenAI or Azure OpenAI configured with the required environment variables. +- Basic familiarity with GroupChatBuilder and streaming workflow events. +""" + + +# 1. 
Define tools for different agents +@ai_function +def run_tests(test_suite: Annotated[str, "Name of the test suite to run"]) -> str: + """Run automated tests for the application.""" + return f"Test suite '{test_suite}' completed: 47 passed, 0 failed, 0 skipped" + + +@ai_function +def check_staging_status() -> str: + """Check the current status of the staging environment.""" + return "Staging environment: Healthy, Version 2.3.0 deployed, All services running" + + +@ai_function(approval_mode="always_require") +def deploy_to_production( + version: Annotated[str, "The version to deploy"], + components: Annotated[str, "Comma-separated list of components to deploy"], +) -> str: + """Deploy specified components to production. Requires human approval.""" + return f"Production deployment complete: Version {version}, Components: {components}" + + +@ai_function +def create_rollback_plan(version: Annotated[str, "The version being deployed"]) -> str: + """Create a rollback plan for the deployment.""" + return ( + f"Rollback plan created for version {version}: " + "Automated rollback to v2.2.0 if health checks fail within 5 minutes" + ) + + +# 2. Define the speaker selector function +def select_next_speaker(state: GroupChatStateSnapshot) -> str | None: + """Select the next speaker based on the conversation flow. + + This simple selector follows a predefined flow: + 1. QA Engineer runs tests + 2. DevOps Engineer checks staging and creates rollback plan + 3. DevOps Engineer deploys to production (triggers approval) + """ + round_index: int = state["round_index"] + + # Define the conversation flow + speaker_order: list[str] = [ + "QAEngineer", # Round 0: Run tests + "DevOpsEngineer", # Round 1: Check staging, create rollback + "DevOpsEngineer", # Round 2: Deploy to production (approval required) + ] + + if round_index >= len(speaker_order): + return None # End the conversation + + return speaker_order[round_index] + + +async def main() -> None: + # 3. 
Create specialized agents + chat_client = OpenAIChatClient() + + qa_engineer = chat_client.create_agent( + name="QAEngineer", + instructions=( + "You are a QA engineer responsible for running tests before deployment. " + "Run the appropriate test suites and report results clearly." + ), + tools=[run_tests], + ) + + devops_engineer = chat_client.create_agent( + name="DevOpsEngineer", + instructions=( + "You are a DevOps engineer responsible for deployments. First check staging " + "status and create a rollback plan, then proceed with production deployment. " + "Always ensure safety measures are in place before deploying." + ), + tools=[check_staging_status, create_rollback_plan, deploy_to_production], + ) + + # 4. Build a group chat workflow with the selector function + workflow = ( + GroupChatBuilder() + # Optionally, use `.set_manager(...)` to customize the group chat manager + .set_select_speakers_func(select_next_speaker) + .participants([qa_engineer, devops_engineer]) + .with_max_rounds(5) + .build() + ) + + # 5. Start the workflow + print("Starting group chat workflow for software deployment...") + print("Agents: QA Engineer, DevOps Engineer") + print("-" * 60) + + # Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS) + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream( + "We need to deploy version 2.4.0 to production. Please coordinate the deployment." + ): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + if isinstance(event.data, FunctionApprovalRequestContent): + print("\n[APPROVAL REQUIRED]") + print(f" Tool: {event.data.function_call.name}") + print(f" Arguments: {event.data.function_call.arguments}") + + # 6. 
Handle approval requests + if request_info_events: + for request_event in request_info_events: + if isinstance(request_event.data, FunctionApprovalRequestContent): + print("\n" + "=" * 60) + print("Human review required for production deployment!") + print("In a real scenario, you would review the deployment details here.") + print("Simulating approval for demo purposes...") + print("=" * 60) + + # Create approval response + approval_response = request_event.data.create_response(approved=True) + + # Phase 2: Send approval and continue workflow + async for _ in workflow.send_responses_streaming({request_event.request_id: approval_response}): + pass # Consume all events + + print("\n" + "-" * 60) + print("Deployment workflow completed successfully!") + print("All agents have finished their tasks.") + else: + print("\nWorkflow completed without requiring production deployment approval.") + + """ + Sample Output: + Starting group chat workflow for software deployment... + Agents: QA Engineer, DevOps Engineer + ------------------------------------------------------------ + + [QAEngineer]: Running the integration test suite to verify the application + before deployment... Test suite 'integration' completed: 47 passed, 0 failed. + All tests passing - ready for deployment. + + [DevOpsEngineer]: Checking staging environment status... Staging is healthy + with version 2.3.0. Creating rollback plan for version 2.4.0... Rollback plan + created with automated rollback to v2.2.0 if health checks fail. + + [APPROVAL REQUIRED] + Tool: deploy_to_production + Arguments: {"version": "2.4.0", "components": "api,web,worker"} + + ============================================================ + Human review required for production deployment! + In a real scenario, you would review the deployment details here. + Simulating approval for demo purposes... + ============================================================ + + [DevOpsEngineer]: Production deployment complete! 
    Version 2.4.0 has been
    successfully deployed with components: api, web, worker.

    ------------------------------------------------------------
    Deployment workflow completed successfully!
    All agents have finished their tasks.
    """


if __name__ == "__main__":
    asyncio.run(main())
diff --git a/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py
new file mode 100644
index 0000000000..b4b134c75e
--- /dev/null
+++ b/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py
@@ -0,0 +1,144 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from typing import Annotated

from agent_framework import (
    ChatMessage,
    FunctionApprovalRequestContent,
    RequestInfoEvent,
    SequentialBuilder,
    WorkflowOutputEvent,
    ai_function,
)
from agent_framework.openai import OpenAIChatClient

"""
Sample: Sequential Workflow with Tool Approval Requests

This sample demonstrates how to use SequentialBuilder with tools that require human
approval before execution. The approval flow uses the existing @ai_function decorator
with approval_mode="always_require" to trigger human-in-the-loop interactions.

This sample works as follows:
1. A SequentialBuilder workflow is created with a single agent that has tools requiring approval.
2. The agent receives a user task and determines it needs to call a sensitive tool.
3. The tool call triggers a FunctionApprovalRequestContent, pausing the workflow.
4. The sample simulates human approval by responding to the RequestInfoEvent.
5. Once approved, the tool executes and the agent completes its response.
6. The workflow outputs the final conversation with all messages.

Purpose:
Show how tool call approvals integrate seamlessly with SequentialBuilder without
requiring any additional builder configuration.
Demonstrate:
- Using @ai_function(approval_mode="always_require") for sensitive operations.
- Handling RequestInfoEvent with FunctionApprovalRequestContent in sequential workflows.
- Resuming workflow execution after approval via send_responses_streaming.

Prerequisites:
- OpenAI or Azure OpenAI configured with the required environment variables.
- Basic familiarity with SequentialBuilder and streaming workflow events.
"""


# 1. Define tools - one requiring approval, one that doesn't
@ai_function(approval_mode="always_require")
def execute_database_query(
    query: Annotated[str, "The SQL query to execute against the production database"],
) -> str:
    """Execute a SQL query against the production database. Requires human approval."""
    # In a real implementation, this would execute the query
    return f"Query executed successfully. Results: 3 rows affected by '{query}'"


@ai_function
def get_database_schema() -> str:
    """Get the current database schema. Does not require approval."""
    # Static schema text stands in for a real metadata lookup.
    return """
    Tables:
    - users (id, name, email, created_at)
    - orders (id, user_id, total, status, created_at)
    - products (id, name, price, stock)
    """


async def main() -> None:
    # 2. Create the agent with tools (approval mode is set per-tool via decorator)
    chat_client = OpenAIChatClient()
    database_agent = chat_client.create_agent(
        name="DatabaseAgent",
        instructions=(
            "You are a database assistant. You can view the database schema and execute "
            "queries. Always check the schema before running queries. Be careful with "
            "queries that modify data."
        ),
        tools=[get_database_schema, execute_database_query],
    )

    # 3. Build a sequential workflow with the agent
    workflow = SequentialBuilder().participants([database_agent]).build()

    # 4.
Start the workflow with a user task + print("Starting sequential workflow with tool approval...") + print("-" * 60) + + # Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS) + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream( + "Check the schema and then update all orders with status 'pending' to 'processing'" + ): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + if isinstance(event.data, FunctionApprovalRequestContent): + print(f"\nApproval requested for tool: {event.data.function_call.name}") + print(f" Arguments: {event.data.function_call.arguments}") + + # 5. Handle approval requests + if request_info_events: + for request_event in request_info_events: + if isinstance(request_event.data, FunctionApprovalRequestContent): + # In a real application, you would prompt the user here + print("\nSimulating human approval (auto-approving for demo)...") + + # Create approval response + approval_response = request_event.data.create_response(approved=True) + + # Phase 2: Send approval and continue workflow + output: list[ChatMessage] | None = None + async for event in workflow.send_responses_streaming({request_event.request_id: approval_response}): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + if output: + print("\n" + "-" * 60) + print("Workflow completed. Final conversation:") + for msg in output: + role = msg.role.value if hasattr(msg.role, "value") else msg.role + text = msg.text[:200] + "..." if len(msg.text) > 200 else msg.text + print(f" [{role}]: {text}") + else: + print("No approval requests were generated (schema check may have been sufficient).") + + """ + Sample Output: + Starting sequential workflow with tool approval... 
+ ------------------------------------------------------------ + + Approval requested for tool: execute_database_query + Arguments: {"query": "UPDATE orders SET status = 'processing' WHERE status = 'pending'"} + + Simulating human approval (auto-approving for demo)... + + ------------------------------------------------------------ + Workflow completed. Final conversation: + [user]: Check the schema and then update all orders with status 'pending' to 'processing' + [assistant]: I've checked the schema and executed the update query. The query + "UPDATE orders SET status = 'processing' WHERE status = 'pending'" + was executed successfully, affecting 3 rows. + """ + + +if __name__ == "__main__": + asyncio.run(main())