diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 990264df41..04623c87d9 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -74,10 +74,15 @@ ORCH_MSG_KIND_USER_TASK, MagenticBuilder, MagenticContext, + MagenticHumanInputRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, MagenticManagerBase, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, + MagenticStallInterventionDecision, + MagenticStallInterventionReply, + MagenticStallInterventionRequest, StandardMagenticManager, ) from ._orchestration_state import OrchestrationState @@ -144,10 +149,15 @@ "InProcRunnerContext", "MagenticBuilder", "MagenticContext", + "MagenticHumanInputRequest", + "MagenticHumanInterventionDecision", + "MagenticHumanInterventionKind", + "MagenticHumanInterventionReply", + "MagenticHumanInterventionRequest", "MagenticManagerBase", - "MagenticPlanReviewDecision", - "MagenticPlanReviewReply", - "MagenticPlanReviewRequest", + "MagenticStallInterventionDecision", + "MagenticStallInterventionReply", + "MagenticStallInterventionRequest", "ManagerDirectiveModel", "ManagerSelectionRequest", "ManagerSelectionResponse", diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 14fc98e990..d91cf2a3b8 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -17,8 +17,9 @@ AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, - ChatClientProtocol, ChatMessage, + FunctionApprovalRequestContent, + FunctionResultContent, Role, ) @@ -373,29 +374,101 @@ def from_dict(cls, value: dict[str, Any]) -> "_MagenticResponseMessage": 
return cls(body=body, target_agent=target_agent, broadcast=broadcast) +# region Human Intervention Types + + +class MagenticHumanInterventionKind(str, Enum): + """The kind of human intervention being requested.""" + + PLAN_REVIEW = "plan_review" # Review and approve/revise the initial plan + TOOL_APPROVAL = "tool_approval" # Approve a tool/function call + STALL = "stall" # Workflow has stalled and needs guidance + + +class MagenticHumanInterventionDecision(str, Enum): + """Decision options for human intervention responses.""" + + APPROVE = "approve" # Approve (plan review, tool approval) + REVISE = "revise" # Request revision with feedback (plan review) + REJECT = "reject" # Reject/deny (tool approval) + CONTINUE = "continue" # Continue with current state (stall) + REPLAN = "replan" # Trigger replanning (stall) + GUIDANCE = "guidance" # Provide guidance text (stall, tool approval) + + @dataclass -class _MagenticPlanReviewRequest: - """Internal: Human-in-the-loop request to review and optionally edit the plan before execution.""" +class _MagenticHumanInterventionRequest: + """Unified request for human intervention in a Magentic workflow. + + This request is emitted when the workflow needs human input. The `kind` field + indicates what type of intervention is needed, and the relevant fields are + populated based on the kind. 
+ + Attributes: + request_id: Unique identifier for correlating responses + kind: The type of intervention needed (plan_review, tool_approval, stall) + + # Plan review fields + task_text: The task description (plan_review) + facts_text: Extracted facts from the task (plan_review) + plan_text: The proposed or current plan (plan_review, stall) + round_index: Number of review rounds so far (plan_review) + + # Tool approval fields + agent_id: The agent requesting input (tool_approval) + prompt: Description of what input is needed (tool_approval) + context: Additional context (tool_approval) + conversation_snapshot: Recent conversation history (tool_approval) + + # Stall intervention fields + stall_count: Number of consecutive stall rounds (stall) + max_stall_count: Threshold that triggered intervention (stall) + stall_reason: Description of why progress stalled (stall) + last_agent: Last active agent (stall) + """ request_id: str = field(default_factory=lambda: str(uuid4())) + kind: MagenticHumanInterventionKind = MagenticHumanInterventionKind.PLAN_REVIEW + + # Plan review fields task_text: str = "" facts_text: str = "" plan_text: str = "" - round_index: int = 0 # number of review rounds so far + round_index: int = 0 + # Tool approval fields + agent_id: str = "" + prompt: str = "" + context: str | None = None + conversation_snapshot: list[ChatMessage] = field(default_factory=list) # type: ignore -class MagenticPlanReviewDecision(str, Enum): - APPROVE = "approve" - REVISE = "revise" + # Stall intervention fields + stall_count: int = 0 + max_stall_count: int = 3 + stall_reason: str = "" + last_agent: str = "" @dataclass -class _MagenticPlanReviewReply: - """Internal: Human reply to a plan review request.""" +class _MagenticHumanInterventionReply: + """Unified reply to a human intervention request. + + The relevant fields depend on the original request kind and the decision made. 
+ + Attributes: + decision: The human's decision (approve, revise, continue, replan, guidance) + edited_plan_text: New plan text if directly editing (plan_review with approve/revise) + comments: Feedback for revision or guidance text (plan_review, stall with guidance) + response_text: Free-form response text (tool_approval) + """ - decision: MagenticPlanReviewDecision - edited_plan_text: str | None = None # if supplied, becomes the new plan text verbatim - comments: str | None = None # guidance for replan if no edited text provided + decision: MagenticHumanInterventionDecision + edited_plan_text: str | None = None + comments: str | None = None + response_text: str | None = None + + +# endregion Human Intervention Types @dataclass @@ -650,10 +723,9 @@ class StandardMagenticManager(MagenticManagerBase): def __init__( self, - chat_client: ChatClientProtocol, + agent: AgentProtocol, task_ledger: _MagenticTaskLedger | None = None, *, - instructions: str | None = None, task_ledger_facts_prompt: str | None = None, task_ledger_plan_prompt: str | None = None, task_ledger_full_prompt: str | None = None, @@ -669,11 +741,11 @@ def __init__( """Initialize the Standard Magentic Manager. Args: - chat_client: The chat client to use for LLM calls. - instructions: Instructions for the orchestrator agent. + agent: An agent instance to use for LLM calls. The agent's configured + options (temperature, seed, instructions, etc.) will be applied. + task_ledger: Optional task ledger for managing task state. Keyword Args: - task_ledger: Optional task ledger for managing task state. task_ledger_facts_prompt: Optional prompt for the task ledger facts. task_ledger_plan_prompt: Optional prompt for the task ledger plan. task_ledger_full_prompt: Optional prompt for the full task ledger. 
@@ -692,8 +764,7 @@ def __init__( max_round_count=max_round_count, ) - self.chat_client: ChatClientProtocol = chat_client - self.instructions: str | None = instructions + self._agent: AgentProtocol = agent self.task_ledger: _MagenticTaskLedger | None = task_ledger # Prompts may be overridden if needed @@ -717,34 +788,20 @@ async def _complete( self, messages: list[ChatMessage], ) -> ChatMessage: - """Call the underlying ChatClientProtocol directly and return the last assistant message. + """Call the underlying agent and return the last assistant message. - If manager instructions are provided, they are injected as a SYSTEM message - at the start of the request to guide the model consistently without needing - an intermediate Agent wrapper. + The agent's run method is called which applies the agent's configured options + (temperature, seed, instructions, etc.). """ - # Prepend system instructions if present - request_messages: list[ChatMessage] = [] - if self.instructions: - request_messages.append(ChatMessage(role=Role.SYSTEM, text=self.instructions)) - request_messages.extend(messages) - - # Invoke the chat client non-streaming API - response = await self.chat_client.get_response(request_messages) - try: - out_messages: list[ChatMessage] | None = list(response.messages) # type: ignore[assignment] - except Exception: - out_messages = None - + response: AgentRunResponse = await self._agent.run(messages) + out_messages = response.messages if response else None if out_messages: last = out_messages[-1] return ChatMessage( - role=last.role or Role.ASSISTANT, - text=last.text or "", + role=last.role, + text=last.text, author_name=last.author_name or MAGENTIC_MANAGER_NAME, ) - - # Fallback if no messages return ChatMessage(role=Role.ASSISTANT, text="No output produced.", author_name=MAGENTIC_MANAGER_NAME) async def plan(self, magentic_context: MagenticContext) -> ChatMessage: @@ -918,6 +975,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator): 
_plan_review_round: int _max_plan_review_rounds: int _terminated: bool + _enable_stall_intervention: bool def __init__( self, @@ -926,6 +984,7 @@ def __init__( *, require_plan_signoff: bool = False, max_plan_review_rounds: int = 10, + enable_stall_intervention: bool = False, executor_id: str | None = None, ) -> None: """Initializes a new instance of the MagenticOrchestratorExecutor. @@ -935,6 +994,7 @@ def __init__( participants: A dictionary of participant IDs to their names. require_plan_signoff: Whether to require plan sign-off from a human. max_plan_review_rounds: The maximum number of plan review rounds. + enable_stall_intervention: Whether to request human input on stalls instead of auto-replan. executor_id: An optional executor ID. """ super().__init__(executor_id or f"magentic_orchestrator_{uuid4().hex[:8]}") @@ -945,6 +1005,7 @@ def __init__( self._require_plan_signoff = require_plan_signoff self._plan_review_round = 0 self._max_plan_review_rounds = max_plan_review_rounds + self._enable_stall_intervention = enable_stall_intervention # Registry of agent executors for internal coordination (e.g., resets) self._agent_executors = {} # Terminal state marker to stop further processing after completion/limits @@ -1020,7 +1081,7 @@ async def on_checkpoint_save(self) -> dict[str, Any]: try: state["manager_state"] = self._manager.on_checkpoint_save() except Exception as exc: - logger.warning("Failed to save manager state for checkpoint: %s\nSkipping...", exc) + logger.warning(f"Failed to save manager state for checkpoint: {exc}\nSkipping...") return state @@ -1050,14 +1111,14 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: else: self._context = None except Exception as exc: # pragma: no cover - defensive - logger.warning("Failed to restore magentic context: %s", exc) + logger.warning(f"Failed to restore magentic context: {exc}") self._context = None ledger_payload = state.get("task_ledger") if ledger_payload is not None: try: 
self._task_ledger = _message_from_payload(ledger_payload) except Exception as exc: # pragma: no cover - logger.warning("Failed to restore task ledger message: %s", exc) + logger.warning(f"Failed to restore task ledger message: {exc}") self._task_ledger = None if "plan_review_round" in state: @@ -1077,7 +1138,7 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: try: self._manager.on_checkpoint_restore(manager_state) except Exception as exc: # pragma: no cover - logger.warning("Failed to restore manager state: %s", exc) + logger.warning(f"Failed to restore manager state: {exc}") self._reconcile_restored_participants() @@ -1110,7 +1171,7 @@ async def handle_start_message( self, message: _MagenticStartMessage, context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: """Handle the initial start message to begin orchestration.""" @@ -1155,7 +1216,7 @@ async def handle_task_text( self, task_text: str, context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage.from_string(task_text), context) @@ -1165,7 +1226,7 @@ async def handle_task_message( self, task_message: ChatMessage, context: WorkflowContext[ - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage(task_message), context) @@ -1175,7 +1236,7 @@ async def handle_task_messages( self, conversation: list[ChatMessage], context: WorkflowContext[ - 
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: await self.handle_start_message(_MagenticStartMessage(conversation), context) @@ -1210,22 +1271,45 @@ async def handle_response_message( await self._run_inner_loop(context) @response_handler - async def handle_plan_review_response( + async def handle_human_intervention_response( self, - original_request: _MagenticPlanReviewRequest, - response: _MagenticPlanReviewReply, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, context: WorkflowContext[ - # may broadcast ledger next, or ask for another round of review - _MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage] + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] ], ) -> None: + """Handle unified human intervention responses. + + Routes the response to the appropriate handler based on the original request kind. 
+ """ if getattr(self, "_terminated", False): return if self._context is None: return - if response.decision == MagenticPlanReviewDecision.APPROVE: + if original_request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + await self._handle_plan_review_response(original_request, response, context) + elif original_request.kind == MagenticHumanInterventionKind.STALL: + await self._handle_stall_intervention_response(original_request, response, context) + # TOOL_APPROVAL is handled by MagenticAgentExecutor, not the orchestrator + + async def _handle_plan_review_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[ + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] + ], + ) -> None: + """Handle plan review response.""" + if self._context is None: + return + + is_approve = response.decision == MagenticHumanInterventionDecision.APPROVE + + if is_approve: # Close the review loop on approval (no further plan review requests this run) self._require_plan_signoff = False # If the user supplied an edited plan, adopt it @@ -1246,13 +1330,11 @@ async def handle_plan_review_response( text=combined, author_name=MAGENTIC_MANAGER_NAME, ) - # If approved with comments but no edited text, apply comments via replan and proceed (no extra review) + # If approved with comments but no edited text, apply comments via replan and proceed elif response.comments: - # Record the human feedback for grounding self._context.chat_history.append( ChatMessage(role=Role.USER, text=f"Human plan feedback: {response.comments}") ) - # Ask the manager to replan based on comments; proceed immediately self._task_ledger = await self._manager.replan(self._context.clone(deep=True)) # Record the signed-off plan (no broadcast) @@ -1272,9 +1354,7 @@ async def handle_plan_review_response( self._plan_review_round += 1 if self._plan_review_round > 
self._max_plan_review_rounds: logger.warning("Magentic Orchestrator: Max plan review rounds reached. Proceeding with current plan.") - # Stop any further plan review requests for the rest of this run self._require_plan_signoff = False - # Add a clear note to the conversation so users know review is closed notice = ChatMessage( role=Role.ASSISTANT, text=( @@ -1287,7 +1367,6 @@ async def handle_plan_review_response( await self._emit_orchestrator_message(context, notice, ORCH_MSG_KIND_NOTICE) if self._task_ledger: self._context.chat_history.append(self._task_ledger) - # No further review requests; proceed directly into coordination ctx2 = cast( WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], context, @@ -1295,12 +1374,11 @@ async def handle_plan_review_response( await self._run_inner_loop(ctx2) return - # If the user provided an edited plan, adopt it directly and ask them to confirm once more + # If the user provided an edited plan, adopt it and ask for confirmation if response.edited_plan_text: mgr_ledger2 = getattr(self._manager, "task_ledger", None) if mgr_ledger2 is not None: mgr_ledger2.plan.text = response.edited_plan_text - # Rebuild combined message for preview in the next review request team_text = _team_block(self._participants) combined = self._manager.task_ledger_full_prompt.format( task=self._context.task.text, @@ -1312,16 +1390,67 @@ async def handle_plan_review_response( await self._send_plan_review_request(cast(WorkflowContext, context)) return - # Else pass comments into the chat history and replan with the manager + # Else pass comments into the chat history and replan if response.comments: self._context.chat_history.append( ChatMessage(role=Role.USER, text=f"Human plan feedback: {response.comments}") ) - # Ask the manager to replan; this only adjusts the plan stage, not a full reset self._task_ledger = await self._manager.replan(self._context.clone(deep=True)) await 
self._send_plan_review_request(cast(WorkflowContext, context)) + async def _handle_stall_intervention_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[ + _MagenticResponseMessage | _MagenticRequestMessage | _MagenticHumanInterventionRequest, list[ChatMessage] + ], + ) -> None: + """Handle stall intervention response.""" + if self._context is None: + return + + ctx = self._context + logger.info( + f"Stall intervention response: decision={response.decision.value}, " + f"stall_count was {original_request.stall_count}" + ) + + if response.decision == MagenticHumanInterventionDecision.CONTINUE: + ctx.stall_count = 0 + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._run_inner_loop(ctx2) + return + + if response.decision == MagenticHumanInterventionDecision.REPLAN: + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._reset_and_replan(ctx2) + return + + if response.decision == MagenticHumanInterventionDecision.GUIDANCE: + ctx.stall_count = 0 + guidance = response.comments or response.response_text + if guidance: + guidance_msg = ChatMessage( + role=Role.USER, + text=f"Human guidance to help with stall: {guidance}", + ) + ctx.chat_history.append(guidance_msg) + await self._emit_orchestrator_message(context, guidance_msg, ORCH_MSG_KIND_NOTICE) + ctx2 = cast( + WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], + context, + ) + await self._run_inner_loop(ctx2) + return + async def _run_outer_loop( self, context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]], @@ -1370,20 +1499,19 @@ async def _run_inner_loop_helper( return ctx.round_count += 1 - logger.info("Magentic Orchestrator: Inner loop - round %s", ctx.round_count) + 
logger.info(f"Magentic Orchestrator: Inner loop - round {ctx.round_count}") # Create progress ledger using the manager try: current_progress_ledger = await self._manager.create_progress_ledger(ctx.clone(deep=True)) except Exception as ex: - logger.warning("Magentic Orchestrator: Progress ledger creation failed, triggering reset: %s", ex) + logger.warning(f"Magentic Orchestrator: Progress ledger creation failed, triggering reset: {ex}") await self._reset_and_replan(context) return logger.debug( - "Progress evaluation: satisfied=%s, next=%s", - current_progress_ledger.is_request_satisfied.answer, - current_progress_ledger.next_speaker.answer, + f"Progress evaluation: satisfied={current_progress_ledger.is_request_satisfied.answer}, " + f"next={current_progress_ledger.next_speaker.answer}" ) # Check for task completion @@ -1399,7 +1527,34 @@ async def _run_inner_loop_helper( ctx.stall_count = max(0, ctx.stall_count - 1) if ctx.stall_count > self._manager.max_stall_count: - logger.info("Magentic Orchestrator: Stalling detected. 
Resetting and replanning") + logger.info(f"Magentic Orchestrator: Stalling detected after {ctx.stall_count} rounds") + if self._enable_stall_intervention: + # Request human intervention instead of auto-replan + is_progress = current_progress_ledger.is_progress_being_made.answer + is_loop = current_progress_ledger.is_in_loop.answer + stall_reason = "No progress being made" if not is_progress else "" + if is_loop: + loop_msg = "Agents appear to be in a loop" + stall_reason = f"{stall_reason}; {loop_msg}" if stall_reason else loop_msg + next_speaker_val = current_progress_ledger.next_speaker.answer + last_agent = next_speaker_val if isinstance(next_speaker_val, str) else "" + # Get facts and plan from manager's task ledger + mgr_ledger = getattr(self._manager, "task_ledger", None) + facts_text = mgr_ledger.facts.text if mgr_ledger else "" + plan_text = mgr_ledger.plan.text if mgr_ledger else "" + request = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.STALL, + stall_count=ctx.stall_count, + max_stall_count=self._manager.max_stall_count, + task_text=ctx.task.text if ctx.task else "", + facts_text=facts_text, + plan_text=plan_text, + last_agent=last_agent, + stall_reason=stall_reason, + ) + await context.request_info(request, _MagenticHumanInterventionReply) + return + # Default behavior: auto-replan await self._reset_and_replan(context) return @@ -1413,7 +1568,7 @@ async def _run_inner_loop_helper( instruction = current_progress_ledger.instruction_or_question.answer if next_speaker_value not in self._participants: - logger.warning("Invalid next speaker: %s", next_speaker_value) + logger.warning(f"Invalid next speaker: {next_speaker_value}") await self._prepare_final_answer(context) return @@ -1430,7 +1585,7 @@ async def _run_inner_loop_helper( target_executor_id = f"agent_{next_speaker_value}" # Request specific agent to respond - logger.debug("Magentic Orchestrator: Requesting %s to respond", next_speaker_value) + logger.debug(f"Magentic 
Orchestrator: Requesting {next_speaker_value} to respond") await context.send_message( _MagenticRequestMessage( agent_name=next_speaker_value, @@ -1494,7 +1649,7 @@ async def _check_within_limits_or_complete( if hit_round_limit or hit_reset_limit: limit_type = "round" if hit_round_limit else "reset" - logger.error("Magentic Orchestrator: Max %s count reached", limit_type) + logger.error(f"Magentic Orchestrator: Max {limit_type} count reached") # Only emit completion once and then mark terminated if not self._terminated: @@ -1515,7 +1670,7 @@ async def _check_within_limits_or_complete( return True async def _send_plan_review_request(self, context: WorkflowContext) -> None: - """Send a PlanReviewRequest.""" + """Send a human intervention request for plan review.""" # If plan sign-off is disabled (e.g., ran out of review rounds), do nothing if not self._require_plan_signoff: return @@ -1524,13 +1679,14 @@ async def _send_plan_review_request(self, context: WorkflowContext) -> None: plan_text = ledger.plan.text if ledger else "" task_text = self._context.task.text if self._context else "" - req = _MagenticPlanReviewRequest( + req = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.PLAN_REVIEW, task_text=task_text, facts_text=facts_text, plan_text=plan_text, round_index=self._plan_review_round, ) - await context.request_info(req, _MagenticPlanReviewReply) + await context.request_info(req, _MagenticHumanInterventionReply) # region Magentic Executors @@ -1543,6 +1699,7 @@ class MagenticAgentExecutor(Executor): - Receiving task ledger broadcasts - Responding to specific agent requests - Resetting agent state when needed + - Surfacing tool approval requests (user_input_requests) as HITL events """ def __init__( @@ -1554,6 +1711,9 @@ def __init__( self._agent = agent self._agent_id = agent_id self._chat_history: list[ChatMessage] = [] + self._pending_human_input_request: _MagenticHumanInterventionRequest | None = None + self._pending_tool_request: 
FunctionApprovalRequestContent | None = None + self._current_request_message: _MagenticRequestMessage | None = None @override async def on_checkpoint_save(self) -> dict[str, Any]: @@ -1582,7 +1742,7 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: try: self._chat_history = decode_chat_messages(history_payload) except Exception as exc: # pragma: no cover - logger.warning("Agent %s: Failed to restore chat history: %s", self._agent_id, exc) + logger.warning(f"Agent {self._agent_id}: Failed to restore chat history: {exc}") self._chat_history = [] else: self._chat_history = [] @@ -1592,12 +1752,12 @@ async def handle_response_message( self, message: _MagenticResponseMessage, context: WorkflowContext[_MagenticResponseMessage] ) -> None: """Handle response message (task ledger broadcast).""" - logger.debug("Agent %s: Received response message", self._agent_id) + logger.debug(f"Agent {self._agent_id}: Received response message") # Check if this message is intended for this agent if message.target_agent is not None and message.target_agent != self._agent_id and not message.broadcast: # Message is targeted to a different agent, ignore it - logger.debug("Agent %s: Ignoring message targeted to %s", self._agent_id, message.target_agent) + logger.debug(f"Agent {self._agent_id}: Ignoring message targeted to {message.target_agent}") return # Add transfer message if needed @@ -1632,7 +1792,10 @@ async def handle_request_message( if message.agent_name != self._agent_id: return - logger.info("Agent %s: Received request to respond", self._agent_id) + logger.info(f"Agent {self._agent_id}: Received request to respond") + + # Store the original request message for potential continuation after human input + self._current_request_message = message # Add persona adoption message with appropriate role persona_role = self._get_persona_adoption_role() @@ -1650,23 +1813,26 @@ async def handle_request_message( from agent_framework import BaseAgent as _AF_AgentBase # local 
import to avoid cycles if not isinstance(self._agent, _AF_AgentBase): - response = ChatMessage( + response: ChatMessage = ChatMessage( role=Role.ASSISTANT, text=f"{self._agent_id} is a workflow executor and cannot be invoked directly.", author_name=self._agent_id, ) self._chat_history.append(response) await self._emit_agent_message_event(context, response) + await context.send_message(_MagenticResponseMessage(body=response)) else: # Invoke the agent - response = await self._invoke_agent(context) - self._chat_history.append(response) - - # Send response back to orchestrator - await context.send_message(_MagenticResponseMessage(body=response)) + agent_response = await self._invoke_agent(context) + if agent_response is None: + # Agent is waiting for human input - don't send response yet + return + self._chat_history.append(agent_response) + # Send response back to orchestrator + await context.send_message(_MagenticResponseMessage(body=agent_response)) except Exception as e: - logger.warning("Agent %s invoke failed: %s", self._agent_id, e) + logger.warning(f"Agent {self._agent_id} invoke failed: {e}") # Fallback response response = ChatMessage( role=Role.ASSISTANT, @@ -1678,8 +1844,120 @@ async def handle_request_message( def reset(self) -> None: """Reset the internal chat history of the agent (internal operation).""" - logger.debug("Agent %s: Resetting chat history", self._agent_id) + logger.debug(f"Agent {self._agent_id}: Resetting chat history") self._chat_history.clear() + self._pending_human_input_request = None + self._pending_tool_request = None + self._current_request_message = None + + @response_handler + async def handle_tool_approval_response( + self, + original_request: _MagenticHumanInterventionRequest, + response: _MagenticHumanInterventionReply, + context: WorkflowContext[_MagenticResponseMessage, AgentRunResponse], + ) -> None: + """Handle human response for tool approval and continue agent execution. 
+ + When a human provides input in response to a tool approval request, + this handler processes the response based on the decision type: + + - APPROVE: Feed the provided response text back to the agent as the tool call's result + - REJECT: Do not execute the tool, inform the agent of rejection + - GUIDANCE: Same as APPROVE; the guidance text becomes the tool call's result + + Args: + original_request: The original human intervention request + response: The human's response containing the decision and any text + context: The workflow context + """ + response_text = response.response_text or response.comments or "" + decision = response.decision + logger.info( + f"Agent {original_request.agent_id}: Received tool approval response " + f"(decision={decision.value}): {response_text[:50] if response_text else ''}" + ) + + # Get the pending tool request to extract call_id + pending_tool_request = self._pending_tool_request + self._pending_human_input_request = None + self._pending_tool_request = None + + # Handle REJECT decision - do not execute the tool call + if decision == MagenticHumanInterventionDecision.REJECT: + rejection_reason = response_text or "Tool call rejected by human" + logger.info(f"Agent {self._agent_id}: Tool call rejected: {rejection_reason}") + + if pending_tool_request is not None: + # Create a FunctionResultContent indicating rejection + function_result = FunctionResultContent( + call_id=pending_tool_request.function_call.call_id, + result=f"Tool call was rejected by human reviewer. 
Reason: {rejection_reason}", + ) + result_msg = ChatMessage( + role=Role.USER, + contents=[function_result], + ) + self._chat_history.append(result_msg) + else: + # Fallback without pending tool request + rejection_msg = ChatMessage( + role=Role.USER, + text=f"Tool call '{original_request.prompt}' was rejected: {rejection_reason}", + author_name="human", + ) + self._chat_history.append(rejection_msg) + + # Re-invoke the agent so it can adapt to the rejection + agent_response = await self._invoke_agent(context) + if agent_response is None: + return + self._chat_history.append(agent_response) + await context.send_message(_MagenticResponseMessage(body=agent_response)) + return + + # Handle APPROVE and GUIDANCE decisions - execute the tool call + if pending_tool_request is not None: + # Create a FunctionResultContent with the human's response + function_result = FunctionResultContent( + call_id=pending_tool_request.function_call.call_id, + result=response_text, + ) + # Add the function result as a message to continue the conversation + result_msg = ChatMessage( + role=Role.USER, + contents=[function_result], + ) + self._chat_history.append(result_msg) + + # Re-invoke the agent to continue execution + agent_response = await self._invoke_agent(context) + if agent_response is None: + # Agent is waiting for more human input + return + self._chat_history.append(agent_response) + await context.send_message(_MagenticResponseMessage(body=agent_response)) + else: + # Fallback: no pending tool request, just add as text message + logger.warning( + f"Agent {original_request.agent_id}: No pending tool request found for response, " + "using fallback text handling", + ) + human_response_msg = ChatMessage( + role=Role.USER, + text=f"Human response to '{original_request.prompt}': {response_text}", + author_name="human", + ) + self._chat_history.append(human_response_msg) + + # Create a response message indicating human input was received + agent_response_msg = ChatMessage( + 
role=Role.ASSISTANT, + text=f"Received human input for: {original_request.prompt}. Continuing with the task.", + author_name=original_request.agent_id, + ) + self._chat_history.append(agent_response_msg) + await context.send_message(_MagenticResponseMessage(body=agent_response_msg)) async def _emit_agent_delta_event( self, @@ -1687,10 +1965,12 @@ async def _emit_agent_delta_event( update: AgentRunResponseUpdate, ) -> None: # Add metadata to identify this as an agent streaming update - if update.additional_properties is None: - update.additional_properties = {} - update.additional_properties["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA - update.additional_properties["agent_id"] = self._agent_id + props = update.additional_properties + if props is None: + props = {} + update.additional_properties = props + props["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA + props["agent_id"] = self._agent_id # Emit AgentRunUpdateEvent with the agent response update await ctx.add_event(AgentRunUpdateEvent(executor_id=self._agent_id, data=update)) @@ -1707,8 +1987,21 @@ async def _emit_agent_message_event( async def _invoke_agent( self, ctx: WorkflowContext[_MagenticResponseMessage, AgentRunResponse], - ) -> ChatMessage: - """Invoke the wrapped agent and return a response.""" + ) -> ChatMessage | None: + """Invoke the wrapped agent and return a response. + + This method streams the agent's response updates, collects them into an + AgentRunResponse, and handles any human input requests (tool approvals). + + Note: + If multiple user input requests are present in the agent's response, + only the first one is processed. A warning is logged and subsequent + requests are ignored. This is a current limitation of the single-request + pending state architecture. + + Returns: + ChatMessage with the agent's response, or None if waiting for human input. 
+ """ logger.debug(f"Agent {self._agent_id}: Running with {len(self._chat_history)} messages") updates: list[AgentRunResponseUpdate] = [] @@ -1720,6 +2013,46 @@ async def _invoke_agent( run_result: AgentRunResponse = AgentRunResponse.from_agent_run_response_updates(updates) + # Handle human input requests (tool approval) - process one at a time + if run_result.user_input_requests: + if len(run_result.user_input_requests) > 1: + logger.warning( + f"Agent {self._agent_id}: Multiple user input requests received " + f"({len(run_result.user_input_requests)}), processing only the first one" + ) + + user_input_request = run_result.user_input_requests[0] + + # Build a prompt from the request based on its type + prompt: str + context_text: str | None = None + + if isinstance(user_input_request, FunctionApprovalRequestContent): + fn_call = user_input_request.function_call + prompt = f"Approve function call: {fn_call.name}" + if fn_call.arguments: + context_text = f"Arguments: {fn_call.arguments}" + else: + # Fallback for unknown request types + request_type = type(user_input_request).__name__ + prompt = f"Agent {self._agent_id} requires human input ({request_type})" + logger.warning(f"Agent {self._agent_id}: Unrecognized user input request type: {request_type}") + + # Store the original FunctionApprovalRequestContent for later use + self._pending_tool_request = user_input_request + + # Create and send the human intervention request for tool approval + request = _MagenticHumanInterventionRequest( + kind=MagenticHumanInterventionKind.TOOL_APPROVAL, + agent_id=self._agent_id, + prompt=prompt, + context=context_text, + conversation_snapshot=list(self._chat_history[-5:]), + ) + self._pending_human_input_request = request + await ctx.request_info(request, _MagenticHumanInterventionReply) + return None # Signal that we're waiting for human input + messages: list[ChatMessage] | None = None with contextlib.suppress(Exception): messages = list(run_result.messages) # type: 
ignore[assignment] @@ -1807,6 +2140,7 @@ def __init__(self) -> None: self._manager: MagenticManagerBase | None = None self._enable_plan_review: bool = False self._checkpoint_storage: CheckpointStorage | None = None + self._enable_stall_intervention: bool = False def participants(self, **participants: AgentProtocol | Executor) -> Self: """Add participant agents or executors to the Magentic workflow. @@ -1833,7 +2167,7 @@ def participants(self, **participants: AgentProtocol | Executor) -> Self: .participants( researcher=research_agent, writer=writing_agent, coder=coding_agent, reviewer=review_agent ) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .build() ) @@ -1849,9 +2183,9 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": """Enable or disable human-in-the-loop plan review before task execution. When enabled, the workflow will pause after the manager generates the initial - plan and emit a _MagenticPlanReviewRequest event. A human reviewer can then - approve, request revisions, or reject the plan. The workflow continues only - after approval. + plan and emit a MagenticHumanInterventionRequest event with kind=PLAN_REVIEW. + A human reviewer can then approve, request revisions, or reject the plan. + The workflow continues only after approval. 
This is useful for: - High-stakes tasks requiring human oversight @@ -1872,26 +2206,90 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .with_plan_review(enable=True) .build() ) # During execution, handle plan review async for event in workflow.run_stream("task"): - if isinstance(event, _MagenticPlanReviewRequest): - # Review plan and respond - reply = _MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) - await workflow.send(reply) + if isinstance(event, RequestInfoEvent): + request = event.data + if isinstance(request, MagenticHumanInterventionRequest): + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + # Review plan and respond + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + await workflow.send_responses({event.request_id: reply}) See Also: - - :class:`_MagenticPlanReviewRequest`: Event emitted for review - - :class:`_MagenticPlanReviewReply`: Response to send back - - :class:`MagenticPlanReviewDecision`: Approve/Revise/Reject options + - :class:`MagenticHumanInterventionRequest`: Event emitted for review + - :class:`MagenticHumanInterventionReply`: Response to send back + - :class:`MagenticHumanInterventionDecision`: APPROVE/REVISE options """ self._enable_plan_review = enable return self + def with_human_input_on_stall(self, enable: bool = True) -> "MagenticBuilder": + """Enable human intervention when the workflow detects a stall. + + When enabled, instead of automatically replanning when the workflow detects + that agents are not making progress or are stuck in a loop, the workflow will + pause and emit a MagenticHumanInterventionRequest event with kind=STALL. A human can then + decide to continue, trigger replanning, or provide guidance. 
+ + This is useful for: + - Workflows where automatic replanning may not resolve the issue + - Scenarios requiring human judgment about workflow direction + - Debugging stuck workflows with human insight + - Complex tasks where human guidance can help agents get back on track + + When stall detection triggers (based on max_stall_count), instead of calling + _reset_and_replan automatically, the workflow will: + 1. Emit a MagenticHumanInterventionRequest with kind=STALL + 2. Wait for human response via send_responses_streaming + 3. Take action based on the human's decision (continue, replan, or guidance) + + Args: + enable: Whether to enable stall intervention (default True) + + Returns: + Self for method chaining + + Usage: + + .. code-block:: python + + workflow = ( + MagenticBuilder() + .participants(agent1=agent1) + .with_standard_manager(agent=manager_agent, max_stall_count=3) + .with_human_input_on_stall(enable=True) + .build() + ) + + # During execution, handle human intervention requests + async for event in workflow.run_stream("task"): + if isinstance(event, RequestInfoEvent): + if event.request_type is MagenticHumanInterventionRequest: + request = event.data + if request.kind == MagenticHumanInterventionKind.STALL: + print(f"Workflow stalled: {request.stall_reason}") + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments="Focus on completing the current step first", + ) + responses = {event.request_id: reply} + async for ev in workflow.send_responses_streaming(responses): + ... 
+ + See Also: + - :class:`MagenticHumanInterventionRequest`: Unified request type + - :class:`MagenticHumanInterventionDecision`: Decision options + - :meth:`with_standard_manager`: Configure max_stall_count for stall detection + """ + self._enable_stall_intervention = enable + return self + def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "MagenticBuilder": """Enable workflow state persistence using the provided checkpoint storage. @@ -1916,7 +2314,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(chat_client=client) + .with_standard_manager(agent=manager_agent) .with_checkpointing(storage) .build() ) @@ -1943,9 +2341,8 @@ def with_standard_manager( manager: MagenticManagerBase | None = None, *, # Constructor args for StandardMagenticManager when manager is not provided - chat_client: ChatClientProtocol | None = None, + agent: AgentProtocol | None = None, task_ledger: _MagenticTaskLedger | None = None, - instructions: str | None = None, # Prompt overrides task_ledger_facts_prompt: str | None = None, task_ledger_plan_prompt: str | None = None, @@ -1966,18 +2363,18 @@ def with_standard_manager( 1. **Provide existing manager**: Pass a pre-configured manager instance (custom or standard) for full control over behavior - 2. **Auto-create standard manager**: Pass chat_client and options to automatically - create a StandardMagenticManager with specified configuration + 2. **Auto-create with agent**: Pass an agent to automatically create a + StandardMagenticManager that uses the agent's configured instructions and + options (temperature, seed, etc.) Args: manager: Pre-configured manager instance (StandardMagenticManager or custom MagenticManagerBase subclass). If provided, all other arguments are ignored. - chat_client: LLM chat client for generating plans and decisions. Required if - manager is not provided. 
+ agent: Agent instance for generating plans and decisions. The agent's + configured instructions and options (temperature, seed, etc.) will be + applied. task_ledger: Optional custom task ledger implementation for specialized prompting or structured output requirements - instructions: System instructions prepended to all manager prompts to guide - behavior and set expectations task_ledger_facts_prompt: Custom prompt template for extracting facts from task description task_ledger_plan_prompt: Custom prompt template for generating initial plan @@ -2002,25 +2399,30 @@ def with_standard_manager( Self for method chaining Raises: - ValueError: If manager is None and chat_client is also None + ValueError: If manager is None and agent is not provided. - Usage with auto-created manager: + Usage with agent (recommended): .. code-block:: python - from azure.ai.projects.aio import AIProjectClient + from agent_framework import ChatAgent, ChatOptions + from agent_framework.openai import OpenAIChatClient - project_client = AIProjectClient.from_connection_string(...) 
- chat_client = project_client.inference.get_chat_completions_client() + # Configure manager agent with specific options and instructions + manager_agent = ChatAgent( + name="Coordinator", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + chat_options=ChatOptions(temperature=0.3, seed=42), + instructions="Be concise and focus on accuracy", + ) workflow = ( MagenticBuilder() .participants(agent1=agent1, agent2=agent2) .with_standard_manager( - chat_client=chat_client, + agent=manager_agent, max_round_count=20, max_stall_count=3, - instructions="Be concise and focus on accuracy", ) .build() ) @@ -2046,7 +2448,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: MagenticBuilder() .participants(coder=coder_agent, reviewer=reviewer_agent) .with_standard_manager( - chat_client=chat_client, + agent=manager_agent, task_ledger_plan_prompt="Create a detailed step-by-step plan...", progress_ledger_prompt="Assess progress and decide next action...", max_stall_count=2, @@ -2059,20 +2461,18 @@ async def plan(self, context: MagenticContext) -> ChatMessage: - Custom managers can implement alternative selection strategies - Prompt templates support Jinja2-style variable substitution - Stall detection helps prevent infinite loops in stuck scenarios + - The agent's instructions are used as system instructions for all manager prompts """ if manager is not None: self._manager = manager return self - if chat_client is None: - raise ValueError( - "chat_client is required when manager is not provided: with_standard_manager(chat_client=...)" - ) + if agent is None: + raise ValueError("agent is required when manager is not provided: with_standard_manager(agent=...)") self._manager = StandardMagenticManager( - chat_client=chat_client, + agent=agent, task_ledger=task_ledger, - instructions=instructions, task_ledger_facts_prompt=task_ledger_facts_prompt, task_ledger_plan_prompt=task_ledger_plan_prompt, task_ledger_full_prompt=task_ledger_full_prompt, @@ -2094,7 +2494,7 @@ def 
build(self) -> Workflow: if self._manager is None: raise ValueError("No manager configured. Call with_standard_manager(...) before build().") - logger.info("Building Magentic workflow with %d participants", len(self._participants)) + logger.info(f"Building Magentic workflow with {len(self._participants)} participants") # Create participant descriptions participant_descriptions: dict[str, str] = {} @@ -2104,12 +2504,14 @@ def build(self) -> Workflow: # Type narrowing: we already checked self._manager is not None above manager: MagenticManagerBase = self._manager # type: ignore[assignment] + enable_stall_intervention = self._enable_stall_intervention def _orchestrator_factory(wiring: _GroupChatConfig) -> Executor: return MagenticOrchestratorExecutor( manager=manager, participants=participant_descriptions, require_plan_signoff=self._enable_plan_review, + enable_stall_intervention=enable_stall_intervention, executor_id="magentic_orchestrator", ) @@ -2367,6 +2769,13 @@ def __getattr__(self, name: str) -> Any: # endregion Magentic Workflow -# Public aliases for types needed by users implementing custom plan review handlers -MagenticPlanReviewRequest = _MagenticPlanReviewRequest -MagenticPlanReviewReply = _MagenticPlanReviewReply +# Public aliases for unified human intervention types +MagenticHumanInterventionRequest = _MagenticHumanInterventionRequest +MagenticHumanInterventionReply = _MagenticHumanInterventionReply + +# Backward compatibility aliases (deprecated) +# Old aliases - point to unified types for compatibility +MagenticHumanInputRequest = _MagenticHumanInterventionRequest # type: ignore +MagenticStallInterventionRequest = _MagenticHumanInterventionRequest # type: ignore +MagenticStallInterventionReply = _MagenticHumanInterventionReply # type: ignore +MagenticStallInterventionDecision = MagenticHumanInterventionDecision # type: ignore diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 
b41d243a3e..e9f5dcf70d 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -12,16 +12,13 @@ AgentRunResponseUpdate, AgentRunUpdateEvent, BaseAgent, - ChatClientProtocol, ChatMessage, - ChatResponse, - ChatResponseUpdate, Executor, MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, MagenticManagerBase, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, RequestInfoEvent, Role, TextContent, @@ -59,21 +56,25 @@ def test_magentic_start_message_from_string(): assert msg.task.text == "Do the thing" -def test_plan_review_request_defaults_and_reply_variants(): - req = MagenticPlanReviewRequest() # defaults provided by dataclass +def test_human_intervention_request_defaults_and_reply_variants(): + from agent_framework._workflows._magentic import MagenticHumanInterventionKind + + req = MagenticHumanInterventionRequest(kind=MagenticHumanInterventionKind.PLAN_REVIEW) assert hasattr(req, "request_id") assert req.task_text == "" and req.facts_text == "" and req.plan_text == "" assert isinstance(req.round_index, int) and req.round_index == 0 # Replies: approve, revise with comments, revise with edited text - approve = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) - revise_comments = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE, comments="Tighten scope") - revise_text = MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.REVISE, + approve = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + revise_comments = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, comments="Tighten scope" + ) + revise_text = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, edited_plan_text="- Step 1\n- Step 2", ) - assert approve.decision == 
MagenticPlanReviewDecision.APPROVE + assert approve.decision == MagenticHumanInterventionDecision.APPROVE assert revise_comments.comments == "Tighten scope" assert revise_text.edited_plan_text is not None and revise_text.edited_plan_text.startswith("- Step 1") @@ -220,15 +221,14 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None completed = False output: list[ChatMessage] | None = None - async for ev in wf.send_responses_streaming( - responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)} - ): + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + async for ev in wf.send_responses_streaming(responses={req_event.request_id: reply}): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True elif isinstance(ev, WorkflowOutputEvent): @@ -265,7 +265,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ # Wait for the initial plan review request req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None @@ -274,13 +274,13 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ completed = False async for ev in wf.send_responses_streaming( responses={ - req_event.request_id: MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.APPROVE, + req_event.request_id: MagenticHumanInterventionReply( + 
decision=MagenticHumanInterventionDecision.APPROVE, comments="Looks good; consider Z", ) } ): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: saw_second_review = True if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True @@ -338,7 +338,7 @@ async def test_magentic_checkpoint_resume_round_trip(): task_text = "checkpoint task" req_event: RequestInfoEvent | None = None async for ev in wf.run_stream(task_text): - if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: + if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticHumanInterventionRequest: req_event = ev assert req_event is not None @@ -359,13 +359,13 @@ async def test_magentic_checkpoint_resume_round_trip(): orchestrator = next(exec for exec in wf_resume.executors.values() if isinstance(exec, MagenticOrchestratorExecutor)) - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) completed: WorkflowOutputEvent | None = None req_event = None async for event in wf_resume.run_stream( resume_checkpoint.checkpoint_id, ): - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: req_event = event assert req_event is not None @@ -430,25 +430,33 @@ async def test_magentic_agent_executor_on_checkpoint_save_and_restore_roundtrip( from agent_framework import StandardMagenticManager # noqa: E402 -class _StubChatClient(ChatClientProtocol): - @property - def additional_properties(self) -> dict[str, Any]: - """Get additional properties associated with the client.""" - return {} - - async def get_response(self, messages, **kwargs): # type: ignore[override] - 
return ChatResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")]) - - def get_streaming_response(self, messages, **kwargs) -> AsyncIterable[ChatResponseUpdate]: # type: ignore[override] - async def _gen(): - if False: - yield ChatResponseUpdate() # pragma: no cover +class _StubManagerAgent(BaseAgent): + """Stub agent for testing StandardMagenticManager.""" + + async def run( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, + ) -> AgentRunResponse: + return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="ok")]) + + def run_stream( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, + ) -> AsyncIterable[AgentRunResponseUpdate]: + async def _gen() -> AsyncIterable[AgentRunResponseUpdate]: + yield AgentRunResponseUpdate(message_deltas=[ChatMessage(role=Role.ASSISTANT, text="ok")]) return _gen() async def test_standard_manager_plan_and_replan_via_complete_monkeypatch(): - mgr = StandardMagenticManager(chat_client=_StubChatClient()) + mgr = StandardMagenticManager(agent=_StubManagerAgent()) async def fake_complete_plan(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage: # Return a different response depending on call order length @@ -481,7 +489,7 @@ async def fake_complete_replan(messages: list[ChatMessage], **kwargs: Any) -> Ch async def test_standard_manager_progress_ledger_success_and_error(): - mgr = StandardMagenticManager(chat_client=_StubChatClient()) + mgr = StandardMagenticManager(agent=_StubManagerAgent()) ctx = MagenticContext( task=ChatMessage(role=Role.USER, text="task"), participant_descriptions={"alice": "desc"}, @@ -718,7 +726,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): req_event: RequestInfoEvent | None = None async for event in workflow.run_stream("task"): - if isinstance(event, RequestInfoEvent) and event.request_type is 
MagenticPlanReviewRequest: + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: req_event = event assert req_event is not None diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 3146f3f38b..258aa64b1d 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -101,6 +101,8 @@ For observability samples in Agent Framework, see the [observability getting sta | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | +| Magentic + Human Stall Intervention | [orchestration/magentic_human_replan.py](./orchestration/magentic_human_replan.py) | Human intervenes when workflow stalls with `with_human_input_on_stall()` | +| Magentic + Agent Clarification | [orchestration/magentic_agent_clarification.py](./orchestration/magentic_agent_clarification.py) | Agents ask clarifying questions via `ask_user` tool with `@ai_function(approval_mode="always_require")` | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | | Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context | | Sequential Orchestration (Custom Executor) | 
[orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary | diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index 091f37c055..f6dd8ca83d 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -48,13 +48,21 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + print("\nBuilding Magentic Workflow...") workflow = ( MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, diff --git a/python/samples/getting_started/workflows/orchestration/magentic.py b/python/samples/getting_started/workflows/orchestration/magentic.py index 0e265cb931..213486706a 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic.py +++ b/python/samples/getting_started/workflows/orchestration/magentic.py @@ -65,6 +65,14 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + print("\nBuilding Magentic Workflow...") # State used by on_agent_stream callback @@ -75,7 +83,7 
@@ async def main() -> None: MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py b/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py new file mode 100644 index 0000000000..44dea25acc --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/magentic_agent_clarification.py @@ -0,0 +1,230 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from typing import Annotated, cast + +from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, + RequestInfoEvent, + WorkflowOutputEvent, + ai_function, +) +from agent_framework.openai import OpenAIChatClient + +logging.basicConfig(level=logging.WARNING) +logger = logging.getLogger(__name__) + +""" +Sample: Agent Clarification via Tool Calls in Magentic Workflows + +This sample demonstrates how agents can ask clarifying questions to users during +execution via the HITL (Human-in-the-Loop) mechanism. + +Scenario: "Onboard Jessica Smith" +- User provides an ambiguous task: "Onboard Jessica Smith" +- The onboarding agent recognizes missing information and uses the ask_user tool +- The ask_user call surfaces as a TOOL_APPROVAL request via RequestInfoEvent +- User provides the answer (e.g., "Engineering, Software Engineer") +- The answer is fed back to the agent as a FunctionResultContent +- Agent continues execution with the clarified information + +How it works: +1. 
Agent has an `ask_user` tool decorated with `@ai_function(approval_mode="always_require")` +2. When agent calls `ask_user`, it surfaces as a FunctionApprovalRequestContent +3. MagenticAgentExecutor converts this to a MagenticHumanInterventionRequest(kind=TOOL_APPROVAL) +4. User provides answer via MagenticHumanInterventionReply with response_text +5. The response_text becomes the function result fed back to the agent +6. Agent receives the result and continues processing + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient`. +""" + + +@ai_function(approval_mode="always_require") +def ask_user(question: Annotated[str, "The question to ask the user for clarification"]) -> str: + """Ask the user a clarifying question to gather missing information. + + Use this tool when you need additional information from the user to complete + your task effectively. The user's response will be returned so you can + continue with your work. + + Args: + question: The question to ask the user + + Returns: + The user's response to the question + """ + # This function body is a placeholder - the actual interaction happens via HITL. + # When the agent calls this tool: + # 1. The tool call surfaces as a FunctionApprovalRequestContent + # 2. MagenticAgentExecutor detects this and emits a HITL request + # 3. The user provides their answer + # 4. The answer is fed back as the function result + return f"User was asked: {question}" + + +async def main() -> None: + # Create an onboarding agent that asks clarifying questions + onboarding_agent = ChatAgent( + name="OnboardingAgent", + description="HR specialist who handles employee onboarding", + instructions=( + "You are an HR Onboarding Specialist. Your job is to onboard new employees.\n\n" + "IMPORTANT: When given an onboarding request, you MUST gather the following " + "information before proceeding:\n" + "1. Department (e.g., Engineering, Sales, Marketing)\n" + "2. 
Role/Title (e.g., Software Engineer, Account Executive)\n" + "3. Start date (if not specified)\n" + "4. Manager's name (if known)\n\n" + "Use the ask_user tool to request ANY missing information. " + "Do not proceed with onboarding until you have at least the department and role.\n\n" + "Once you have the information, create an onboarding plan." + ), + chat_client=OpenAIChatClient(model_id="gpt-4o"), + tools=[ask_user], # Tool decorated with @ai_function(approval_mode="always_require") + ) + + # Create a manager agent + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the onboarding workflow", + instructions="You coordinate a team to complete HR tasks efficiently.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + print("\nBuilding Magentic Workflow with Agent Clarification...") + + workflow = ( + MagenticBuilder() + .participants(onboarding=onboarding_agent) + .with_standard_manager( + agent=manager_agent, + max_round_count=10, + max_stall_count=3, + max_reset_count=2, + ) + .build() + ) + + # Ambiguous task - agent should ask for clarification + task = "Onboard Jessica Smith" + + print(f"\nTask: {task}") + print("(This is intentionally vague - the agent should ask for more details)") + print("\nStarting workflow execution...") + print("=" * 60) + + try: + pending_request: RequestInfoEvent | None = None + pending_responses: dict[str, object] | None = None + completed = False + workflow_output: str | None = None + + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + while not completed: + if pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + else: + stream = workflow.run_stream(task) + + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == 
MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + if stream_line_open: + print() + stream_line_open = False + print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + if stream_line_open: + print() + stream_line_open = False + pending_request = event + req = cast(MagenticHumanInterventionRequest, event.data) + + if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + print("\n" + "=" * 60) + print("AGENT ASKING FOR CLARIFICATION") + print("=" * 60) + print(f"\nAgent: {req.agent_id}") + print(f"Question: {req.prompt}") + if req.context: + print(f"Details: {req.context}") + print() + + elif isinstance(event, WorkflowOutputEvent): + if stream_line_open: + print() + stream_line_open = False + workflow_output = event.data if event.data else None + completed = True + + if stream_line_open: + print() + stream_line_open = False + pending_responses = None + + if pending_request is not None: + req = cast(MagenticHumanInterventionRequest, pending_request.data) + + if req.kind == MagenticHumanInterventionKind.TOOL_APPROVAL: + # Agent is asking for clarification + print("Please provide your answer:") + answer = input("> ").strip() # noqa: ASYNC250 + + if answer.lower() == "exit": + print("Exiting workflow...") + return + + # Send the answer back - it will be fed to the agent as the function result + reply = MagenticHumanInterventionReply( + 
decision=MagenticHumanInterventionDecision.APPROVE, + response_text=answer if answer else "No additional information provided.", + ) + pending_responses = {pending_request.request_id: reply} + pending_request = None + + print("\n" + "=" * 60) + print("WORKFLOW COMPLETED") + print("=" * 60) + if workflow_output: + messages = cast(list[ChatMessage], workflow_output) + if messages: + final_msg = messages[-1] + print(f"\nFinal Result:\n{final_msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + logger.exception("Workflow exception", exc_info=e) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index de7d794b19..36e6ca4c01 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -8,9 +8,10 @@ ChatAgent, FileCheckpointStorage, MagenticBuilder, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, RequestInfoEvent, WorkflowCheckpoint, WorkflowOutputEvent, @@ -69,6 +70,14 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and writing workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ) + # The builder wires in the Magentic orchestrator, sets the plan review path, and # stores the checkpoint backend so the runtime knows where to persist 
snapshots. return ( @@ -76,7 +85,7 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): .participants(researcher=researcher, writer=writer) .with_plan_review() .with_standard_manager( - chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + agent=manager_agent, max_round_count=10, max_stall_count=3, ) @@ -103,9 +112,12 @@ async def main() -> None: # the plan for human review. plan_review_request_id: str | None = None async for event in workflow.run_stream(TASK): - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: - plan_review_request_id = event.request_id - print(f"Captured plan review request: {plan_review_request_id}") + if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + request = event.data + if isinstance(request, MagenticHumanInterventionRequest): + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + plan_review_request_id = event.request_id + print(f"Captured plan review request: {plan_review_request_id}") if isinstance(event, WorkflowStatusEvent) and event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: break @@ -137,12 +149,12 @@ async def main() -> None: resumed_workflow = build_workflow(checkpoint_storage) # Construct an approval reply to supply when the plan review request is re-emitted. - approval = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + approval = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) # Resume execution and capture the re-emitted plan review request. 
request_info_event: RequestInfoEvent | None = None async for event in resumed_workflow.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id): - if isinstance(event, RequestInfoEvent) and isinstance(event.data, MagenticPlanReviewRequest): + if isinstance(event, RequestInfoEvent) and isinstance(event.data, MagenticHumanInterventionRequest): request_info_event = event if request_info_event is None: diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 52df5b24c6..b96fac7e99 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -11,9 +11,10 @@ ChatAgent, HostedCodeInterpreterTool, MagenticBuilder, - MagenticPlanReviewDecision, - MagenticPlanReviewReply, - MagenticPlanReviewRequest, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, RequestInfoEvent, WorkflowOutputEvent, ) @@ -66,6 +67,14 @@ async def main() -> None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for the orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + # Callbacks def on_exception(exception: Exception) -> None: print(f"Exception occurred: {exception}") @@ -80,7 +89,7 @@ def on_exception(exception: Exception) -> None: MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( - chat_client=OpenAIChatClient(), + agent=manager_agent, max_round_count=10, max_stall_count=3, max_reset_count=2, @@ -103,7 +112,7 @@ def on_exception(exception: Exception) -> None: try: 
pending_request: RequestInfoEvent | None = None - pending_responses: dict[str, MagenticPlanReviewReply] | None = None + pending_responses: dict[str, MagenticHumanInterventionReply] | None = None completed = False workflow_output: str | None = None @@ -134,11 +143,12 @@ def on_exception(exception: Exception) -> None: stream_line_open = True if event.data and event.data.text: print(event.data.text, end="", flush=True) - elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: - pending_request = event - review_req = cast(MagenticPlanReviewRequest, event.data) - if review_req.plan_text: - print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n") + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + request = cast(MagenticHumanInterventionRequest, event.data) + if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: + pending_request = event + if request.plan_text: + print(f"\n=== PLAN REVIEW REQUEST ===\n{request.plan_text}\n") elif isinstance(event, WorkflowOutputEvent): # Capture workflow output during streaming workflow_output = str(event.data) if event.data else None @@ -154,21 +164,48 @@ def on_exception(exception: Exception) -> None: # Get human input for plan review decision print("Plan review options:") print("1. approve - Approve the plan as-is") - print("2. revise - Request revision of the plan") - print("3. exit - Exit the workflow") + print("2. approve with comments - Approve with feedback for the manager") + print("3. revise - Request revision with your feedback") + print("4. edit - Directly edit the plan text") + print("5. 
exit - Exit the workflow") while True: - choice = input("Enter your choice (approve/revise/exit): ").strip().lower() # noqa: ASYNC250 + choice = input("Enter your choice (1-5): ").strip().lower() # noqa: ASYNC250 if choice in ["approve", "1"]: - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) + break + if choice in ["approve with comments", "2"]: + comments = input("Enter your comments for the manager: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.APPROVE, + comments=comments if comments else None, + ) + break + if choice in ["revise", "3"]: + comments = input("Enter feedback for revising the plan: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, + comments=comments if comments else None, + ) break - if choice in ["revise", "2"]: - reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE) + if choice in ["edit", "4"]: + print("Enter your edited plan (end with an empty line):") + lines = [] + while True: + line = input() # noqa: ASYNC250 + if line == "": + break + lines.append(line) + edited_plan = "\n".join(lines) + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.REVISE, + edited_plan_text=edited_plan if edited_plan else None, + ) break - if choice in ["exit", "3"]: + if choice in ["exit", "5"]: print("Exiting workflow...") return - print("Invalid choice. Please enter 'approve', 'revise', or 'exit'.") + print("Invalid choice. 
Please enter a number 1-5.") pending_responses = {pending_request.request_id: reply} pending_request = None diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py b/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py new file mode 100644 index 0000000000..aaa9be66f8 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_replan.py @@ -0,0 +1,213 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from typing import cast + +from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, + ChatAgent, + ChatMessage, + MagenticBuilder, + MagenticHumanInterventionDecision, + MagenticHumanInterventionKind, + MagenticHumanInterventionReply, + MagenticHumanInterventionRequest, + RequestInfoEvent, + WorkflowOutputEvent, +) +from agent_framework.openai import OpenAIChatClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +""" +Sample: Magentic Orchestration with Human Stall Intervention + +This sample demonstrates how humans can intervene when a Magentic workflow stalls. +When agents stop making progress, the workflow requests human input instead of +automatically replanning. + +Key concepts: +- with_human_input_on_stall(): Enables human intervention when workflow detects stalls +- MagenticHumanInterventionKind.STALL: The request kind for stall interventions +- Human can choose to: continue, trigger replan, or provide guidance + +Stall intervention options: +- CONTINUE: Reset stall counter and continue with current plan +- REPLAN: Trigger automatic replanning by the manager +- GUIDANCE: Provide text guidance to help agents get back on track + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient`. + +NOTE: it is sometimes difficult to get the agents to actually stall depending on the task. 
+""" + + +async def main() -> None: + researcher_agent = ChatAgent( + name="ResearcherAgent", + description="Specialist in research and information gathering", + instructions="You are a Researcher. You find information and gather facts.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + analyst_agent = ChatAgent( + name="AnalystAgent", + description="Data analyst who processes and summarizes research findings", + instructions="You are an Analyst. You analyze findings and create summaries.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the workflow", + instructions="You coordinate a team to complete tasks efficiently.", + chat_client=OpenAIChatClient(model_id="gpt-4o"), + ) + + print("\nBuilding Magentic Workflow with Human Stall Intervention...") + + workflow = ( + MagenticBuilder() + .participants(researcher=researcher_agent, analyst=analyst_agent) + .with_standard_manager( + agent=manager_agent, + max_round_count=10, + max_stall_count=1, # Stall detection after 1 round without progress + max_reset_count=2, + ) + .with_human_input_on_stall() # Request human input when stalled (instead of auto-replan) + .build() + ) + + task = "Research sustainable aviation fuel technology and summarize the findings." 
+ + print(f"\nTask: {task}") + print("\nStarting workflow execution...") + print("=" * 60) + + try: + pending_request: RequestInfoEvent | None = None + pending_responses: dict[str, object] | None = None + completed = False + workflow_output: str | None = None + + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + while not completed: + if pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + else: + stream = workflow.run_stream(task) + + async for event in stream: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + if stream_line_open: + print() + stream_line_open = False + print(f"\n[ORCHESTRATOR: {kind}]\n{text}\n{'-' * 40}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest: + if stream_line_open: + print() + stream_line_open = False + pending_request = event + req = cast(MagenticHumanInterventionRequest, event.data) + + if req.kind == MagenticHumanInterventionKind.STALL: + print("\n" + "=" * 60) + print("STALL INTERVENTION REQUESTED") + print("=" * 60) + print(f"\nWorkflow appears stalled after {req.stall_count} rounds") + print(f"Reason: {req.stall_reason}") + if req.last_agent: + print(f"Last active agent: {req.last_agent}") + if 
req.plan_text: + print(f"\nCurrent plan:\n{req.plan_text}") + print() + + elif isinstance(event, WorkflowOutputEvent): + if stream_line_open: + print() + stream_line_open = False + workflow_output = event.data if event.data else None + completed = True + + if stream_line_open: + print() + stream_line_open = False + pending_responses = None + + # Handle stall intervention request + if pending_request is not None: + req = cast(MagenticHumanInterventionRequest, pending_request.data) + reply: MagenticHumanInterventionReply | None = None + + if req.kind == MagenticHumanInterventionKind.STALL: + print("Stall intervention options:") + print("1. continue - Continue with current plan (reset stall counter)") + print("2. replan - Trigger automatic replanning") + print("3. guidance - Provide guidance to help agents") + print("4. exit - Exit the workflow") + + while True: + choice = input("Enter your choice (1-4): ").strip().lower() # noqa: ASYNC250 + if choice in ["continue", "1"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.CONTINUE) + break + if choice in ["replan", "2"]: + reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.REPLAN) + break + if choice in ["guidance", "3"]: + guidance = input("Enter your guidance: ").strip() # noqa: ASYNC250 + reply = MagenticHumanInterventionReply( + decision=MagenticHumanInterventionDecision.GUIDANCE, + comments=guidance if guidance else None, + ) + break + if choice in ["exit", "4"]: + print("Exiting workflow...") + return + print("Invalid choice. 
Please enter a number 1-4.") + + if reply is not None: + pending_responses = {pending_request.request_id: reply} + pending_request = None + + print("\n" + "=" * 60) + print("WORKFLOW COMPLETED") + print("=" * 60) + if workflow_output: + messages = cast(list[ChatMessage], workflow_output) + if messages: + final_msg = messages[-1] + print(f"\nFinal Result:\n{final_msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + logger.exception("Workflow exception", exc_info=e) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/semantic-kernel-migration/orchestrations/magentic.py b/python/samples/semantic-kernel-migration/orchestrations/magentic.py index f67620273a..87094a2047 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/magentic.py +++ b/python/samples/semantic-kernel-migration/orchestrations/magentic.py @@ -136,10 +136,18 @@ async def run_agent_framework_example(prompt: str) -> str | None: tools=HostedCodeInterpreterTool(), ) + # Create a manager agent for orchestration + manager_agent = ChatAgent( + name="MagenticManager", + description="Orchestrator that coordinates the research and coding workflow", + instructions="You coordinate a team to complete complex tasks efficiently.", + chat_client=OpenAIChatClient(), + ) + workflow = ( MagenticBuilder() .participants(researcher=researcher, coder=coder) - .with_standard_manager(chat_client=OpenAIChatClient()) + .with_standard_manager(agent=manager_agent) .build() )