diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py
index e0dc3e8ea9..e76b34b62a 100644
--- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py
+++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py
@@ -30,6 +30,7 @@
 """
 
 import inspect
+import json
 import logging
 import sys
 from collections.abc import Awaitable, Callable, Sequence
@@ -139,7 +140,10 @@ async def process(
         from agent_framework._middleware import MiddlewareTermination
 
         # Short-circuit execution and provide deterministic response payload for the tool call.
-        context.result = {HANDOFF_FUNCTION_RESULT_KEY: self._handoff_functions[context.function.name]}
+        # Parse the result using the default parser so it is in a form that can be passed directly to LLM APIs.
+        context.result = FunctionTool.parse_result({
+            HANDOFF_FUNCTION_RESULT_KEY: self._handoff_functions[context.function.name]
+        })
         raise MiddlewareTermination(result=context.result)
@@ -493,9 +497,22 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None:
         last_message = response.messages[-1]
         for content in last_message.contents:
             if content.type == "function_result":
-                # Use string comparison instead of isinstance to improve performance
-                if content.result and isinstance(content.result, dict):
-                    handoff_target = content.result.get(HANDOFF_FUNCTION_RESULT_KEY)  # type: ignore
+                if not content.result:
+                    continue
+
+                parsed_result: dict[str, Any] | None = None
+                if isinstance(content.result, dict):
+                    parsed_result = content.result
+                elif isinstance(content.result, str):
+                    try:
+                        loaded_result = json.loads(content.result)
+                    except json.JSONDecodeError:
+                        continue
+                    if isinstance(loaded_result, dict):
+                        parsed_result = loaded_result
+
+                if parsed_result is not None:
+                    handoff_target = parsed_result.get(HANDOFF_FUNCTION_RESULT_KEY)
                     if isinstance(handoff_target, str):
                         return handoff_target
             else:
diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py
index e8778a86ca..26e21f46da 100644
--- a/python/packages/orchestrations/tests/test_handoff.py
+++ b/python/packages/orchestrations/tests/test_handoff.py
@@ -17,10 +17,17 @@
     resolve_agent_id,
 )
 from agent_framework._clients import BaseChatClient
-from agent_framework._middleware import ChatMiddlewareLayer
-from agent_framework._tools import FunctionInvocationLayer
+from agent_framework._middleware import ChatMiddlewareLayer, FunctionInvocationContext, MiddlewareTermination
+from agent_framework._tools import FunctionInvocationLayer, FunctionTool, tool
 from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder
+from agent_framework_orchestrations._handoff import (
+    HANDOFF_FUNCTION_RESULT_KEY,
+    HandoffConfiguration,
+    _AutoHandoffMiddleware,  # pyright: ignore[reportPrivateUsage]
+    get_handoff_tool_name,
+)
+
 
 class MockChatClient(ChatMiddlewareLayer[Any], FunctionInvocationLayer[Any], BaseChatClient[Any]):
     """Mock chat client for testing handoff workflows."""
@@ -365,3 +372,43 @@ def test_handoff_builder_accepts_all_instances_in_add_handoff():
     assert "triage" in workflow.executors
     assert "specialist_a" in workflow.executors
     assert "specialist_b" in workflow.executors
+
+
+async def test_auto_handoff_middleware_intercepts_handoff_tool_call() -> None:
+    """Middleware should short-circuit matching handoff tool calls with a synthetic result."""
+    target_id = "specialist"
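+    # _AutoHandoffMiddleware matches tool calls by function name, so the tool below must
+    # be registered under the exact name produced by get_handoff_tool_name(target_id).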
+    middleware = _AutoHandoffMiddleware([HandoffConfiguration(target=target_id)])
+
+    @tool(name=get_handoff_tool_name(target_id), approval_mode="never_require")
+    def handoff_tool() -> str:
+        return "unreachable"
+
+    context = FunctionInvocationContext(function=handoff_tool, arguments={})
+    call_next = AsyncMock()
+
+    with pytest.raises(MiddlewareTermination) as exc_info:
+        await middleware.process(context, call_next)
+
+    call_next.assert_not_awaited()
+    expected_result = FunctionTool.parse_result({HANDOFF_FUNCTION_RESULT_KEY: target_id})
+    assert context.result == expected_result
+    assert exc_info.value.result == expected_result
+
+
+async def test_auto_handoff_middleware_calls_next_for_non_handoff_tool() -> None:
+    """Middleware should pass through when the function name is not a configured handoff tool."""
+    middleware = _AutoHandoffMiddleware([HandoffConfiguration(target="specialist")])
+
+    @tool(name="regular_tool", approval_mode="never_require")
+    def regular_tool() -> str:
+        return "ok"
+
+    context = FunctionInvocationContext(function=regular_tool, arguments={})
+    call_next = AsyncMock()
+
+    await middleware.process(context, call_next)
+
+    call_next.assert_awaited_once()
+    assert context.result is None
diff --git a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py
index 72ecda0609..1e37b79f1e 100644
--- a/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py
+++ b/python/samples/03-workflows/agents/azure_chat_agents_tool_calls_with_feedback.py
@@ -3,6 +3,7 @@
 import asyncio
 import json
 import os
+from collections.abc import AsyncIterable
 from dataclasses import dataclass, field
 from typing import Annotated
 
@@ -12,7 +13,6 @@
     AgentExecutorRequest,
     AgentExecutorResponse,
     AgentResponse,
-    AgentResponseUpdate,
     Executor,
     Message,
     WorkflowBuilder,
@@ -246,6 +246,31 @@ def display_agent_run_update(event: WorkflowEvent, last_executor: str | None) ->
             print(update, end="", flush=True)
 
 
+async def consume_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, str] | None:
+    """Consume a workflow event stream and collect responses to any human feedback requests."""
+    requests: list[WorkflowEvent] = []
+    async for event in stream:
+        if event.type == "request_info" and isinstance(event.data, DraftFeedbackRequest):
+            # Stash the request so we can prompt the human after the stream completes.
+            requests.append(event)
+
+    if requests:
+        pending_responses: dict[str, str] = {}
+        for request in requests:
+            print("\n----- Writer draft -----")
+            print(request.data.draft_text.strip())
+            print("\nProvide guidance for the editor (or 'approve' to accept the draft).")
+            answer = input("Human feedback: ").strip()  # noqa: ASYNC250
+            if answer.lower() == "exit":
+                print("Exiting...")
+                exit(0)
+            pending_responses[request.request_id] = answer
+
+        return pending_responses
+
+    return None
+
+
 async def main() -> None:
     """Run the workflow and bridge human feedback between two agents."""
 
@@ -267,66 +292,25 @@ async def main() -> None:
         .build()
     )
 
-    # Switch to turn on agent run update display.
-    # By default this is off to reduce clutter during human input.
-    display_agent_run_update_switch = False
-
     print(
         "Interactive mode. When prompted, provide a short feedback note for the editor.",
         flush=True,
     )
 
-    pending_responses: dict[str, str] | None = None
-    completed = False
-    initial_run = True
+    # Initiate the first run of the workflow.
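+    # consume_stream returns a mapping of request_id -> human answer while the workflow
+    # is paused for human feedback, or None once the run has finished.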
+    # Runs are not isolated; state is preserved across multiple calls to run.
+    stream = workflow.run(
+        "Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
+        stream=True,
+    )
+    pending_responses = await consume_stream(stream)
 
-    while not completed:
-        last_executor: str | None = None
-        if initial_run:
-            stream = workflow.run(
-                "Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
-                stream=True,
-            )
-            initial_run = False
-        elif pending_responses is not None:
-            stream = workflow.run(stream=True, responses=pending_responses)
-            pending_responses = None
-        else:
-            break
-
-        requests: list[tuple[str, DraftFeedbackRequest]] = []
-
-        async for event in stream:
-            if (
-                event.type == "output"
-                and isinstance(event.data, AgentResponseUpdate)
-                and display_agent_run_update_switch
-            ):
-                display_agent_run_update(event, last_executor)
-            if event.type == "request_info" and isinstance(event.data, DraftFeedbackRequest):
-                # Stash the request so we can prompt the human after the stream completes.
-                requests.append((event.request_id, event.data))
-                last_executor = None
-            elif event.type == "output" and not isinstance(event.data, AgentResponseUpdate):
-                # Only mark as completed for final outputs, not streaming updates
-                last_executor = None
-                response = event.data
-                final_text = getattr(response, "text", str(response))
-                print(final_text, flush=True, end="")
-                completed = True
-
-        if requests and not completed:
-            responses: dict[str, str] = {}
-            for request_id, request in requests:
-                print("\n----- Writer draft -----")
-                print(request.draft_text.strip())
-                print("\nProvide guidance for the editor (or 'approve' to accept the draft).")
-                answer = input("Human feedback: ").strip()  # noqa: ASYNC250
-                if answer.lower() == "exit":
-                    print("Exiting...")
-                    return
-                responses[request_id] = answer
-            pending_responses = responses
+    # Run until there are no more requests.
+    while pending_responses is not None:
+        stream = workflow.run(stream=True, responses=pending_responses)
+        pending_responses = await consume_stream(stream)
 
     print("Workflow complete.")
diff --git a/python/samples/03-workflows/agents/concurrent_workflow_as_agent.py b/python/samples/03-workflows/agents/concurrent_workflow_as_agent.py
index c9d3a55920..de465eba15 100644
--- a/python/samples/03-workflows/agents/concurrent_workflow_as_agent.py
+++ b/python/samples/03-workflows/agents/concurrent_workflow_as_agent.py
@@ -26,18 +26,6 @@
 """
 
 
-def clear_and_redraw(buffers: dict[str, str], agent_order: list[str]) -> None:
-    """Clear terminal and redraw all agent outputs grouped together."""
-    # ANSI escape: clear screen and move cursor to top-left
-    print("\033[2J\033[H", end="")
-    print("===== Concurrent Agent Streaming (Live) =====\n")
-    for name in agent_order:
-        print(f"--- {name} ---")
-        print(buffers.get(name, ""))
-        print()
-    print("", end="", flush=True)
-
-
 async def main() -> None:
     # 1) Create three domain agents using AzureOpenAIResponsesClient
     client = AzureOpenAIResponsesClient(
diff --git a/python/samples/03-workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/03-workflows/agents/workflow_as_agent_human_in_the_loop.py
index dfe510762d..30b80b2adf 100644
--- a/python/samples/03-workflows/agents/workflow_as_agent_human_in_the_loop.py
+++ b/python/samples/03-workflows/agents/workflow_as_agent_human_in_the_loop.py
@@ -106,7 +106,8 @@ async def main() -> None:
     # and escalation paths for human review.
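+    # The Worker wires the chat client through its 'client' parameter.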
     worker = Worker(
         id="worker",
-        chat_client=AzureOpenAIResponsesClient(
+        client=AzureOpenAIResponsesClient(
             project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
             deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
             credential=AzureCliCredential(),
@@ -161,7 +161,7 @@ async def main() -> None:
         request_id = agent_request.request_id
 
         # Mock a human response approval for demonstration purposes.
-        human_response = ReviewResponse(request_id=request_id, feedback="Approved", approved=True)
+        human_response = ReviewResponse(request_id=request_id, feedback="", approved=True)
 
         # Create the function call result object to send back to the agent.
         human_review_function_result = Content.from_function_result(
diff --git a/python/samples/03-workflows/control-flow/workflow_cancellation.py b/python/samples/03-workflows/control-flow/workflow_cancellation.py
index 5eefbf0c65..1c2a0b024a 100644
--- a/python/samples/03-workflows/control-flow/workflow_cancellation.py
+++ b/python/samples/03-workflows/control-flow/workflow_cancellation.py
@@ -50,12 +50,7 @@ async def step3(text: str, ctx: WorkflowContext[Never, str]) -> None:
 
 def build_workflow():
     """Build a simple 3-step sequential workflow (~6 seconds total)."""
-    return (
-        WorkflowBuilder(start_executor=step1)
-        .add_edge(step1, step2)
-        .add_edge(step2, step3)
-        .build()
-    )
+    return WorkflowBuilder(start_executor=step1).add_edge(step1, step2).add_edge(step2, step3).build()
 
 
 async def run_with_cancellation() -> None:
@@ -64,7 +59,7 @@ async def run_with_cancellation() -> None:
     workflow = build_workflow()
 
     # Wrap workflow.run() in a task to enable cancellation
-    task = asyncio.create_task(workflow.run("hello world"))
+    task = asyncio.ensure_future(workflow.run("hello world"))
 
     # Wait 3 seconds (Step1 completes, Step2 is mid-execution), then cancel
     await asyncio.sleep(3)
diff --git a/python/samples/03-workflows/declarative/deep_research/main.py b/python/samples/03-workflows/declarative/deep_research/main.py
index a36f3f49a1..bb3dcc6f0d 100644
--- a/python/samples/03-workflows/declarative/deep_research/main.py
+++ b/python/samples/03-workflows/declarative/deep_research/main.py
@@ -180,7 +180,7 @@ async def main() -> None:
     )
 
     # Load workflow from YAML
-    samples_root = Path(__file__).parent.parent.parent.parent.parent.parent.parent
+    samples_root = Path(__file__).parent.parent.parent.parent.parent.parent
     workflow_path = samples_root / "workflow-samples" / "DeepResearch.yaml"
     if not workflow_path.exists():
         # Fall back to local copy if workflow-samples doesn't exist
diff --git a/python/samples/03-workflows/declarative/human_in_loop/main.py b/python/samples/03-workflows/declarative/human_in_loop/main.py
index 8f501ab358..04c2ab8964 100644
--- a/python/samples/03-workflows/declarative/human_in_loop/main.py
+++ b/python/samples/03-workflows/declarative/human_in_loop/main.py
@@ -14,6 +14,7 @@
 
 import asyncio
 from pathlib import Path
+from typing import cast
 
 from agent_framework import Workflow
 from agent_framework.declarative import ExternalInputRequest, WorkflowFactory
@@ -31,27 +32,18 @@ async def run_with_streaming(workflow: Workflow) -> None:
         data = event.data
         if isinstance(data, TextOutputEvent):
             print(f"[Bot]: {data.text}")
-        elif isinstance(data, ExternalInputRequest):
-            # In a real scenario, you would:
-            # 1. Display the prompt to the user
-            # 2. Wait for their response
-            # 3. Use the response to continue the workflow
-            output_property = data.metadata.get("output_property", "unknown")
-            print(f"[System] Input requested for: {output_property}")
-            if data.message:
-                print(f"[System] Prompt: {data.message}")
+        elif event.type == "request_info":
+            request = cast(ExternalInputRequest, event.data)
+            # In a real scenario, you would:
+            # 1. Display the prompt to the user
+            # 2. Wait for their response
+            # 3. Use the response to continue the workflow
+            output_property = request.metadata.get("output_property", "unknown")
+            print(f"[System] Input requested for: {output_property}")
+            if request.message:
+                print(f"[System] Prompt: {request.message}")
         else:
             print(f"[Output]: {data}")
-
-
-async def run_with_result(workflow: Workflow) -> None:
-    """Demonstrate batch workflow execution with run()."""
-    print("\n=== Batch Execution (run) ===")
-    print("-" * 40)
-
-    result = await workflow.run({})
-    for output in result.get_outputs():
-        print(f"  Output: {output}")
 
 
 async def main() -> None:
@@ -70,9 +62,6 @@ async def main() -> None:
     # Demonstrate streaming execution
     await run_with_streaming(workflow)
 
-    # Demonstrate batch execution
-    # await run_with_result(workflow)
-
     print("\n" + "-" * 40)
     print("=== Workflow Complete ===")
     print()
diff --git a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py
index 4398b269d9..9330ba470c 100644
--- a/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py
+++ b/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py
@@ -23,7 +23,7 @@
 from typing_extensions import Never
 
 """
-Sample: AzureOpenAI Chat Agents in workflow with human feedback
+Sample: Azure AI Agents in workflow with human feedback
 
 Pipeline layout:
 writer_agent -> Coordinator -> writer_agent -> Coordinator -> final_editor_agent -> Coordinator -> output
diff --git a/python/samples/03-workflows/orchestrations/magentic.py b/python/samples/03-workflows/orchestrations/magentic.py
index 7ff0a08b1b..e06bcec0f2 100644
--- a/python/samples/03-workflows/orchestrations/magentic.py
+++ b/python/samples/03-workflows/orchestrations/magentic.py
@@ -17,6 +17,7 @@
 logging.basicConfig(level=logging.WARNING)
 logger = logging.getLogger(__name__)
 
+
 """
 Sample: Magentic Orchestration (multi-agent)
diff --git a/python/samples/03-workflows/tool-approval/group_chat_builder_tool_approval.py b/python/samples/03-workflows/tool-approval/group_chat_builder_tool_approval.py
index d7472d13ca..960a547b10 100644
--- a/python/samples/03-workflows/tool-approval/group_chat_builder_tool_approval.py
+++ b/python/samples/03-workflows/tool-approval/group_chat_builder_tool_approval.py
@@ -148,21 +148,23 @@ async def main() -> None:
         name="DevOpsEngineer",
         instructions=(
             "You are a DevOps engineer responsible for deployments. First check staging "
-            "status and create a rollback plan, then proceed with production deployment. "
-            "Always ensure safety measures are in place before deploying."
+            "status and create a rollback plan, then proceed with production deployment "
+            "without the need for further instructions."
        ),
         tools=[check_staging_status, create_rollback_plan, deploy_to_production],
     )
 
     # 4. Build a group chat workflow with the selector function
-    # max_rounds=4: Set a hard limit to 4 rounds
+    # max_rounds=2: Set a hard limit of 2 rounds
     # First round: QAEngineer speaks
-    # Second round: DevOpsEngineer speaks (check staging + create rollback)
-    # Third round: DevOpsEngineer speaks with an approval request (deploy to production)
-    # Fourth round: DevOpsEngineer speaks again after approval
+    # Second round: DevOpsEngineer speaks
+    # If the round limit is larger than 2, the selector will keep selecting DevOpsEngineer,
+    # which could result in empty messages being sent to the DevOpsEngineer after the second
+    # round, since there is no more input from the QAEngineer. This can lead to errors from
+    # some LLMs that do not accept empty input; setting max_rounds=2 prevents the issue.
     workflow = GroupChatBuilder(
         participants=[qa_engineer, devops_engineer],
-        max_rounds=4,
+        max_rounds=2,
         selection_func=select_next_speaker,
     ).build()