From c3b6183516c753e8daca4008da67c4d1a9a9bb07 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 14:42:44 +0900 Subject: [PATCH 1/6] Provide way for HITL with magentic --- .../agent_framework/_workflows/__init__.py | 16 + .../agent_framework/_workflows/_magentic.py | 552 +++++++++++++++--- .../core/tests/workflow/test_magentic.py | 59 +- .../getting_started/workflows/README.md | 1 + .../agents/magentic_workflow_as_agent.py | 10 +- .../workflows/orchestration/magentic.py | 10 +- .../orchestration/magentic_checkpoint.py | 10 +- .../magentic_human_in_the_loop.py | 324 ++++++++++ .../magentic_human_plan_update.py | 51 +- .../orchestrations/magentic.py | 10 +- 10 files changed, 914 insertions(+), 129 deletions(-) create mode 100644 python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 990264df41..cce4355779 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -74,10 +74,18 @@ ORCH_MSG_KIND_USER_TASK, MagenticBuilder, MagenticContext, + MagenticHumanInputRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, MagenticManagerBase, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, + MagenticStallInterventionDecision, + MagenticStallInterventionReply, + MagenticStallInterventionRequest, StandardMagenticManager, ) from ._orchestration_state import OrchestrationState @@ -144,10 +152,18 @@ "InProcRunnerContext", "MagenticBuilder", "MagenticContext", + "MagenticHumanInputRequest", + "MagenticHumanInterventionDecision", + "MagenticHumanInterventionKind", + "MagenticHumanInterventionReply", + "MagenticHumanInterventionRequest", "MagenticManagerBase", "MagenticPlanReviewDecision", 
"MagenticPlanReviewReply", "MagenticPlanReviewRequest", + "MagenticStallInterventionDecision", + "MagenticStallInterventionReply", + "MagenticStallInterventionRequest", "ManagerDirectiveModel", "ManagerSelectionRequest", "ManagerSelectionResponse", diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 1a6aaf2999..c84c71e416 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -17,7 +17,6 @@ AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, - ChatClientProtocol, ChatMessage, Role, ) @@ -373,29 +372,129 @@ def from_dict(cls, value: dict[str, Any]) -> "_MagenticResponseMessage": return cls(body=body, target_agent=target_agent, broadcast=broadcast) +# region Human Intervention Types + + +class MagenticHumanInterventionKind(str, Enum): + """The kind of human intervention being requested.""" + + PLAN_REVIEW = "plan_review" # Review and approve/revise the initial plan + TOOL_APPROVAL = "tool_approval" # Approve a tool/function call + STALL = "stall" # Workflow has stalled and needs guidance + + +class MagenticHumanInterventionDecision(str, Enum): + """Decision options for human intervention responses.""" + + APPROVE = "approve" # Approve (plan review, tool approval) + REVISE = "revise" # Request revision with feedback (plan review) + REJECT = "reject" # Reject/deny (tool approval) + CONTINUE = "continue" # Continue with current state (stall) + REPLAN = "replan" # Trigger replanning (stall) + GUIDANCE = "guidance" # Provide guidance text (stall, tool approval) + + +@dataclass +class _MagenticHumanInterventionRequest: + """Unified request for human intervention in a Magentic workflow. + + This request is emitted when the workflow needs human input. The `kind` field + indicates what type of intervention is needed, and the relevant fields are + populated based on the kind. 
+ + Attributes: + request_id: Unique identifier for correlating responses + kind: The type of intervention needed (plan_review, tool_approval, stall) + + # Plan review fields + task_text: The task description (plan_review) + facts_text: Extracted facts from the task (plan_review) + plan_text: The proposed or current plan (plan_review, stall) + round_index: Number of review rounds so far (plan_review) + + # Tool approval fields + agent_id: The agent requesting input (tool_approval) + prompt: Description of what input is needed (tool_approval) + context: Additional context (tool_approval) + conversation_snapshot: Recent conversation history (tool_approval) + + # Stall intervention fields + stall_count: Number of consecutive stall rounds (stall) + max_stall_count: Threshold that triggered intervention (stall) + stall_reason: Description of why progress stalled (stall) + last_agent: Last active agent (stall) + """ + + request_id: str = field(default_factory=lambda: str(uuid4())) + kind: MagenticHumanInterventionKind = MagenticHumanInterventionKind.PLAN_REVIEW + + # Plan review fields + task_text: str = "" + facts_text: str = "" + plan_text: str = "" + round_index: int = 0 + + # Tool approval fields + agent_id: str = "" + prompt: str = "" + context: str | None = None + conversation_snapshot: list[ChatMessage] = field(default_factory=list) # type: ignore + + # Stall intervention fields + stall_count: int = 0 + max_stall_count: int = 3 + stall_reason: str = "" + last_agent: str = "" + + +@dataclass +class _MagenticHumanInterventionReply: + """Unified reply to a human intervention request. + + The relevant fields depend on the original request kind and the decision made. 
+ + Attributes: + decision: The human's decision (approve, revise, continue, replan, guidance) + edited_plan_text: New plan text if directly editing (plan_review with approve/revise) + comments: Feedback for revision or guidance text (plan_review, stall with guidance) + response_text: Free-form response text (tool_approval) + """ + + decision: MagenticHumanInterventionDecision + edited_plan_text: str | None = None + comments: str | None = None + response_text: str | None = None + + +# Backward compatibility aliases for existing types @dataclass class _MagenticPlanReviewRequest: - """Internal: Human-in-the-loop request to review and optionally edit the plan before execution.""" + """[DEPRECATED] Use MagenticHumanInterventionRequest with kind=PLAN_REVIEW instead.""" request_id: str = field(default_factory=lambda: str(uuid4())) task_text: str = "" facts_text: str = "" plan_text: str = "" - round_index: int = 0 # number of review rounds so far + round_index: int = 0 class MagenticPlanReviewDecision(str, Enum): + """[DEPRECATED] Use MagenticHumanInterventionDecision instead.""" + APPROVE = "approve" REVISE = "revise" @dataclass class _MagenticPlanReviewReply: - """Internal: Human reply to a plan review request.""" + """[DEPRECATED] Use MagenticHumanInterventionReply instead.""" decision: MagenticPlanReviewDecision - edited_plan_text: str | None = None # if supplied, becomes the new plan text verbatim - comments: str | None = None # guidance for replan if no edited text provided + edited_plan_text: str | None = None + comments: str | None = None + + +# endregion Human Intervention Types @dataclass @@ -650,10 +749,9 @@ class StandardMagenticManager(MagenticManagerBase): def __init__( self, - chat_client: ChatClientProtocol, + agent: AgentProtocol, task_ledger: _MagenticTaskLedger | None = None, *, - instructions: str | None = None, task_ledger_facts_prompt: str | None = None, task_ledger_plan_prompt: str | None = None, task_ledger_full_prompt: str | None = None, @@ 
-669,11 +767,11 @@ def __init__( """Initialize the Standard Magentic Manager. Args: - chat_client: The chat client to use for LLM calls. - instructions: Instructions for the orchestrator agent. + agent: An agent instance to use for LLM calls. The agent's configured + options (temperature, seed, instructions, etc.) will be applied. + task_ledger: Optional task ledger for managing task state. Keyword Args: - task_ledger: Optional task ledger for managing task state. task_ledger_facts_prompt: Optional prompt for the task ledger facts. task_ledger_plan_prompt: Optional prompt for the task ledger plan. task_ledger_full_prompt: Optional prompt for the full task ledger. @@ -692,8 +790,7 @@ def __init__( max_round_count=max_round_count, ) - self.chat_client: ChatClientProtocol = chat_client - self.instructions: str | None = instructions + self._agent: AgentProtocol = agent self.task_ledger: _MagenticTaskLedger | None = task_ledger # Prompts may be overridden if needed @@ -717,34 +814,20 @@ async def _complete( self, messages: list[ChatMessage], ) -> ChatMessage: - """Call the underlying ChatClientProtocol directly and return the last assistant message. + """Call the underlying agent and return the last assistant message. - If manager instructions are provided, they are injected as a SYSTEM message - at the start of the request to guide the model consistently without needing - an intermediate Agent wrapper. + The agent's run method is called which applies the agent's configured options + (temperature, seed, instructions, etc.). 
""" - # Prepend system instructions if present - request_messages: list[ChatMessage] = [] - if self.instructions: - request_messages.append(ChatMessage(role=Role.SYSTEM, text=self.instructions)) - request_messages.extend(messages) - - # Invoke the chat client non-streaming API - response = await self.chat_client.get_response(request_messages) - try: - out_messages: list[ChatMessage] | None = list(response.messages) # type: ignore[assignment] - except Exception: - out_messages = None - + response: AgentRunResponse = await self._agent.run(messages) + out_messages = response.messages if response else None if out_messages: last = out_messages[-1] return ChatMessage( - role=last.role or Role.ASSISTANT, - text=last.text or "", + role=last.role, + text=last.text, author_name=last.author_name or MAGENTIC_MANAGER_NAME, ) - - # Fallback if no messages return ChatMessage(role=Role.ASSISTANT, text="No output produced.", author_name=MAGENTIC_MANAGER_NAME) async def plan(self, magentic_context: MagenticContext) -> ChatMessage: @@ -918,6 +1001,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator): _plan_review_round: int _max_plan_review_rounds: int _terminated: bool + _enable_stall_intervention: bool def __init__( self, @@ -926,6 +1010,7 @@ def __init__( *, require_plan_signoff: bool = False, max_plan_review_rounds: int = 10, + enable_stall_intervention: bool = False, executor_id: str | None = None, ) -> None: """Initializes a new instance of the MagenticOrchestratorExecutor. @@ -935,6 +1020,7 @@ def __init__( participants: A dictionary of participant IDs to their names. require_plan_signoff: Whether to require plan sign-off from a human. max_plan_review_rounds: The maximum number of plan review rounds. + enable_stall_intervention: Whether to request human input on stalls instead of auto-replan. executor_id: An optional executor ID. 
""" super().__init__(executor_id or f"magentic_orchestrator_{uuid4().hex[:8]}") @@ -945,6 +1031,7 @@ def __init__( self._require_plan_signoff = require_plan_signoff self._plan_review_round = 0 self._max_plan_review_rounds = max_plan_review_rounds + self._enable_stall_intervention = enable_stall_intervention # Registry of agent executors for internal coordination (e.g., resets) self._agent_executors = {} # Terminal state marker to stop further processing after completion/limits @@ -1210,22 +1297,45 @@ async def handle_response_message( await self._run_inner_loop(context) @response_handler - async def handle_plan_review_response( + async def handle_human_intervention_response( self, - original_request: _MagenticPlanReviewRequest, - response: _MagenticPlanReviewReply, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, context: WorkflowContext[ - # may broadcast ledger next, or ask for another round of review - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: + """Handle unified human intervention responses. + + Routes the response to the appropriate handler based on the original request kind. 
+ """ if getattr(self, "_terminated", False): return if self._context is None: return - if response.decision == MagenticPlanReviewDecision.APPROVE: + if original_request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + await self._handle_plan_review_response(original_request, response, context) + elif original_request.kind == MagenticHumanInterventionKind.STALL: + await self._handle_stall_intervention_response(original_request, response, context) + # TOOL_APPROVAL is handled by MagenticAgentExecutor, not the orchestrator + + async def _handle_plan_review_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[ + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] + ], + ) -> None: + """Handle plan review response.""" + if self._context is None: + return + + is_approve = response.decision == MagenticHumanInterventionDecision.APPROVE + + if is_approve: # Close the review loop on approval (no further plan review requests this run) self._require_plan_signoff = False # If the user supplied an edited plan, adopt it @@ -1246,13 +1356,11 @@ async def handle_plan_review_response( text=combined, author_name=MAGENTIC_MANAGER_NAME, ) - # If approved with comments but no edited text, apply comments via replan and proceed (no extra review) + # If approved with comments but no edited text, apply comments via replan and proceed elif response.comments: - # Record the human feedback for grounding self._context.chat_history.append( ChatMessage(role=Role.USER, text=f"Human plan feedback: {response.comments}") ) - # Ask the manager to replan based on comments; proceed immediately self._task_ledger = await self._manager.replan(self._context.clone(deep=True)) # Record the signed-off plan (no broadcast) @@ -1272,9 +1380,7 @@ async def handle_plan_review_response( self._plan_review_round += 1 if self._plan_review_round > 
self._max_plan_review_rounds: logger.warning("Magentic Orchestrator: Max plan review rounds reached. Proceeding with current plan.") - # Stop any further plan review requests for the rest of this run self._require_plan_signoff = False - # Add a clear note to the conversation so users know review is closed notice = ChatMessage( role=Role.ASSISTANT, text=( @@ -1287,7 +1393,6 @@ async def handle_plan_review_response( await self._emit_orchestrator_message(context, notice, ORCH_MSG_KIND_NOTICE) if self._task_ledger: self._context.chat_history.append(self._task_ledger) - # No further review requests; proceed directly into coordination ctx2 = cast( WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], context, @@ -1295,12 +1400,11 @@ async def handle_plan_review_response( await self._run_inner_loop(ctx2) return - # If the user provided an edited plan, adopt it directly and ask them to confirm once more + # If the user provided an edited plan, adopt it and ask for confirmation if response.edited_plan_text: mgr_ledger2 = getattr(self._manager, "task_ledger", None) if mgr_ledger2 is not None: mgr_ledger2.plan.text = response.edited_plan_text - # Rebuild combined message for preview in the next review request team_text = _team_block(self._participants) combined = self._manager.task_ledger_full_prompt.format( task=self._context.task.text, @@ -1312,16 +1416,67 @@ async def handle_plan_review_response( await self._send_plan_review_request(cast(WorkflowContext, context)) return - # Else pass comments into the chat history and replan with the manager + # Else pass comments into the chat history and replan if response.comments: self._context.chat_history.append( ChatMessage(role=Role.USER, text=f"Human plan feedback: {response.comments}") ) - # Ask the manager to replan; this only adjusts the plan stage, not a full reset self._task_ledger = await self._manager.replan(self._context.clone(deep=True)) await 
self._send_plan_review_request(cast(WorkflowContext, context)) + async def _handle_stall_intervention_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[ + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] + ], + ) -> None: + """Handle stall intervention response.""" + if self._context is None: + return + + ctx = self._context + logger.info( + f"Stall intervention response: decision={response.decision.value}, " + f"stall_count was {original_request.stall_count}" + ) + + if response.decision == MagenticHumanInterventionDecision.CONTINUE: + ctx.stall_count = 0 + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._run_inner_loop(ctx2) + return + + if response.decision == MagenticHumanInterventionDecision.REPLAN: + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._reset_and_replan(ctx2) + return + + if response.decision == MagenticHumanInterventionDecision.GUIDANCE: + ctx.stall_count = 0 + guidance = response.comments or response.response_text + if guidance: + guidance_msg = ChatMessage( + role=Role.USER, + text=f"Human guidance to help with stall: {guidance}", + ) + ctx.chat_history.append(guidance_msg) + await self._emit_orchestrator_message(context, guidance_msg, ORCH_MSG_KIND_NOTICE) + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._run_inner_loop(ctx2) + return + async def _run_outer_loop( self, context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], @@ -1399,7 +1554,34 @@ async def _run_inner_loop_helper( ctx.stall_count = max(0, ctx.stall_count - 1) if ctx.stall_count > self._manager.max_stall_count: - logger.info("Magentic 
Orchestrator: Stalling detected. Resetting and replanning") + logger.info("Magentic Orchestrator: Stalling detected after %d rounds", ctx.stall_count) + if self._enable_stall_intervention: + # Request human intervention instead of auto-replan + is_progress = current_progress_ledger.is_progress_being_made.answer + is_loop = current_progress_ledger.is_in_loop.answer + stall_reason = "No progress being made" if not is_progress else "" + if is_loop: + loop_msg = "Agents appear to be in a loop" + stall_reason = f"{stall_reason}; {loop_msg}" if stall_reason else loop_msg + next_speaker_val = current_progress_ledger.next_speaker.answer + last_agent = next_speaker_val if isinstance(next_speaker_val, str) else "" + # Get facts and plan from manager's task ledger + mgr_ledger = getattr(self._manager, "task_ledger", None) + facts_text = mgr_ledger.facts.text if mgr_ledger else "" + plan_text = mgr_ledger.plan.text if mgr_ledger else "" + request = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.STALL, + stall_count=ctx.stall_count, + max_stall_count=self._manager.max_stall_count, + task_text=ctx.task.text if ctx.task else "", + facts_text=facts_text, + plan_text=plan_text, + last_agent=last_agent, + stall_reason=stall_reason, + ) + await context.request_info(request, _MagenticHumanInterventionReply) + return + # Default behavior: auto-replan await self._reset_and_replan(context) return @@ -1515,7 +1697,7 @@ async def _check_within_limits_or_complete( return True async def _send_plan_review_request(self, context: WorkflowContext) -> None: - """Send a PlanReviewRequest.""" + """Send a human intervention request for plan review.""" # If plan sign-off is disabled (e.g., ran out of review rounds), do nothing if not self._require_plan_signoff: return @@ -1524,13 +1706,14 @@ async def _send_plan_review_request(self, context: WorkflowContext) -> None: plan_text = ledger.plan.text if ledger else "" task_text = self._context.task.text if self._context else "" - 
req = _MagenticPlanReviewRequest( + req = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.PLAN_REVIEW, task_text=task_text, facts_text=facts_text, plan_text=plan_text, round_index=self._plan_review_round, ) - await context.request_info(req, _MagenticPlanReviewReply) + await context.request_info(req, _MagenticHumanInterventionReply) # region Magentic Executors @@ -1543,17 +1726,22 @@ class MagenticAgentExecutor(Executor): - Receiving task ledger broadcasts - Responding to specific agent requests - Resetting agent state when needed + - Surfacing tool approval requests (user_input_requests) as HITL events """ def __init__( self, agent: AgentProtocol | Executor, agent_id: str, + enable_human_input: bool = False, ) -> None: super().__init__(f"agent_{agent_id}") self._agent = agent self._agent_id = agent_id self._chat_history: list[ChatMessage] = [] + self._enable_human_input = enable_human_input + self._pending_human_input_request: _MagenticHumanInterventionRequest | None = None + self._current_request_message: _MagenticRequestMessage | None = None @override async def on_checkpoint_save(self) -> dict[str, Any]: @@ -1634,6 +1822,9 @@ async def handle_request_message( logger.info("Agent %s: Received request to respond", self._agent_id) + # Store the original request message for potential continuation after human input + self._current_request_message = message + # Add persona adoption message with appropriate role persona_role = self._get_persona_adoption_role() persona_msg = ChatMessage( @@ -1650,20 +1841,23 @@ async def handle_request_message( from agent_framework import BaseAgent as _AF_AgentBase # local import to avoid cycles if not isinstance(self._agent, _AF_AgentBase): - response = ChatMessage( + response: ChatMessage = ChatMessage( role=Role.ASSISTANT, text=f"{self._agent_id} is a workflow executor and cannot be invoked directly.", author_name=self._agent_id, ) self._chat_history.append(response) await self._emit_agent_message_event(context, 
response) + await context.send_message(_MagenticResponseMessage(body=response)) else: # Invoke the agent - response = await self._invoke_agent(context) - self._chat_history.append(response) - - # Send response back to orchestrator - await context.send_message(_MagenticResponseMessage(body=response)) + agent_response = await self._invoke_agent(context) + if agent_response is None: + # Agent is waiting for human input - don't send response yet + return + self._chat_history.append(agent_response) + # Send response back to orchestrator + await context.send_message(_MagenticResponseMessage(body=agent_response)) except Exception as e: logger.warning("Agent %s invoke failed: %s", self._agent_id, e) @@ -1680,6 +1874,55 @@ def reset(self) -> None: """Reset the internal chat history of the agent (internal operation).""" logger.debug("Agent %s: Resetting chat history", self._agent_id) self._chat_history.clear() + self._pending_human_input_request = None + self._current_request_message = None + + @response_handler + async def handle_tool_approval_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[_MagenticResponseMessage, AgentRunResponse], + ) -> None: + """Handle human response for tool approval and continue agent execution. + + When a human provides input in response to a tool approval request, + this handler processes the response, adds it to the conversation, and + sends a response back to the orchestrator. 
+ + Args: + original_request: The original human intervention request + response: The human's response + context: The workflow context + """ + response_text = response.response_text or response.comments or "" + logger.info( + "Agent %s: Received tool approval for request %s: %s", + original_request.agent_id, + original_request.request_id, + response_text[:50] if response_text else "", + ) + + self._pending_human_input_request = None + + # Add the human response to the conversation with context from the original request + human_response_msg = ChatMessage( + role=Role.USER, + text=f"Human response to '{original_request.prompt}': {response_text}", + author_name="human", + ) + self._chat_history.append(human_response_msg) + + # Create a response message indicating human input was received + agent_response = ChatMessage( + role=Role.ASSISTANT, + text=f"Received human input for: {original_request.prompt}. Continuing with the task.", + author_name=original_request.agent_id, + ) + self._chat_history.append(agent_response) + + # Send response back to orchestrator to continue the workflow + await context.send_message(_MagenticResponseMessage(body=agent_response)) async def _emit_agent_delta_event( self, @@ -1687,10 +1930,12 @@ async def _emit_agent_delta_event( update: AgentRunResponseUpdate, ) -> None: # Add metadata to identify this as an agent streaming update - if update.additional_properties is None: - update.additional_properties = {} - update.additional_properties["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA - update.additional_properties["agent_id"] = self._agent_id + props = update.additional_properties + if props is None: + props = {} + update.additional_properties = props + props["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA + props["agent_id"] = self._agent_id # Emit AgentRunUpdateEvent with the agent response update await ctx.add_event(AgentRunUpdateEvent(executor_id=self._agent_id, data=update)) @@ -1707,8 +1952,12 @@ async def 
_emit_agent_message_event( async def _invoke_agent( self, ctx: WorkflowContext[_MagenticResponseMessage, AgentRunResponse], - ) -> ChatMessage: - """Invoke the wrapped agent and return a response.""" + ) -> ChatMessage | None: + """Invoke the wrapped agent and return a response. + + Returns: + ChatMessage with the agent's response, or None if waiting for human input. + """ logger.debug(f"Agent {self._agent_id}: Running with {len(self._chat_history)} messages") updates: list[AgentRunResponseUpdate] = [] @@ -1720,6 +1969,32 @@ async def _invoke_agent( run_result: AgentRunResponse = AgentRunResponse.from_agent_run_response_updates(updates) + # Handle human input requests (tool approval) - always surface these events + if run_result.user_input_requests: + for user_input_request in run_result.user_input_requests: + # Build a prompt from the request + prompt = "Human input required" + context_text = None + + # Extract information from the request to build a useful prompt + if hasattr(user_input_request, "function_call"): + fn_call = user_input_request.function_call + prompt = f"Approve function call: {fn_call.name}" + if fn_call.arguments: + context_text = f"Arguments: {fn_call.arguments}" + + # Create and send the human intervention request for tool approval + request = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.TOOL_APPROVAL, + agent_id=self._agent_id, + prompt=prompt, + context=context_text, + conversation_snapshot=list(self._chat_history[-5:]), + ) + self._pending_human_input_request = request + await ctx.request_info(request, _MagenticHumanInterventionReply) + return None # Signal that we're waiting for human input + messages: list[ChatMessage] | None = None with contextlib.suppress(Exception): messages = list(run_result.messages) # type: ignore[assignment] @@ -1807,6 +2082,8 @@ def __init__(self) -> None: self._manager: MagenticManagerBase | None = None self._enable_plan_review: bool = False self._checkpoint_storage: CheckpointStorage | 
None = None + self._enable_human_input: bool = False + self._enable_stall_intervention: bool = False def participants(self, **participants: AgentProtocol | Executor) -> Self: """Add participant agents or executors to the Magentic workflow. @@ -1892,6 +2169,87 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": self._enable_plan_review = enable return self + def with_human_input(self, enable: bool = True) -> "MagenticBuilder": + """[DEPRECATED] This method is no longer needed. + + Tool approval requests (user_input_requests) are now ALWAYS surfaced as + MagenticHumanInputRequest events. You do not need to call this method. + + This method is kept for backward compatibility but has no effect. + + Note: + For human intervention during workflow stalls, use :meth:`with_human_input_on_stall`. + + Args: + enable: Has no effect (kept for backward compatibility) + + Returns: + Self for method chaining + """ + self._enable_human_input = enable + return self + + def with_human_input_on_stall(self, enable: bool = True) -> "MagenticBuilder": + """Enable human intervention when the workflow detects a stall. + + When enabled, instead of automatically replanning when the workflow detects + that agents are not making progress or are stuck in a loop, the workflow will + pause and emit a MagenticStallInterventionRequest event. A human can then + decide to continue, trigger replanning, or provide guidance. + + This is useful for: + - Workflows where automatic replanning may not resolve the issue + - Scenarios requiring human judgment about workflow direction + - Debugging stuck workflows with human insight + - Complex tasks where human guidance can help agents get back on track + + When stall detection triggers (based on max_stall_count), instead of calling + _reset_and_replan automatically, the workflow will: + 1. Emit a MagenticHumanInterventionRequest with kind=STALL + 2. Wait for human response via send_responses_streaming + 3. 
Take action based on the human's decision (continue, replan, or guidance) + + Args: + enable: Whether to enable stall intervention (default True) + + Returns: + Self for method chaining + + Usage: + + .. code-block:: python + + workflow = ( + MagenticBuilder() + .participants(agent1=agent1) + .with_standard_manager(chat_client=client, max_stall_count=3) + .with_human_input_on_stall(enable=True) + .build() + ) + + # During execution, handle human intervention requests + async for event in workflow.run_stream("task"): + if isinstance(event, RequestInfoEvent): + if event.request_type is MagenticHumanInterventionRequest: + request = event.data + if request.kind == MagenticHumanInterventionKind.STALL: + print(f"Workflow stalled: {request.stall_reason}") + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments="Focus on completing the current step first", + ) + responses = {event.request_id: reply} + async for ev in workflow.send_responses_streaming(responses): + ... + + See Also: + - :class:`MagenticHumanInterventionRequest`: Unified request type + - :class:`MagenticHumanInterventionDecision`: Decision options + - :meth:`with_standard_manager`: Configure max_stall_count for stall detection + """ + self._enable_stall_intervention = enable + return self + def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "MagenticBuilder": """Enable workflow state persistence using the provided checkpoint storage. @@ -1943,9 +2301,8 @@ def with_standard_manager( manager: MagenticManagerBase | None = None, *, # Constructor args for StandardMagenticManager when manager is not provided - chat_client: ChatClientProtocol | None = None, + agent: AgentProtocol | None = None, task_ledger: _MagenticTaskLedger | None = None, - instructions: str | None = None, # Prompt overrides task_ledger_facts_prompt: str | None = None, task_ledger_plan_prompt: str | None = None, @@ -1966,18 +2323,18 @@ def with_standard_manager( 1. 
**Provide existing manager**: Pass a pre-configured manager instance (custom or standard) for full control over behavior - 2. **Auto-create standard manager**: Pass chat_client and options to automatically - create a StandardMagenticManager with specified configuration + 2. **Auto-create with agent**: Pass an agent to automatically create a + StandardMagenticManager that uses the agent's configured instructions and + options (temperature, seed, etc.) Args: manager: Pre-configured manager instance (StandardMagenticManager or custom MagenticManagerBase subclass). If provided, all other arguments are ignored. - chat_client: LLM chat client for generating plans and decisions. Required if - manager is not provided. + agent: Agent instance for generating plans and decisions. The agent's + configured instructions and options (temperature, seed, etc.) will be + applied. task_ledger: Optional custom task ledger implementation for specialized prompting or structured output requirements - instructions: System instructions prepended to all manager prompts to guide - behavior and set expectations task_ledger_facts_prompt: Custom prompt template for extracting facts from task description task_ledger_plan_prompt: Custom prompt template for generating initial plan @@ -2002,25 +2359,30 @@ def with_standard_manager( Self for method chaining Raises: - ValueError: If manager is None and chat_client is also None + ValueError: If manager is None and agent is not provided. - Usage with auto-created manager: + Usage with agent (recommended): .. code-block:: python - from azure.ai.projects.aio import AIProjectClient + from agent_framework import ChatAgent, ChatOptions + from agent_framework.openai import OpenAIChatClient - project_client = AIProjectClient.from_connection_string(...) 
- chat_client = project_client.inference.get_chat_completions_client() + # Configure manager agent with specific options and instructions + manager_agent = ChatAgent( + name="Coordinator", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + chat_options=ChatOptions(temperature=0.3, seed=42), + instructions="Be concise and focus on accuracy", + ) workflow = ( MagenticBuilder() .participants(agent1=agent1, agent2=agent2) .with_standard_manager( - chat_client=chat_client, + agent=manager_agent, max_round_count=20, max_stall_count=3, - instructions="Be concise and focus on accuracy", ) .build() ) @@ -2046,7 +2408,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: MagenticBuilder() .participants(coder=coder_agent, reviewer=reviewer_agent) .with_standard_manager( - chat_client=chat_client, + agent=manager_agent, task_ledger_plan_prompt="Create a detailed step-by-step plan...", progress_ledger_prompt="Assess progress and decide next action...", max_stall_count=2, @@ -2059,20 +2421,18 @@ async def plan(self, context: MagenticContext) -> ChatMessage: - Custom managers can implement alternative selection strategies - Prompt templates support Jinja2-style variable substitution - Stall detection helps prevent infinite loops in stuck scenarios + - The agent's instructions are used as system instructions for all manager prompts """ if manager is not None: self._manager = manager return self - if chat_client is None: - raise ValueError( - "chat_client is required when manager is not provided: with_standard_manager(chat_client=...)" - ) + if agent is None: + raise ValueError("agent is required when manager is not provided: with_standard_manager(agent=...)") self._manager = StandardMagenticManager( - chat_client=chat_client, + agent=agent, task_ledger=task_ledger, - instructions=instructions, task_ledger_facts_prompt=task_ledger_facts_prompt, task_ledger_plan_prompt=task_ledger_plan_prompt, task_ledger_full_prompt=task_ledger_full_prompt, @@ -2104,15 +2464,19 @@ 
def build(self) -> Workflow: # Type narrowing: we already checked self._manager is not None above manager: MagenticManagerBase = self._manager # type: ignore[assignment] + enable_stall_intervention = self._enable_stall_intervention def _orchestrator_factory(wiring: _GroupChatConfig) -> Executor: return MagenticOrchestratorExecutor( manager=manager, participants=participant_descriptions, require_plan_signoff=self._enable_plan_review, + enable_stall_intervention=enable_stall_intervention, executor_id="magentic_orchestrator", ) + enable_human_input = self._enable_human_input + def _participant_factory( spec: GroupChatParticipantSpec, wiring: _GroupChatConfig, @@ -2120,6 +2484,7 @@ def _participant_factory( agent_executor = MagenticAgentExecutor( spec.participant, spec.name, + enable_human_input=enable_human_input, ) orchestrator = wiring.orchestrator if isinstance(orchestrator, MagenticOrchestratorExecutor): @@ -2367,6 +2732,15 @@ def __getattr__(self, name: str) -> Any: # endregion Magentic Workflow -# Public aliases for types needed by users implementing custom plan review handlers +# Public aliases for unified human intervention types +MagenticHumanInterventionRequest = _MagenticHumanInterventionRequest +MagenticHumanInterventionReply = _MagenticHumanInterventionReply + +# Backward compatibility aliases (deprecated) MagenticPlanReviewRequest = _MagenticPlanReviewRequest MagenticPlanReviewReply = _MagenticPlanReviewReply +# Old aliases - point to unified types for compatibility +MagenticHumanInputRequest = _MagenticHumanInterventionRequest # type: ignore +MagenticStallInterventionRequest = _MagenticHumanInterventionRequest # type: ignore +MagenticStallInterventionReply = _MagenticHumanInterventionReply # type: ignore +MagenticStallInterventionDecision = MagenticHumanInterventionDecision # type: ignore diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index b41d243a3e..2ac65e3bc2 100644 --- 
a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -12,12 +12,12 @@ AgentRunResponseUpdate, AgentRunUpdateEvent, BaseAgent, - ChatClientProtocol, ChatMessage, - ChatResponse, - ChatResponseUpdate, Executor, MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, MagenticManagerBase, MagenticPlanReviewDecision, MagenticPlanReviewReply, @@ -220,15 +220,14 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None completed = False output: list[ChatMessage] | None = None - async for ev in wf.send_responses_streaming( - responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)} - ): + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + async for ev in wf.send_responses_streaming(responses={req_event.request_id: reply}): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True elif isinstance(ev, WorkflowOutputEvent): @@ -265,7 +264,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ # Wait for the initial plan review request req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None @@ -274,13 +273,13 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ completed = False async for ev in 
wf.send_responses_streaming( responses={ - req_event.request_id: MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.APPROVE, + req_event.request_id: MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.APPROVE, comments="Looks good; consider Z", ) } ): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: saw_second_review = True if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True @@ -338,7 +337,7 @@ async def test_magentic_checkpoint_resume_round_trip(): task_text = "checkpoint task" req_event: RequestInfoEvent | None = None async for ev in wf.run_stream(task_text): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None @@ -359,13 +358,13 @@ async def test_magentic_checkpoint_resume_round_trip(): orchestrator = next(exec for exec in wf_resume.executors.values() if isinstance(exec, MagenticOrchestratorExecutor)) - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) completed: WorkflowOutputEvent | None = None req_event = None async for event in wf_resume.run_stream( resume_checkpoint.checkpoint_id, ): - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: req_event = event assert req_event is not None @@ -430,25 +429,29 @@ async def test_magentic_agent_executor_on_checkpoint_save_and_restore_roundtrip( from agent_framework import StandardMagenticManager # noqa: E402 -class _StubChatClient(ChatClientProtocol): - @property - def 
additional_properties(self) -> dict[str, Any]: - """Get additional properties associated with the client.""" - return {} +class _StubManagerAgent(BaseAgent): + """Stub agent for testing StandardMagenticManager.""" - async def get_response(self, messages, **kwargs): # type: ignore[override] - return ChatResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")]) + async def run( + self, + request: None, + context: WorkflowContext | None = None, + ) -> AgentRunResponse: + return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")]) - def get_streaming_response(self, messages, **kwargs) -> AsyncIterable[ChatResponseUpdate]: # type: ignore[override] - async def _gen(): - if False: - yield ChatResponseUpdate() # pragma: no cover + def run_streaming( + self, + request: None, + context: WorkflowContext | None = None, + ) -> AsyncIterable[AgentRunResponseUpdate]: + async def _gen() -> AsyncIterable[AgentRunResponseUpdate]: + yield AgentRunResponseUpdate(message_deltas=[ChatMessage(role=Role.ASSISTANT, text="ok")]) return _gen() async def test_standard_manager_plan_and_replan_via_complete_monkeypatch(): - mgr = StandardMagenticManager(chat_client=_StubChatClient()) + mgr = StandardMagenticManager(agent=_StubManagerAgent()) async def fake_complete_plan(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage: # Return a different response depending on call order length @@ -481,7 +484,7 @@ async def fake_complete_replan(messages: list[ChatMessage], **kwargs: Any) -> Ch async def test_standard_manager_progress_ledger_success_and_error(): - mgr = StandardMagenticManager(chat_client=_StubChatClient()) + mgr = StandardMagenticManager(agent=_StubManagerAgent()) ctx = MagenticContext( task=ChatMessage(role=Role.USER, text="task"), participant_descriptions={"alice": "desc"}, @@ -718,7 +721,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): req_event: RequestInfoEvent | None = None async for event in workflow.run_stream("task"): - 
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: req_event = event assert req_event is not None diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 3146f3f38b..89d4e53102 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -101,6 +101,7 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | +| Magentic + Human Input (HITL) | [orchestration/magentic_human_in_the_loop.py](./orchestration/magentic_human_in_the_loop.py) | Plan review plus human input during agent execution | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | | Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context | | Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary | diff --git 
a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index 091f37c055..f6dd8ca83d 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -48,13 +48,21 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + print("\nBuilding Magentic Workflow...") workflow = ( MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, diff --git a/python/samples/getting_started/workflows/orchestration/magentic.py b/python/samples/getting_started/workflows/orchestration/magentic.py index 0e265cb931..213486706a 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic.py +++ b/python/samples/getting_started/workflows/orchestration/magentic.py @@ -65,6 +65,14 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + print("\nBuilding Magentic Workflow...") # State used by on_agent_stream callback @@ -75,7 +83,7 @@ async def main() -> None: MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + 
agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index de7d794b19..b34d73238e 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -69,6 +69,14 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and writing workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ) + # The builder wires in the Magentic orchestrator, sets the plan review path, and # stores the checkpoint backend so the runtime knows where to persist snapshots. return ( @@ -76,7 +84,7 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): .participants(researcher=researcher, writer=writer) .with_plan_review() .with_standard_manager( - chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + agent=manager_agent, max_round_count=10, max_stall_count=3, ) diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py b/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py new file mode 100644 index 0000000000..1021e1371f --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py @@ -0,0 +1,324 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio +import logging +from typing import cast + +from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, + RequestInfoEvent, + WorkflowOutputEvent, +) +from agent_framework.openai import OpenAIChatClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +""" +Sample: Magentic Orchestration with Human-in-the-Loop + +This sample demonstrates the unified human intervention pattern in Magentic workflows. +All HITL scenarios use a single request/reply type with a `kind` field: + +1. PLAN_REVIEW: Human reviews and approves/revises the initial plan before execution +2. TOOL_APPROVAL: Agents request approval for tool/function calls (always surfaced) +3. STALL: Human intervention when workflow detects agents are not making progress + +Key types: +- MagenticHumanInterventionRequest: Unified request with `kind` field +- MagenticHumanInterventionReply: Unified reply with `decision` field +- MagenticHumanInterventionKind: PLAN_REVIEW, TOOL_APPROVAL, STALL +- MagenticHumanInterventionDecision: APPROVE, REVISE, REJECT, CONTINUE, REPLAN, GUIDANCE + +Key behaviors demonstrated: +- with_plan_review(): Enables plan review before starting +- with_human_input_on_stall(): Enables human intervention when workflow stalls +- Tool approval requests are always surfaced (foundational behavior) +- Single unified handler for all intervention types + +Use cases: +- Plan review: Validating the orchestrator's understanding of requirements +- Tool approval: High-stakes decisions requiring human approval +- Stall intervention: Complex tasks where human guidance helps agents get back on track + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient`.
+""" + + +async def main() -> None: + # Create a research agent that may need human clarification + researcher_agent = ChatAgent( + name="ResearcherAgent", + description="Specialist in research and information gathering", + instructions=( + "You are a Researcher. When you encounter ambiguous or unclear aspects " + "of a research task, ask for clarification before proceeding. " + "You find information without additional computation or quantitative analysis." + ), + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + # Create an analyst agent that processes and summarizes information + analyst_agent = ChatAgent( + name="AnalystAgent", + description="Data analyst who processes and summarizes research findings", + instructions=( + "You are an Analyst. You take research findings and create clear, " + "structured summaries with actionable insights. When the analysis " + "direction is unclear, request guidance from the user." + ), + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + # Create a manager agent with specific model options for deterministic planning + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and analysis workflow", + instructions="You coordinate a team to complete research and analysis tasks efficiently.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + print("\nBuilding Magentic Workflow with HITL capabilities...") + + # Build workflow with plan review and stall intervention enabled + # Note: Tool approval (user_input_requests) is always surfaced - no need to enable + workflow = ( + MagenticBuilder() + .participants(researcher=researcher_agent, analyst=analyst_agent) + .with_standard_manager( + agent=manager_agent, # Use agent for control over model behavior + max_round_count=10, + max_stall_count=3, + max_reset_count=2, + ) + .with_plan_review() # Enable plan review before execution + .with_human_input_on_stall() # Enable human intervention when workflow stalls + 
.build() + ) + + task = ( + "Research the latest developments in sustainable aviation fuel (SAF) technology. " + "Focus on production methods, current adoption rates, and major challenges. " + "Then analyze the findings and provide recommendations for airline companies " + "considering SAF adoption." + ) + + print(f"\nTask: {task}") + print("\nStarting workflow execution...") + print("=" * 60) + + try: + pending_request: RequestInfoEvent | None = None + pending_responses: dict[str, object] | None = None + completed = False + workflow_output: str | None = None + + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + while not completed: + # Use streaming for both initial run and response sending + if pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + else: + stream = workflow.run_stream(task) + + # Collect events from the stream + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + if stream_line_open: + print() + stream_line_open = False + print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + + # Handle Plan Review Request + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + if stream_line_open: + print() + 
stream_line_open = False + pending_request = event + req = cast(MagenticHumanInterventionRequest, event.data) + + if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + print("\n" + "=" * 60) + print("PLAN REVIEW REQUEST") + print("=" * 60) + if req.plan_text: + print(f"\nProposed Plan:\n{req.plan_text}") + print() + + elif req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + print("\n" + "=" * 60) + print("TOOL APPROVAL REQUESTED") + print("=" * 60) + print(f"\nAgent: {req.agent_id}") + print(f"Request: {req.prompt}") + if req.context: + print(f"Context: {req.context}") + print() + + elif req.kind == MagenticHumanInterventionKind.STALL: + print("\n" + "=" * 60) + print("STALL INTERVENTION REQUESTED") + print("=" * 60) + print(f"\nWorkflow appears stalled after {req.stall_count} rounds") + print(f"Reason: {req.stall_reason}") + if req.last_agent: + print(f"Last active agent: {req.last_agent}") + if req.plan_text: + print(f"\nCurrent plan:\n{req.plan_text}") + print() + + elif isinstance(event, WorkflowOutputEvent): + if stream_line_open: + print() + stream_line_open = False + workflow_output = event.data if event.data else None + completed = True + + if stream_line_open: + print() + stream_line_open = False + pending_responses = None + + # Handle pending requests + if pending_request is not None: + req = cast(MagenticHumanInterventionRequest, pending_request.data) + reply: MagenticHumanInterventionReply | None = None + + if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + # Handle plan review + print("Plan review options:") + print("1. approve - Approve the plan as-is") + print("2. approve with comments - Approve with feedback") + print("3. revise - Request revision with feedback") + print("4. 
exit - Exit the workflow") + + while True: + choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 + if choice in ["approve", "1"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + break + if choice in ["approve with comments", "2"]: + comments = input("Enter your comments: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.APPROVE, + comments=comments if comments else None, + ) + break + if choice in ["revise", "3"]: + comments = input("Enter feedback for revision: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, + comments=comments if comments else None, + ) + break + if choice in ["exit", "4"]: + print("Exiting workflow...") + return + print("Invalid choice. Please enter a number 1-4.") + + elif req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + # Handle tool approval request + print("Tool approval options:") + print("1. approve - Allow the tool call") + print("2. deny - Reject the tool call") + print("3. guidance - Provide guidance instead") + print("4. exit - Exit the workflow") + + while True: + choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 + if choice in ["approve", "1"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + break + if choice in ["deny", "2"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REJECT) + break + if choice in ["guidance", "3"]: + guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments=guidance if guidance else None, + ) + break + if choice in ["exit", "4"]: + print("Exiting workflow...") + return + print("Invalid choice. 
Please enter a number 1-4.") + + elif req.kind == MagenticHumanInterventionKind.STALL: + # Handle stall intervention request + print("Stall intervention options:") + print("1. continue - Continue with current plan (reset stall counter)") + print("2. replan - Trigger automatic replanning") + print("3. guidance - Provide guidance to help agents get back on track") + print("4. exit - Exit the workflow") + + while True: + choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 + if choice in ["continue", "1"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.CONTINUE) + break + if choice in ["replan", "2"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REPLAN) + break + if choice in ["guidance", "3"]: + guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments=guidance if guidance else None, + ) + break + if choice in ["exit", "4"]: + print("Exiting workflow...") + return + print("Invalid choice. 
Please enter a number 1-4.") + + if reply is not None: + pending_responses = {pending_request.request_id: reply} + pending_request = None + + # Show final result + print("\n" + "=" * 60) + print("WORKFLOW COMPLETED") + print("=" * 60) + if workflow_output: + # workflow_output is a list[ChatMessage] + messages = cast(list[ChatMessage], workflow_output) + if messages: + final_msg = messages[-1] + print(f"\nFinal Result:\n{final_msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + logger.exception("Workflow exception", exc_info=e) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 52df5b24c6..6a3055d0e9 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -66,6 +66,14 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for the orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + # Callbacks def on_exception(exception: Exception) -> None: print(f"Exception occurred: {exception}") @@ -80,7 +88,7 @@ def on_exception(exception: Exception) -> None: MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, @@ -154,21 +162,48 @@ def on_exception(exception: Exception) -> None: # Get human input for plan review decision print("Plan review options:") print("1. approve - Approve the plan as-is") - print("2. 
revise - Request revision of the plan") - print("3. exit - Exit the workflow") + print("2. approve with comments - Approve with feedback for the manager") + print("3. revise - Request revision with your feedback") + print("4. edit - Directly edit the plan text") + print("5. exit - Exit the workflow") while True: - choice = input("Enter your choice (approve/revise/exit): ").strip().lower() # noqa: ASYNC250 + choice = input("Enter your choice (1-5): ").strip().lower() # noqa: ASYNC250 if choice in ["approve", "1"]: reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) break - if choice in ["revise", "2"]: - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE) + if choice in ["approve with comments", "2"]: + comments = input("Enter your comments for the manager: ").strip() # noqa: ASYNC250 + reply = MagenticPlanReviewReply( + decision=MagenticPlanReviewDecision.APPROVE, + comments=comments if comments else None, + ) + break + if choice in ["revise", "3"]: + comments = input("Enter feedback for revising the plan: ").strip() # noqa: ASYNC250 + reply = MagenticPlanReviewReply( + decision=MagenticPlanReviewDecision.REVISE, + comments=comments if comments else None, + ) + break + if choice in ["edit", "4"]: + print("Enter your edited plan (end with an empty line):") + lines = [] + while True: + line = input() # noqa: ASYNC250 + if line == "": + break + lines.append(line) + edited_plan = "\n".join(lines) + reply = MagenticPlanReviewReply( + decision=MagenticPlanReviewDecision.REVISE, + edited_plan_text=edited_plan if edited_plan else None, + ) break - if choice in ["exit", "3"]: + if choice in ["exit", "5"]: print("Exiting workflow...") return - print("Invalid choice. Please enter 'approve', 'revise', or 'exit'.") + print("Invalid choice. 
Please enter a number 1-5.") pending_responses = {pending_request.request_id: reply} pending_request = None diff --git a/python/samples/semantic-kernel-migration/orchestrations/magentic.py b/python/samples/semantic-kernel-migration/orchestrations/magentic.py index f67620273a..87094a2047 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/magentic.py +++ b/python/samples/semantic-kernel-migration/orchestrations/magentic.py @@ -136,10 +136,18 @@ async def run_agent_framework_example(prompt: str) -> str | None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + workflow = ( MagenticBuilder() .participants(researcher=researcher, coder=coder) - .with_standard_manager(chat_client=OpenAIChatClient()) + .with_standard_manager(agent=manager_agent) .build() ) From 8e4916d77f29d4d2bf7efd678cc85b242f32bcc5 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 16:10:34 +0900 Subject: [PATCH 2/6] support tool call approvals and hitl stall replan --- .../agent_framework/_workflows/_magentic.py | 96 +++--- .../getting_started/workflows/README.md | 2 +- .../magentic_agent_clarification.py | 230 +++++++++++++ .../magentic_human_in_the_loop.py | 324 ------------------ 4 files changed, 283 insertions(+), 369 deletions(-) create mode 100644 python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py delete mode 100644 python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index c84c71e416..0a8e21e088 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ 
b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -18,6 +18,8 @@ AgentRunResponse, AgentRunResponseUpdate, ChatMessage, + FunctionApprovalRequestContent, + FunctionResultContent, Role, ) @@ -1733,14 +1735,13 @@ def __init__( self, agent: AgentProtocol | Executor, agent_id: str, - enable_human_input: bool = False, ) -> None: super().__init__(f"agent_{agent_id}") self._agent = agent self._agent_id = agent_id self._chat_history: list[ChatMessage] = [] - self._enable_human_input = enable_human_input self._pending_human_input_request: _MagenticHumanInterventionRequest | None = None + self._pending_tool_request: FunctionApprovalRequestContent | None = None self._current_request_message: _MagenticRequestMessage | None = None @override @@ -1875,6 +1876,7 @@ def reset(self) -> None: logger.debug("Agent %s: Resetting chat history", self._agent_id) self._chat_history.clear() self._pending_human_input_request = None + self._pending_tool_request = None self._current_request_message = None @response_handler @@ -1887,8 +1889,9 @@ async def handle_tool_approval_response( """Handle human response for tool approval and continue agent execution. When a human provides input in response to a tool approval request, - this handler processes the response, adds it to the conversation, and - sends a response back to the orchestrator. + this handler processes the response, creates a FunctionResultContent with + the human's answer, adds it to the conversation, and re-invokes the agent + to continue execution. 
Args: original_request: The original human intervention request @@ -1903,26 +1906,52 @@ async def handle_tool_approval_response( response_text[:50] if response_text else "", ) + # Get the pending tool request to extract call_id + pending_tool_request = self._pending_tool_request self._pending_human_input_request = None + self._pending_tool_request = None - # Add the human response to the conversation with context from the original request - human_response_msg = ChatMessage( - role=Role.USER, - text=f"Human response to '{original_request.prompt}': {response_text}", - author_name="human", - ) - self._chat_history.append(human_response_msg) + if pending_tool_request is not None: + # Create a FunctionResultContent with the human's response + function_result = FunctionResultContent( + call_id=pending_tool_request.function_call.call_id, + result=response_text, + ) + # Add the function result as a message to continue the conversation + result_msg = ChatMessage( + role=Role.USER, + contents=[function_result], + ) + self._chat_history.append(result_msg) - # Create a response message indicating human input was received - agent_response = ChatMessage( - role=Role.ASSISTANT, - text=f"Received human input for: {original_request.prompt}. 
Continuing with the task.", - author_name=original_request.agent_id, - ) - self._chat_history.append(agent_response) + # Re-invoke the agent to continue execution + agent_response = await self._invoke_agent(context) + if agent_response is None: + # Agent is waiting for more human input + return + self._chat_history.append(agent_response) + await context.send_message(_MagenticResponseMessage(body=agent_response)) + else: + # Fallback: no pending tool request, just add as text message + logger.warning( + f"Agent {original_request.agent_id}: No pending tool request found for response, " + "using fallback text handling", + ) + human_response_msg = ChatMessage( + role=Role.USER, + text=f"Human response to '{original_request.prompt}': {response_text}", + author_name="human", + ) + self._chat_history.append(human_response_msg) - # Send response back to orchestrator to continue the workflow - await context.send_message(_MagenticResponseMessage(body=agent_response)) + # Create a response message indicating human input was received + agent_response_msg = ChatMessage( + role=Role.ASSISTANT, + text=f"Received human input for: {original_request.prompt}. 
Continuing with the task.", + author_name=original_request.agent_id, + ) + self._chat_history.append(agent_response_msg) + await context.send_message(_MagenticResponseMessage(body=agent_response_msg)) async def _emit_agent_delta_event( self, @@ -1983,6 +2012,9 @@ async def _invoke_agent( if fn_call.arguments: context_text = f"Arguments: {fn_call.arguments}" + # Store the original FunctionApprovalRequestContent for later use + self._pending_tool_request = user_input_request + # Create and send the human intervention request for tool approval request = _MagenticHumanInterventionRequest( kind=MagenticHumanInterventionKind.TOOL_APPROVAL, @@ -2082,7 +2114,6 @@ def __init__(self) -> None: self._manager: MagenticManagerBase | None = None self._enable_plan_review: bool = False self._checkpoint_storage: CheckpointStorage | None = None - self._enable_human_input: bool = False self._enable_stall_intervention: bool = False def participants(self, **participants: AgentProtocol | Executor) -> Self: @@ -2169,26 +2200,6 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": self._enable_plan_review = enable return self - def with_human_input(self, enable: bool = True) -> "MagenticBuilder": - """[DEPRECATED] This method is no longer needed. - - Tool approval requests (user_input_requests) are now ALWAYS surfaced as - MagenticHumanInputRequest events. You do not need to call this method. - - This method is kept for backward compatibility but has no effect. - - Note: - For human intervention during workflow stalls, use :meth:`with_human_input_on_stall`. - - Args: - enable: Has no effect (kept for backward compatibility) - - Returns: - Self for method chaining - """ - self._enable_human_input = enable - return self - def with_human_input_on_stall(self, enable: bool = True) -> "MagenticBuilder": """Enable human intervention when the workflow detects a stall. 
@@ -2475,8 +2486,6 @@ def _orchestrator_factory(wiring: _GroupChatConfig) -> Executor: executor_id="magentic_orchestrator", ) - enable_human_input = self._enable_human_input - def _participant_factory( spec: GroupChatParticipantSpec, wiring: _GroupChatConfig, @@ -2484,7 +2493,6 @@ def _participant_factory( agent_executor = MagenticAgentExecutor( spec.participant, spec.name, - enable_human_input=enable_human_input, ) orchestrator = wiring.orchestrator if isinstance(orchestrator, MagenticOrchestratorExecutor): diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 89d4e53102..96eb7a8e85 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -101,7 +101,7 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | -| Magentic + Human Input (HITL) | [orchestration/magentic_human_in_the_loop.py](./orchestration/magentic_human_in_the_loop.py) | Plan review plus human input during agent execution | +| Magentic + Agent Clarification | [orchestration/magentic_agent_clarification.py](./orchestration/magentic_agent_clarification.py) | Agents ask clarifying questions via `ask_user` tool with `@ai_function(approval_mode="always_require")` | | Magentic + Checkpoint Resume | 
[orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | | Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context | | Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary | diff --git a/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py b/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py new file mode 100644 index 0000000000..44dea25acc --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py @@ -0,0 +1,230 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from typing import Annotated, cast + +from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, + RequestInfoEvent, + WorkflowOutputEvent, + ai_function, +) +from agent_framework.openai import OpenAIChatClient + +logging.basicConfig(level=logging.WARNING) +logger = logging.getLogger(__name__) + +""" +Sample: Agent Clarification via Tool Calls in Magentic Workflows + +This sample demonstrates how agents can ask clarifying questions to users during +execution via the HITL (Human-in-the-Loop) mechanism. 
+ +Scenario: "Onboard Jessica Smith" +- User provides an ambiguous task: "Onboard Jessica Smith" +- The onboarding agent recognizes missing information and uses the ask_user tool +- The ask_user call surfaces as a TOOL_APPROVAL request via RequestInfoEvent +- User provides the answer (e.g., "Engineering, Software Engineer") +- The answer is fed back to the agent as a FunctionResultContent +- Agent continues execution with the clarified information + +How it works: +1. Agent has an `ask_user` tool decorated with `@ai_function(approval_mode="always_require")` +2. When agent calls `ask_user`, it surfaces as a FunctionApprovalRequestContent +3. MagenticAgentExecutor converts this to a MagenticHumanInterventionRequest(kind=TOOL_APPROVAL) +4. User provides answer via MagenticHumanInterventionReply with response_text +5. The response_text becomes the function result fed back to the agent +6. Agent receives the result and continues processing + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient`. +""" + + +@ai_function(approval_mode="always_require") +def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str: + """Ask the user a clarifying question to gather missing information. + + Use this tool when you need additional information from the user to complete + your task effectively. The user's response will be returned so you can + continue with your work. + + Args: + question: The question to ask the user + + Returns: + The user's response to the question + """ + # This function body is a placeholder - the actual interaction happens via HITL. + # When the agent calls this tool: + # 1. The tool call surfaces as a FunctionApprovalRequestContent + # 2. MagenticAgentExecutor detects this and emits a HITL request + # 3. The user provides their answer + # 4. 
The answer is fed back as the function result + return f"User was asked: {question}" + + +async def main() -> None: + # Create an onboarding agent that asks clarifying questions + onboarding_agent = ChatAgent( + name="OnboardingAgent", + description="HR specialist who handles employee onboarding", + instructions=( + "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n" + "IMPORTANT: When given an onboarding request, you MUST gather the following " + "information before proceeding:\n" + "1. Department (e.g., Engineering, Sales, Marketing)\n" + "2. Role/Title (e.g., Software Engineer, Account Executive)\n" + "3. Start date (if not specified)\n" + "4. Manager's name (if known)\n\n" + "Use the ask_user tool to request ANY missing information. " + "Do not proceed with onboarding until you have at least the department and role.\n\n" + "Once you have the information, create an onboarding plan." + ), + chat_client=OpenAIChatClient(model_id="gpt-4o"), + tools=[ask_user], # Tool decorated with @ai_function(approval_mode="always_require") + ) + + # Create a manager agent + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the onboarding workflow", + instructions="You coordinate a team to complete HR tasks efficiently.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + print("\nBuilding Magentic Workflow with Agent Clarification...") + + workflow = ( + MagenticBuilder() + .participants(onboarding=onboarding_agent) + .with_standard_manager( + agent=manager_agent, + max_round_count=10, + max_stall_count=3, + max_reset_count=2, + ) + .build() + ) + + # Ambiguous task - agent should ask for clarification + task = "Onboard Jessica Smith" + + print(f"\nTask: {task}") + print("(This is intentionally vague - the agent should ask for more details)") + print("\nStarting workflow execution...") + print("=" * 60) + + try: + pending_request: RequestInfoEvent | None = None + pending_responses: dict[str, 
object] | None = None + completed = False + workflow_output: str | None = None + + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + while not completed: + if pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + else: + stream = workflow.run_stream(task) + + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + if stream_line_open: + print() + stream_line_open = False + print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + if stream_line_open: + print() + stream_line_open = False + pending_request = event + req = cast(MagenticHumanInterventionRequest, event.data) + + if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + print("\n" + "=" * 60) + print("AGENT ASKING FOR CLARIFICATION") + print("=" * 60) + print(f"\nAgent: {req.agent_id}") + print(f"Question: {req.prompt}") + if req.context: + print(f"Details: {req.context}") + print() + + elif isinstance(event, WorkflowOutputEvent): + if stream_line_open: + print() + stream_line_open = False + workflow_output = event.data if event.data else None + completed = True + + if stream_line_open: + print() + 
stream_line_open = False + pending_responses = None + + if pending_request is not None: + req = cast(MagenticHumanInterventionRequest, pending_request.data) + + if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + # Agent is asking for clarification + print("Please provide your answer:") + answer = input("> ").strip() # noqa: ASYNC250 + + if answer.lower() == "exit": + print("Exiting workflow...") + return + + # Send the answer back - it will be fed to the agent as the function result + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.APPROVE, + response_text=answer if answer else "No additional information provided.", + ) + pending_responses = {pending_request.request_id: reply} + pending_request = None + + print("\n" + "=" * 60) + print("WORKFLOW COMPLETED") + print("=" * 60) + if workflow_output: + messages = cast(list[ChatMessage], workflow_output) + if messages: + final_msg = messages[-1] + print(f"\nFinal Result:\n{final_msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + logger.exception("Workflow exception", exc_info=e) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py b/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py deleted file mode 100644 index 1021e1371f..0000000000 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_in_the_loop.py +++ /dev/null @@ -1,324 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. 
- -import asyncio -import logging -from typing import cast - -from agent_framework import ( - MAGENTIC_EVENT_TYPE_AGENT_DELTA, - MAGENTIC_EVENT_TYPE_ORCHESTRATOR, - AgentRunUpdateEvent, - ChatAgent, - ChatMessage, - MagenticBuilder, - MagenticHumanInterventionDecision, - MagenticHumanInterventionKind, - MagenticHumanInterventionReply, - MagenticHumanInterventionRequest, - RequestInfoEvent, - WorkflowOutputEvent, -) -from agent_framework.openai import OpenAIChatClient - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -""" -Sample: Magentic Orchestration with Human-in-the-Loop - -This sample demonstrates the unified human intervention pattern in Magentic workflows. -All HITL scenarios use a single request/reply type with a `kind` field: - -1. PLAN_REVIEW: Human reviews and approves/revises the initial plan before execution -2. TOOL_APPROVAL: Agents request approval for tool/function calls (always surfaced) -3. STALL: Human intervention when workflow detects agents are not making progress - -Key types: -- MagenticHumanInterventionRequest: Unified request with `kind` field -- MagenticHumanInterventionReply: Unified reply with `decision` field -- MagenticHumanInterventionKind: PLAN_REVIEW, TOOL_APPROVAL, STALL -- MagenticHumanInterventionDecision: APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE - -Key behaviors demonstrated: -- with_plan_review(): Enables plan review before starting -- with_human_input_on_stall(): Enables human intervention when workflow stalls -- Tool approval requests are always surfaced (foundational behavior) -- Single unified handler for all intervention types - -Use cases: -- Plan review: Validating the orchestrator's understanding of requirements -- Tool approval: High-stakes decisions requiring human approval -- Stall intervention: Complex tasks where human guidance helps agents get back on track - -Prerequisites: -- OpenAI credentials configured for `OpenAIChatClient`. 
-""" - - -async def main() -> None: - # Create a research agent that may need human clarification - researcher_agent = ChatAgent( - name="ResearcherAgent", - description="Specialist in research and information gathering", - instructions=( - "You are a Researcher. When you encounter ambiguous or unclear aspects " - "of a research task, ask for clarification before proceeding. " - "You find information without additional computation or quantitative analysis." - ), - chat_client=OpenAIChatClient(model_id="gpt-4o"), - ) - - # Create an analyst agent that processes and summarizes information - analyst_agent = ChatAgent( - name="AnalystAgent", - description="Data analyst who processes and summarizes research findings", - instructions=( - "You are an Analyst. You take research findings and create clear, " - "structured summaries with actionable insights. When the analysis " - "direction is unclear, request guidance from the user." - ), - chat_client=OpenAIChatClient(model_id="gpt-4o"), - ) - - # Create a manager agent with specific model options for deterministic planning - manager_agent = ChatAgent( - name="MagenticManager", - description="Orchestrator that coordinates the research and analysis workflow", - instructions="You coordinate a team to complete research and analysis tasks efficiently.", - chat_client=OpenAIChatClient(model_id="gpt-4o"), - ) - - print("\nBuilding Magentic Workflow with HITL capabilities...") - - # Build workflow with plan review and stall intervention enabled - # Note: Tool approval (user_input_requests) is always surfaced - no need to enable - workflow = ( - MagenticBuilder() - .participants(researcher=researcher_agent, analyst=analyst_agent) - .with_standard_manager( - agent=manager_agent, # Use agent for control over model behavior - max_round_count=10, - max_stall_count=3, - max_reset_count=2, - ) - .with_plan_review() # Enable plan review before execution - .with_human_input_on_stall() # Enable human intervention when workflow stalls - 
.build() - ) - - task = ( - "Research the latest developments in sustainable aviation fuel (SAF) technology. " - "Focus on production methods, current adoption rates, and major challenges. " - "Then analyze the findings and provide recommendations for airline companies " - "considering SAF adoption." - ) - - print(f"\nTask: {task}") - print("\nStarting workflow execution...") - print("=" * 60) - - try: - pending_request: RequestInfoEvent | None = None - pending_responses: dict[str, object] | None = None - completed = False - workflow_output: str | None = None - - last_stream_agent_id: str | None = None - stream_line_open: bool = False - - while not completed: - # Use streaming for both initial run and response sending - if pending_responses is not None: - stream = workflow.send_responses_streaming(pending_responses) - else: - stream = workflow.run_stream(task) - - # Collect events from the stream - async for event in stream: - if isinstance(event, AgentRunUpdateEvent): - props = event.data.additional_properties if event.data else None - event_type = props.get("magentic_event_type") if props else None - - if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: - kind = props.get("orchestrator_message_kind", "") if props else "" - text = event.data.text if event.data else "" - if stream_line_open: - print() - stream_line_open = False - print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") - elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: - agent_id = props.get("agent_id", "unknown") if props else "unknown" - if last_stream_agent_id != agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[{agent_id}]: ", end="", flush=True) - last_stream_agent_id = agent_id - stream_line_open = True - if event.data and event.data.text: - print(event.data.text, end="", flush=True) - - # Handle Plan Review Request - elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: - if stream_line_open: - print() - 
stream_line_open = False - pending_request = event - req = cast(MagenticHumanInterventionRequest, event.data) - - if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW: - print("\n" + "=" * 60) - print("PLAN REVIEW REQUEST") - print("=" * 60) - if req.plan_text: - print(f"\nProposed Plan:\n{req.plan_text}") - print() - - elif req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: - print("\n" + "=" * 60) - print("TOOL APPROVAL REQUESTED") - print("=" * 60) - print(f"\nAgent: {req.agent_id}") - print(f"Request: {req.prompt}") - if req.context: - print(f"Context: {req.context}") - print() - - elif req.kind == MagenticHumanInterventionKind.STALL: - print("\n" + "=" * 60) - print("STALL INTERVENTION REQUESTED") - print("=" * 60) - print(f"\nWorkflow appears stalled after {req.stall_count} rounds") - print(f"Reason: {req.stall_reason}") - if req.last_agent: - print(f"Last active agent: {req.last_agent}") - if req.plan_text: - print(f"\nCurrent plan:\n{req.plan_text}") - print() - - elif isinstance(event, WorkflowOutputEvent): - if stream_line_open: - print() - stream_line_open = False - workflow_output = event.data if event.data else None - completed = True - - if stream_line_open: - print() - stream_line_open = False - pending_responses = None - - # Handle pending requests - if pending_request is not None: - req = cast(MagenticHumanInterventionRequest, pending_request.data) - reply: MagenticHumanInterventionReply | None = None - - if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW: - # Handle plan review - print("Plan review options:") - print("1. approve - Approve the plan as-is") - print("2. approve with comments - Approve with feedback") - print("3. revise - Request revision with feedback") - print("4. 
exit - Exit the workflow") - - while True: - choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 - if choice in ["approve", "1"]: - reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) - break - if choice in ["approve with comments", "2"]: - comments = input("Enter your comments: ").strip() # noqa: ASYNC250 - reply = MagenticHumanInterventionReply( - decision=MagenticHumanInterventionDecision.APPROVE, - comments=comments if comments else None, - ) - break - if choice in ["revise", "3"]: - comments = input("Enter feedback for revision: ").strip() # noqa: ASYNC250 - reply = MagenticHumanInterventionReply( - decision=MagenticHumanInterventionDecision.REVISE, - comments=comments if comments else None, - ) - break - if choice in ["exit", "4"]: - print("Exiting workflow...") - return - print("Invalid choice. Please enter a number 1-4.") - - elif req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: - # Handle tool approval request - print("Tool approval options:") - print("1. approve - Allow the tool call") - print("2. deny - Reject the tool call") - print("3. guidance - Provide guidance instead") - print("4. exit - Exit the workflow") - - while True: - choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 - if choice in ["approve", "1"]: - reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) - break - if choice in ["deny", "2"]: - reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REJECT) - break - if choice in ["guidance", "3"]: - guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 - reply = MagenticHumanInterventionReply( - decision=MagenticHumanInterventionDecision.GUIDANCE, - comments=guidance if guidance else None, - ) - break - if choice in ["exit", "4"]: - print("Exiting workflow...") - return - print("Invalid choice. 
Please enter a number 1-4.") - - elif req.kind == MagenticHumanInterventionKind.STALL: - # Handle stall intervention request - print("Stall intervention options:") - print("1. continue - Continue with current plan (reset stall counter)") - print("2. replan - Trigger automatic replanning") - print("3. guidance - Provide guidance to help agents get back on track") - print("4. exit - Exit the workflow") - - while True: - choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 - if choice in ["continue", "1"]: - reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.CONTINUE) - break - if choice in ["replan", "2"]: - reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REPLAN) - break - if choice in ["guidance", "3"]: - guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 - reply = MagenticHumanInterventionReply( - decision=MagenticHumanInterventionDecision.GUIDANCE, - comments=guidance if guidance else None, - ) - break - if choice in ["exit", "4"]: - print("Exiting workflow...") - return - print("Invalid choice. 
Please enter a number 1-4.") - - if reply is not None: - pending_responses = {pending_request.request_id: reply} - pending_request = None - - # Show final result - print("\n" + "=" * 60) - print("WORKFLOW COMPLETED") - print("=" * 60) - if workflow_output: - # workflow_output is a list[ChatMessage] - messages = cast(list[ChatMessage], workflow_output) - if messages: - final_msg = messages[-1] - print(f"\nFinal Result:\n{final_msg.text}") - - except Exception as e: - print(f"Workflow execution failed: {e}") - logger.exception("Workflow exception", exc_info=e) - - -if __name__ == "__main__": - asyncio.run(main()) From 69836c189db340bcdc0b6f52337b8b528da9f6c0 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 16:21:12 +0900 Subject: [PATCH 3/6] human plan intervention sample --- .../getting_started/workflows/README.md | 1 + .../orchestration/magentic_human_replan.py | 213 ++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 python/samples/getting_started/workflows/orchestration/magentic_human_replan.py diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 96eb7a8e85..258aa64b1d 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -101,6 +101,7 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human 
reviews/updates the plan before execution | +| Magentic + Human Stall Intervention | [orchestration/magentic_human_replan.py](./orchestration/magentic_human_replan.py) | Human intervenes when workflow stalls with `with_human_input_on_stall()` | | Magentic + Agent Clarification | [orchestration/magentic_agent_clarification.py](./orchestration/magentic_agent_clarification.py) | Agents ask clarifying questions via `ask_user` tool with `@ai_function(approval_mode="always_require")` | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | | Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context | diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py b/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py new file mode 100644 index 0000000000..aaa9be66f8 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py @@ -0,0 +1,213 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from typing import cast + +from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, + RequestInfoEvent, + WorkflowOutputEvent, +) +from agent_framework.openai import OpenAIChatClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +""" +Sample: Magentic Orchestration with Human Stall Intervention + +This sample demonstrates how humans can intervene when a Magentic workflow stalls. 
+When agents stop making progress, the workflow requests human input instead of +automatically replanning. + +Key concepts: +- with_human_input_on_stall(): Enables human intervention when workflow detects stalls +- MagenticHumanInterventionKind.STALL: The request kind for stall interventions +- Human can choose to: continue, trigger replan, or provide guidance + +Stall intervention options: +- CONTINUE: Reset stall counter and continue with current plan +- REPLAN: Trigger automatic replanning by the manager +- GUIDANCE: Provide text guidance to help agents get back on track + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient`. + +NOTE: it is sometimes difficult to get the agents to actually stall depending on the task. +""" + + +async def main() -> None: + researcher_agent = ChatAgent( + name="ResearcherAgent", + description="Specialist in research and information gathering", + instructions="You are a Researcher. You find information and gather facts.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + analyst_agent = ChatAgent( + name="AnalystAgent", + description="Data analyst who processes and summarizes research findings", + instructions="You are an Analyst. 
You analyze findings and create summaries.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the workflow", + instructions="You coordinate a team to complete tasks efficiently.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + print("\nBuilding Magentic Workflow with Human Stall Intervention...") + + workflow = ( + MagenticBuilder() + .participants(researcher=researcher_agent, analyst=analyst_agent) + .with_standard_manager( + agent=manager_agent, + max_round_count=10, + max_stall_count=1, # Stall detection after 1 round without progress + max_reset_count=2, + ) + .with_human_input_on_stall() # Request human input when stalled (instead of auto-replan) + .build() + ) + + task = "Research sustainable aviation fuel technology and summarize the findings." + + print(f"\nTask: {task}") + print("\nStarting workflow execution...") + print("=" * 60) + + try: + pending_request: RequestInfoEvent | None = None + pending_responses: dict[str, object] | None = None + completed = False + workflow_output: str | None = None + + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + while not completed: + if pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + else: + stream = workflow.run_stream(task) + + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + if stream_line_open: + print() + stream_line_open = False + print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props 
else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + if stream_line_open: + print() + stream_line_open = False + pending_request = event + req = cast(MagenticHumanInterventionRequest, event.data) + + if req.kind == MagenticHumanInterventionKind.STALL: + print("\n" + "=" * 60) + print("STALL INTERVENTION REQUESTED") + print("=" * 60) + print(f"\nWorkflow appears stalled after {req.stall_count} rounds") + print(f"Reason: {req.stall_reason}") + if req.last_agent: + print(f"Last active agent: {req.last_agent}") + if req.plan_text: + print(f"\nCurrent plan:\n{req.plan_text}") + print() + + elif isinstance(event, WorkflowOutputEvent): + if stream_line_open: + print() + stream_line_open = False + workflow_output = event.data if event.data else None + completed = True + + if stream_line_open: + print() + stream_line_open = False + pending_responses = None + + # Handle stall intervention request + if pending_request is not None: + req = cast(MagenticHumanInterventionRequest, pending_request.data) + reply: MagenticHumanInterventionReply | None = None + + if req.kind == MagenticHumanInterventionKind.STALL: + print("Stall intervention options:") + print("1. continue - Continue with current plan (reset stall counter)") + print("2. replan - Trigger automatic replanning") + print("3. guidance - Provide guidance to help agents") + print("4. 
exit - Exit the workflow") + + while True: + choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 + if choice in ["continue", "1"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.CONTINUE) + break + if choice in ["replan", "2"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REPLAN) + break + if choice in ["guidance", "3"]: + guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments=guidance if guidance else None, + ) + break + if choice in ["exit", "4"]: + print("Exiting workflow...") + return + print("Invalid choice. Please enter a number 1-4.") + + if reply is not None: + pending_responses = {pending_request.request_id: reply} + pending_request = None + + print("\n" + "=" * 60) + print("WORKFLOW COMPLETED") + print("=" * 60) + if workflow_output: + messages = cast(list[ChatMessage], workflow_output) + if messages: + final_msg = messages[-1] + print(f"\nFinal Result:\n{final_msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + logger.exception("Workflow exception", exc_info=e) + + +if __name__ == "__main__": + asyncio.run(main()) From b108bdffc6bbdd61c0901c00daf0931dcd039137 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 16:31:43 +0900 Subject: [PATCH 4/6] Clean up --- .../agent_framework/_workflows/__init__.py | 6 -- .../agent_framework/_workflows/_magentic.py | 63 ++++++------------- .../core/tests/workflow/test_magentic.py | 21 ++++--- .../orchestration/magentic_checkpoint.py | 20 +++--- .../magentic_human_plan_update.py | 34 +++++----- 5 files changed, 59 insertions(+), 85 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index cce4355779..04623c87d9 100644 --- 
a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -80,9 +80,6 @@ MagenticHumanInterventionReply, MagenticHumanInterventionRequest, MagenticManagerBase, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, MagenticStallInterventionDecision, MagenticStallInterventionReply, MagenticStallInterventionRequest, @@ -158,9 +155,6 @@ "MagenticHumanInterventionReply", "MagenticHumanInterventionRequest", "MagenticManagerBase", - "MagenticPlanReviewDecision", - "MagenticPlanReviewReply", - "MagenticPlanReviewRequest", "MagenticStallInterventionDecision", "MagenticStallInterventionReply", "MagenticStallInterventionRequest", diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 0a8e21e088..efcc55f20d 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -468,34 +468,6 @@ class _MagenticHumanInterventionReply: response_text: str | None = None -# Backward compatibility aliases for existing types -@dataclass -class _MagenticPlanReviewRequest: - """[DEPRECATED] Use MagenticHumanInterventionRequest with kind=PLAN_REVIEW instead.""" - - request_id: str = field(default_factory=lambda: str(uuid4())) - task_text: str = "" - facts_text: str = "" - plan_text: str = "" - round_index: int = 0 - - -class MagenticPlanReviewDecision(str, Enum): - """[DEPRECATED] Use MagenticHumanInterventionDecision instead.""" - - APPROVE = "approve" - REVISE = "revise" - - -@dataclass -class _MagenticPlanReviewReply: - """[DEPRECATED] Use MagenticHumanInterventionReply instead.""" - - decision: MagenticPlanReviewDecision - edited_plan_text: str | None = None - comments: str | None = None - - # endregion Human Intervention Types @@ -1199,7 +1171,7 @@ async def handle_start_message( self, message: _MagenticStartMessage, 
context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: """Handle the initial start message to begin orchestration.""" @@ -1244,7 +1216,7 @@ async def handle_task_text( self, task_text: str, context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage.from_string(task_text), context) @@ -1254,7 +1226,7 @@ async def handle_task_message( self, task_message: ChatMessage, context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage(task_message), context) @@ -1264,7 +1236,7 @@ async def handle_task_messages( self, conversation: list[ChatMessage], context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage(conversation), context) @@ -2157,9 +2129,9 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": """Enable or disable human-in-the-loop plan review before task execution. When enabled, the workflow will pause after the manager generates the initial - plan and emit a _MagenticPlanReviewRequest event. A human reviewer can then - approve, request revisions, or reject the plan. The workflow continues only - after approval. 
+ plan and emit a MagenticHumanInterventionRequest event with kind=PLAN_REVIEW. + A human reviewer can then approve, request revisions, or reject the plan. + The workflow continues only after approval. This is useful for: - High-stakes tasks requiring human oversight @@ -2180,22 +2152,25 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .with_plan_review(enable=True) .build() ) # During execution, handle plan review async for event in workflow.run_stream("task"): - if isinstance(event, _MagenticPlanReviewRequest): - # Review plan and respond - reply = _MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) - await workflow.send(reply) + if isinstance(event, RequestInfoEvent): + request = event.data + if isinstance(request, MagenticHumanInterventionRequest): + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + # Review plan and respond + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + await workflow.send_responses({event.request_id: reply}) See Also: - - :class:`_MagenticPlanReviewRequest`: Event emitted for review - - :class:`_MagenticPlanReviewReply`: Response to send back - - :class:`MagenticPlanReviewDecision`: Approve/Revise/Reject options + - :class:`MagenticHumanInterventionRequest`: Event emitted for review + - :class:`MagenticHumanInterventionReply`: Response to send back + - :class:`MagenticHumanInterventionDecision`: APPROVE/REVISE options """ self._enable_plan_review = enable return self @@ -2745,8 +2720,6 @@ def __getattr__(self, name: str) -> Any: MagenticHumanInterventionReply = _MagenticHumanInterventionReply # Backward compatibility aliases (deprecated) -MagenticPlanReviewRequest = _MagenticPlanReviewRequest -MagenticPlanReviewReply = _MagenticPlanReviewReply # Old aliases - point to unified types for 
compatibility MagenticHumanInputRequest = _MagenticHumanInterventionRequest # type: ignore MagenticStallInterventionRequest = _MagenticHumanInterventionRequest # type: ignore diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 2ac65e3bc2..b09baa786f 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -19,9 +19,6 @@ MagenticHumanInterventionReply, MagenticHumanInterventionRequest, MagenticManagerBase, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, RequestInfoEvent, Role, TextContent, @@ -59,21 +56,25 @@ def test_magentic_start_message_from_string(): assert msg.task.text == "Do the thing" -def test_plan_review_request_defaults_and_reply_variants(): - req = MagenticPlanReviewRequest() # defaults provided by dataclass +def test_human_intervention_request_defaults_and_reply_variants(): + from agent_framework._workflows._magentic import MagenticHumanInterventionKind + + req = MagenticHumanInterventionRequest(kind=MagenticHumanInterventionKind.PLAN_REVIEW) assert hasattr(req, "request_id") assert req.task_text == "" and req.facts_text == "" and req.plan_text == "" assert isinstance(req.round_index, int) and req.round_index == 0 # Replies: approve, revise with comments, revise with edited text - approve = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) - revise_comments = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE, comments="Tighten scope") - revise_text = MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.REVISE, + approve = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + revise_comments = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, comments="Tighten scope" + ) + revise_text = MagenticHumanInterventionReply( + 
decision=MagenticHumanInterventionDecision.REVISE, edited_plan_text="- Step 1\n- Step 2", ) - assert approve.decision == MagenticPlanReviewDecision.APPROVE + assert approve.decision == MagenticHumanInterventionDecision.APPROVE assert revise_comments.comments == "Tighten scope" assert revise_text.edited_plan_text is not None and revise_text.edited_plan_text.startswith("- Step 1") diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index b34d73238e..36e6ca4c01 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -8,9 +8,10 @@ ChatAgent, FileCheckpointStorage, MagenticBuilder, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, RequestInfoEvent, WorkflowCheckpoint, WorkflowOutputEvent, @@ -111,9 +112,12 @@ async def main() -> None: # the plan for human review. 
plan_review_request_id: str | None = None async for event in workflow.run_stream(TASK): - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: - plan_review_request_id = event.request_id - print(f"Captured plan review request: {plan_review_request_id}") + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + request = event.data + if isinstance(request, MagenticHumanInterventionRequest): + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + plan_review_request_id = event.request_id + print(f"Captured plan review request: {plan_review_request_id}") if isinstance(event, WorkflowStatusEvent) and event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: break @@ -145,12 +149,12 @@ async def main() -> None: resumed_workflow = build_workflow(checkpoint_storage) # Construct an approval reply to supply when the plan review request is re-emitted. - approval = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + approval = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) # Resume execution and capture the re-emitted plan review request. 
request_info_event: RequestInfoEvent | None = None async for event in resumed_workflow.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id): - if isinstance(event, RequestInfoEvent) and isinstance(event.data, MagenticPlanReviewRequest): + if isinstance(event, RequestInfoEvent) and isinstance(event.data, MagenticHumanInterventionRequest): request_info_event = event if request_info_event is None: diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 6a3055d0e9..b96fac7e99 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -11,9 +11,10 @@ ChatAgent, HostedCodeInterpreterTool, MagenticBuilder, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, RequestInfoEvent, WorkflowOutputEvent, ) @@ -111,7 +112,7 @@ def on_exception(exception: Exception) -> None: try: pending_request: RequestInfoEvent | None = None - pending_responses: dict[str, MagenticPlanReviewReply] | None = None + pending_responses: dict[str, MagenticHumanInterventionReply] | None = None completed = False workflow_output: str | None = None @@ -142,11 +143,12 @@ def on_exception(exception: Exception) -> None: stream_line_open = True if event.data and event.data.text: print(event.data.text, end="", flush=True) - elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: - pending_request = event - review_req = cast(MagenticPlanReviewRequest, event.data) - if review_req.plan_text: - print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n") + elif isinstance(event, RequestInfoEvent) and event.request_type is 
MagenticHumanInterventionRequest: + request = cast(MagenticHumanInterventionRequest, event.data) + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + pending_request = event + if request.plan_text: + print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n") elif isinstance(event, WorkflowOutputEvent): # Capture workflow output during streaming workflow_output = str(event.data) if event.data else None @@ -170,19 +172,19 @@ def on_exception(exception: Exception) -> None: while True: choice = input("Enter your choice (1-5): ").strip().lower() # noqa: ASYNC250 if choice in ["approve", "1"]: - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) break if choice in ["approve with comments", "2"]: comments = input("Enter your comments for the manager: ").strip() # noqa: ASYNC250 - reply = MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.APPROVE, + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.APPROVE, comments=comments if comments else None, ) break if choice in ["revise", "3"]: comments = input("Enter feedback for revising the plan: ").strip() # noqa: ASYNC250 - reply = MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.REVISE, + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, comments=comments if comments else None, ) break @@ -195,8 +197,8 @@ def on_exception(exception: Exception) -> None: break lines.append(line) edited_plan = "\n".join(lines) - reply = MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.REVISE, + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, edited_plan_text=edited_plan if edited_plan else None, ) break From 03b15debf4a83e8eee9ba1ceb3821232727afec5 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 16:36:42 +0900 Subject: [PATCH 5/6] 
Improve logging --- .../agent_framework/_workflows/_magentic.py | 45 +++++++++---------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index efcc55f20d..8cfe1c2c27 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -1081,7 +1081,7 @@ async def on_checkpoint_save(self) -> dict[str, Any]: try: state["manager_state"] = self._manager.on_checkpoint_save() except Exception as exc: - logger.warning("Failed to save manager state for checkpoint: %s\nSkipping...", exc) + logger.warning(f"Failed to save manager state for checkpoint: {exc}\nSkipping...") return state @@ -1111,14 +1111,14 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: else: self._context = None except Exception as exc: # pragma: no cover - defensive - logger.warning("Failed to restore magentic context: %s", exc) + logger.warning(f"Failed to restore magentic context: {exc}") self._context = None ledger_payload = state.get("task_ledger") if ledger_payload is not None: try: self._task_ledger = _message_from_payload(ledger_payload) except Exception as exc: # pragma: no cover - logger.warning("Failed to restore task ledger message: %s", exc) + logger.warning(f"Failed to restore task ledger message: {exc}") self._task_ledger = None if "plan_review_round" in state: @@ -1138,7 +1138,7 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: try: self._manager.on_checkpoint_restore(manager_state) except Exception as exc: # pragma: no cover - logger.warning("Failed to restore manager state: %s", exc) + logger.warning(f"Failed to restore manager state: {exc}") self._reconcile_restored_participants() @@ -1499,20 +1499,19 @@ async def _run_inner_loop_helper( return ctx.round_count += 1 - logger.info("Magentic Orchestrator: Inner loop - round %s", 
ctx.round_count) + logger.info(f"Magentic Orchestrator: Inner loop - round {ctx.round_count}") # Create progress ledger using the manager try: current_progress_ledger = await self._manager.create_progress_ledger(ctx.clone(deep=True)) except Exception as ex: - logger.warning("Magentic Orchestrator: Progress ledger creation failed, triggering reset: %s", ex) + logger.warning(f"Magentic Orchestrator: Progress ledger creation failed, triggering reset: {ex}") await self._reset_and_replan(context) return logger.debug( - "Progress evaluation: satisfied=%s, next=%s", - current_progress_ledger.is_request_satisfied.answer, - current_progress_ledger.next_speaker.answer, + f"Progress evaluation: satisfied={current_progress_ledger.is_request_satisfied.answer}, " + f"next={current_progress_ledger.next_speaker.answer}" ) # Check for task completion @@ -1528,7 +1527,7 @@ async def _run_inner_loop_helper( ctx.stall_count = max(0, ctx.stall_count - 1) if ctx.stall_count > self._manager.max_stall_count: - logger.info("Magentic Orchestrator: Stalling detected after %d rounds", ctx.stall_count) + logger.info(f"Magentic Orchestrator: Stalling detected after {ctx.stall_count} rounds") if self._enable_stall_intervention: # Request human intervention instead of auto-replan is_progress = current_progress_ledger.is_progress_being_made.answer @@ -1569,7 +1568,7 @@ async def _run_inner_loop_helper( instruction = current_progress_ledger.instruction_or_question.answer if next_speaker_value not in self._participants: - logger.warning("Invalid next speaker: %s", next_speaker_value) + logger.warning(f"Invalid next speaker: {next_speaker_value}") await self._prepare_final_answer(context) return @@ -1586,7 +1585,7 @@ async def _run_inner_loop_helper( target_executor_id = f"agent_{next_speaker_value}" # Request specific agent to respond - logger.debug("Magentic Orchestrator: Requesting %s to respond", next_speaker_value) + logger.debug(f"Magentic Orchestrator: Requesting {next_speaker_value} to 
respond") await context.send_message( _MagenticRequestMessage( agent_name=next_speaker_value, @@ -1650,7 +1649,7 @@ async def _check_within_limits_or_complete( if hit_round_limit or hit_reset_limit: limit_type = "round" if hit_round_limit else "reset" - logger.error("Magentic Orchestrator: Max %s count reached", limit_type) + logger.error(f"Magentic Orchestrator: Max {limit_type} count reached") # Only emit completion once and then mark terminated if not self._terminated: @@ -1743,7 +1742,7 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: try: self._chat_history = decode_chat_messages(history_payload) except Exception as exc: # pragma: no cover - logger.warning("Agent %s: Failed to restore chat history: %s", self._agent_id, exc) + logger.warning(f"Agent {self._agent_id}: Failed to restore chat history: {exc}") self._chat_history = [] else: self._chat_history = [] @@ -1753,12 +1752,12 @@ async def handle_response_message( self, message: _MagenticResponseMessage, context: WorkflowContext[_MagenticResponseMessage] ) -> None: """Handle response message (task ledger broadcast).""" - logger.debug("Agent %s: Received response message", self._agent_id) + logger.debug(f"Agent {self._agent_id}: Received response message") # Check if this message is intended for this agent if message.target_agent is not None and message.target_agent != self._agent_id and not message.broadcast: # Message is targeted to a different agent, ignore it - logger.debug("Agent %s: Ignoring message targeted to %s", self._agent_id, message.target_agent) + logger.debug(f"Agent {self._agent_id}: Ignoring message targeted to {message.target_agent}") return # Add transfer message if needed @@ -1793,7 +1792,7 @@ async def handle_request_message( if message.agent_name != self._agent_id: return - logger.info("Agent %s: Received request to respond", self._agent_id) + logger.info(f"Agent {self._agent_id}: Received request to respond") # Store the original request message for potential 
continuation after human input self._current_request_message = message @@ -1833,7 +1832,7 @@ async def handle_request_message( await context.send_message(_MagenticResponseMessage(body=agent_response)) except Exception as e: - logger.warning("Agent %s invoke failed: %s", self._agent_id, e) + logger.warning(f"Agent {self._agent_id} invoke failed: {e}") # Fallback response response = ChatMessage( role=Role.ASSISTANT, @@ -1845,7 +1844,7 @@ async def handle_request_message( def reset(self) -> None: """Reset the internal chat history of the agent (internal operation).""" - logger.debug("Agent %s: Resetting chat history", self._agent_id) + logger.debug(f"Agent {self._agent_id}: Resetting chat history") self._chat_history.clear() self._pending_human_input_request = None self._pending_tool_request = None @@ -1872,10 +1871,8 @@ async def handle_tool_approval_response( """ response_text = response.response_text or response.comments or "" logger.info( - "Agent %s: Received tool approval for request %s: %s", - original_request.agent_id, - original_request.request_id, - response_text[:50] if response_text else "", + f"Agent {original_request.agent_id}: Received tool approval for request " + f"{original_request.request_id}: {response_text[:50] if response_text else ''}" ) # Get the pending tool request to extract call_id @@ -2440,7 +2437,7 @@ def build(self) -> Workflow: if self._manager is None: raise ValueError("No manager configured. Call with_standard_manager(...) 
before build().") - logger.info("Building Magentic workflow with %d participants", len(self._participants)) + logger.info(f"Building Magentic workflow with {len(self._participants)} participants") # Create participant descriptions participant_descriptions: dict[str, str] = {} From b118b0a671b6d434a9bd8a832e479cce24858cd0 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Wed, 3 Dec 2025 12:49:26 +0900 Subject: [PATCH 6/6] updates --- .../agent_framework/_workflows/_magentic.py | 127 +++++++++++++----- .../core/tests/workflow/test_magentic.py | 14 +- 2 files changed, 101 insertions(+), 40 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 8cfe1c2c27..d0df28dcde 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -1860,19 +1860,22 @@ async def handle_tool_approval_response( """Handle human response for tool approval and continue agent execution. When a human provides input in response to a tool approval request, - this handler processes the response, creates a FunctionResultContent with - the human's answer, adds it to the conversation, and re-invokes the agent - to continue execution. 
+ this handler processes the response based on the decision type: + + - APPROVE: Execute the tool call with the provided response text + - REJECT: Do not execute the tool, inform the agent of rejection + - GUIDANCE: Execute the tool call with the guidance text as input Args: original_request: The original human intervention request - response: The human's response + response: The human's response containing the decision and any text context: The workflow context """ response_text = response.response_text or response.comments or "" + decision = response.decision logger.info( - f"Agent {original_request.agent_id}: Received tool approval for request " - f"{original_request.request_id}: {response_text[:50] if response_text else ''}" + f"Agent {original_request.agent_id}: Received tool approval response " + f"(decision={decision.value}): {response_text[:50] if response_text else ''}" ) # Get the pending tool request to extract call_id @@ -1880,6 +1883,40 @@ async def handle_tool_approval_response( self._pending_human_input_request = None self._pending_tool_request = None + # Handle REJECT decision - do not execute the tool call + if decision == MagenticHumanInterventionDecision.REJECT: + rejection_reason = response_text or "Tool call rejected by human" + logger.info(f"Agent {self._agent_id}: Tool call rejected: {rejection_reason}") + + if pending_tool_request is not None: + # Create a FunctionResultContent indicating rejection + function_result = FunctionResultContent( + call_id=pending_tool_request.function_call.call_id, + result=f"Tool call was rejected by human reviewer. 
Reason: {rejection_reason}", + ) + result_msg = ChatMessage( + role=Role.USER, + contents=[function_result], + ) + self._chat_history.append(result_msg) + else: + # Fallback without pending tool request + rejection_msg = ChatMessage( + role=Role.USER, + text=f"Tool call '{original_request.prompt}' was rejected: {rejection_reason}", + author_name="human", + ) + self._chat_history.append(rejection_msg) + + # Re-invoke the agent so it can adapt to the rejection + agent_response = await self._invoke_agent(context) + if agent_response is None: + return + self._chat_history.append(agent_response) + await context.send_message(_MagenticResponseMessage(body=agent_response)) + return + + # Handle APPROVE and GUIDANCE decisions - execute the tool call if pending_tool_request is not None: # Create a FunctionResultContent with the human's response function_result = FunctionResultContent( @@ -1953,6 +1990,15 @@ async def _invoke_agent( ) -> ChatMessage | None: """Invoke the wrapped agent and return a response. + This method streams the agent's response updates, collects them into an + AgentRunResponse, and handles any human input requests (tool approvals). + + Note: + If multiple user input requests are present in the agent's response, + only the first one is processed. A warning is logged and subsequent + requests are ignored. This is a current limitation of the single-request + pending state architecture. + Returns: ChatMessage with the agent's response, or None if waiting for human input. 
""" @@ -1967,34 +2013,45 @@ async def _invoke_agent( run_result: AgentRunResponse = AgentRunResponse.from_agent_run_response_updates(updates) - # Handle human input requests (tool approval) - always surface these events + # Handle human input requests (tool approval) - process one at a time if run_result.user_input_requests: - for user_input_request in run_result.user_input_requests: - # Build a prompt from the request - prompt = "Human input required" - context_text = None - - # Extract information from the request to build a useful prompt - if hasattr(user_input_request, "function_call"): - fn_call = user_input_request.function_call - prompt = f"Approve function call: {fn_call.name}" - if fn_call.arguments: - context_text = f"Arguments: {fn_call.arguments}" - - # Store the original FunctionApprovalRequestContent for later use - self._pending_tool_request = user_input_request - - # Create and send the human intervention request for tool approval - request = _MagenticHumanInterventionRequest( - kind=MagenticHumanInterventionKind.TOOL_APPROVAL, - agent_id=self._agent_id, - prompt=prompt, - context=context_text, - conversation_snapshot=list(self._chat_history[-5:]), + if len(run_result.user_input_requests) > 1: + logger.warning( + f"Agent {self._agent_id}: Multiple user input requests received " + f"({len(run_result.user_input_requests)}), processing only the first one" ) - self._pending_human_input_request = request - await ctx.request_info(request, _MagenticHumanInterventionReply) - return None # Signal that we're waiting for human input + + user_input_request = run_result.user_input_requests[0] + + # Build a prompt from the request based on its type + prompt: str + context_text: str | None = None + + if isinstance(user_input_request, FunctionApprovalRequestContent): + fn_call = user_input_request.function_call + prompt = f"Approve function call: {fn_call.name}" + if fn_call.arguments: + context_text = f"Arguments: {fn_call.arguments}" + else: + # Fallback for 
unknown request types + request_type = type(user_input_request).__name__ + prompt = f"Agent {self._agent_id} requires human input ({request_type})" + logger.warning(f"Agent {self._agent_id}: Unrecognized user input request type: {request_type}") + + # Store the original FunctionApprovalRequestContent for later use + self._pending_tool_request = user_input_request + + # Create and send the human intervention request for tool approval + request = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.TOOL_APPROVAL, + agent_id=self._agent_id, + prompt=prompt, + context=context_text, + conversation_snapshot=list(self._chat_history[-5:]), + ) + self._pending_human_input_request = request + await ctx.request_info(request, _MagenticHumanInterventionReply) + return None # Signal that we're waiting for human input messages: list[ChatMessage] | None = None with contextlib.suppress(Exception): @@ -2110,7 +2167,7 @@ def participants(self, **participants: AgentProtocol | Executor) -> Self: .participants( researcher=research_agent, writer=writing_agent, coder=coding_agent, reviewer=review_agent ) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .build() ) @@ -2205,7 +2262,7 @@ def with_human_input_on_stall(self, enable: bool = True) -> "MagenticBuilder": workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(chat_client=client, max_stall_count=3) + .with_standard_manager(agent=manager_agent, max_stall_count=3) .with_human_input_on_stall(enable=True) .build() ) @@ -2257,7 +2314,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .with_checkpointing(storage) .build() ) diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 
b09baa786f..e9f5dcf70d 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -435,15 +435,19 @@ class _StubManagerAgent(BaseAgent): async def run( self, - request: None, - context: WorkflowContext | None = None, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, ) -> AgentRunResponse: return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")]) - def run_streaming( + def run_stream( self, - request: None, - context: WorkflowContext | None = None, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, ) -> AsyncIterable[AgentRunResponseUpdate]: async def _gen() -> AsyncIterable[AgentRunResponseUpdate]: yield AgentRunResponseUpdate(message_deltas=[ChatMessage(role=Role.ASSISTANT, text="ok")])