Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"""

import inspect
import json
import logging
import sys
from collections.abc import Awaitable, Callable, Sequence
Expand Down Expand Up @@ -139,7 +140,10 @@ async def process(
from agent_framework._middleware import MiddlewareTermination

# Short-circuit execution and provide deterministic response payload for the tool call.
context.result = {HANDOFF_FUNCTION_RESULT_KEY: self._handoff_functions[context.function.name]}
# Parse the result using the default parser to ensure in a form that can be passed directly to LLM APIs.
context.result = FunctionTool.parse_result({
HANDOFF_FUNCTION_RESULT_KEY: self._handoff_functions[context.function.name]
})
raise MiddlewareTermination(result=context.result)


Expand Down Expand Up @@ -493,9 +497,22 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None:
last_message = response.messages[-1]
for content in last_message.contents:
if content.type == "function_result":
# Use string comparison instead of isinstance to improve performance
if content.result and isinstance(content.result, dict):
handoff_target = content.result.get(HANDOFF_FUNCTION_RESULT_KEY) # type: ignore
if not content.result:
continue

parsed_result: dict[str, Any] | None = None
if isinstance(content.result, dict):
parsed_result = content.result
elif isinstance(content.result, str):
try:
loaded_result = json.loads(content.result)
except json.JSONDecodeError:
continue
if isinstance(loaded_result, dict):
parsed_result = loaded_result

if parsed_result is not None:
handoff_target = parsed_result.get(HANDOFF_FUNCTION_RESULT_KEY)
if isinstance(handoff_target, str):
return handoff_target
else:
Expand Down
49 changes: 47 additions & 2 deletions python/packages/orchestrations/tests/test_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
resolve_agent_id,
)
from agent_framework._clients import BaseChatClient
from agent_framework._middleware import ChatMiddlewareLayer
from agent_framework._tools import FunctionInvocationLayer
from agent_framework._middleware import ChatMiddlewareLayer, FunctionInvocationContext, MiddlewareTermination
from agent_framework._tools import FunctionInvocationLayer, FunctionTool, tool
from agent_framework.orchestrations import HandoffAgentUserRequest, HandoffBuilder

from agent_framework_orchestrations._handoff import (
HANDOFF_FUNCTION_RESULT_KEY,
HandoffConfiguration,
_AutoHandoffMiddleware, # pyright: ignore[reportPrivateUsage]
get_handoff_tool_name,
)


class MockChatClient(ChatMiddlewareLayer[Any], FunctionInvocationLayer[Any], BaseChatClient[Any]):
"""Mock chat client for testing handoff workflows."""
Expand Down Expand Up @@ -365,3 +372,41 @@ def test_handoff_builder_accepts_all_instances_in_add_handoff():
assert "triage" in workflow.executors
assert "specialist_a" in workflow.executors
assert "specialist_b" in workflow.executors


async def test_auto_handoff_middleware_intercepts_handoff_tool_call() -> None:
    """Middleware should short-circuit matching handoff tool calls with a synthetic result."""
    target = "specialist"
    middleware = _AutoHandoffMiddleware([HandoffConfiguration(target=target)])

    @tool(name=get_handoff_tool_name(target), approval_mode="never_require")
    def handoff_tool() -> str:
        return "unreachable"

    next_callback = AsyncMock()
    invocation = FunctionInvocationContext(function=handoff_tool, arguments={})

    # The middleware must terminate the pipeline before the tool body runs.
    with pytest.raises(MiddlewareTermination) as raised:
        await middleware.process(invocation, next_callback)

    synthetic_result = FunctionTool.parse_result({HANDOFF_FUNCTION_RESULT_KEY: target})
    next_callback.assert_not_awaited()
    assert invocation.result == synthetic_result
    assert raised.value.result == synthetic_result


async def test_auto_handoff_middleware_calls_next_for_non_handoff_tool() -> None:
    """Middleware should pass through when the function name is not a configured handoff tool."""
    middleware = _AutoHandoffMiddleware([HandoffConfiguration(target="specialist")])

    @tool(name="regular_tool", approval_mode="never_require")
    def regular_tool() -> str:
        return "ok"

    invocation = FunctionInvocationContext(function=regular_tool, arguments={})
    passthrough = AsyncMock()

    # A non-handoff tool must be forwarded to the next handler untouched.
    await middleware.process(invocation, passthrough)

    passthrough.assert_awaited_once()
    assert invocation.result is None
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import json
import os
from collections.abc import AsyncIterable
from dataclasses import dataclass, field
from typing import Annotated

Expand All @@ -12,7 +13,6 @@
AgentExecutorRequest,
AgentExecutorResponse,
AgentResponse,
AgentResponseUpdate,
Executor,
Message,
WorkflowBuilder,
Expand Down Expand Up @@ -246,6 +246,31 @@ def display_agent_run_update(event: WorkflowEvent, last_executor: str | None) ->
print(update, end="", flush=True)


async def consume_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, str] | None:
    """Consume a workflow event stream and collect any pending human responses.

    Args:
        stream: The event stream produced by a workflow run.

    Returns:
        A mapping of ``request_id`` -> human answer for every pending
        ``DraftFeedbackRequest`` seen in the stream, or ``None`` when the
        stream completed without any feedback requests.

    Raises:
        SystemExit: When the user types ``exit`` at the feedback prompt.
    """
    requests: list[WorkflowEvent] = []
    async for event in stream:
        if event.type == "request_info" and isinstance(event.data, DraftFeedbackRequest):
            # Stash the request so we can prompt the human after the stream completes.
            requests.append(event)

    if not requests:
        return None

    pending_responses: dict[str, str] = {}
    for request in requests:
        print("\n----- Writer draft -----")
        print(request.data.draft_text.strip())
        print("\nProvide guidance for the editor (or 'approve' to accept the draft).")
        answer = input("Human feedback: ").strip()  # noqa: ASYNC250
        if answer.lower() == "exit":
            print("Exiting...")
            # Raise SystemExit directly: the `exit()` builtin is injected by the
            # site module and is documented as intended for interactive use only.
            raise SystemExit(0)
        pending_responses[request.request_id] = answer

    return pending_responses


async def main() -> None:
"""Run the workflow and bridge human feedback between two agents."""

Expand All @@ -267,66 +292,23 @@ async def main() -> None:
.build()
)

# Switch to turn on agent run update display.
# By default this is off to reduce clutter during human input.
display_agent_run_update_switch = False

print(
"Interactive mode. When prompted, provide a short feedback note for the editor.",
flush=True,
)

pending_responses: dict[str, str] | None = None
completed = False
initial_run = True
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
stream=True,
)
pending_responses = await consume_stream(stream)

while not completed:
last_executor: str | None = None
if initial_run:
stream = workflow.run(
"Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
stream=True,
)
initial_run = False
elif pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = None
else:
break

requests: list[tuple[str, DraftFeedbackRequest]] = []

async for event in stream:
if (
event.type == "output"
and isinstance(event.data, AgentResponseUpdate)
and display_agent_run_update_switch
):
display_agent_run_update(event, last_executor)
if event.type == "request_info" and isinstance(event.data, DraftFeedbackRequest):
# Stash the request so we can prompt the human after the stream completes.
requests.append((event.request_id, event.data))
last_executor = None
elif event.type == "output" and not isinstance(event.data, AgentResponseUpdate):
# Only mark as completed for final outputs, not streaming updates
last_executor = None
response = event.data
final_text = getattr(response, "text", str(response))
print(final_text, flush=True, end="")
completed = True

if requests and not completed:
responses: dict[str, str] = {}
for request_id, request in requests:
print("\n----- Writer draft -----")
print(request.draft_text.strip())
print("\nProvide guidance for the editor (or 'approve' to accept the draft).")
answer = input("Human feedback: ").strip() # noqa: ASYNC250
if answer.lower() == "exit":
print("Exiting...")
return
responses[request_id] = answer
pending_responses = responses
# Run until there are no more requests
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await consume_stream(stream)

print("Workflow complete.")

Expand Down
12 changes: 0 additions & 12 deletions python/samples/03-workflows/agents/concurrent_workflow_as_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@
"""


def clear_and_redraw(buffers: dict[str, str], agent_order: list[str]) -> None:
    """Clear terminal and redraw all agent outputs grouped together.

    Args:
        buffers: Accumulated text per agent name; missing agents render empty.
        agent_order: Display order for the agent sections.
    """
    # ANSI escape: clear screen and move cursor to top-left
    print("\033[2J\033[H", end="")
    print("===== Concurrent Agent Streaming (Live) =====\n")
    for agent_name in agent_order:
        section_header = f"--- {agent_name} ---"
        section_body = buffers.get(agent_name, "")
        print(section_header)
        print(section_body)
        print()
    # Flush without emitting any extra characters so the redraw appears at once.
    print("", end="", flush=True)


async def main() -> None:
# 1) Create three domain agents using AzureOpenAIResponsesClient
client = AzureOpenAIResponsesClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def main() -> None:
# and escalation paths for human review.
worker = Worker(
id="worker",
chat_client=AzureOpenAIResponsesClient(
client=AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
Expand Down Expand Up @@ -161,7 +161,7 @@ async def main() -> None:

request_id = agent_request.request_id
# Mock a human response approval for demonstration purposes.
human_response = ReviewResponse(request_id=request_id, feedback="Approved", approved=True)
human_response = ReviewResponse(request_id=request_id, feedback="", approved=True)

# Create the function call result object to send back to the agent.
human_review_function_result = Content.from_function_result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ async def step3(text: str, ctx: WorkflowContext[Never, str]) -> None:

def build_workflow():
    """Build a simple 3-step sequential workflow (~6 seconds total).

    Returns:
        The compiled workflow chaining step1 -> step2 -> step3.
    """
    # The span previously contained two return statements (diff residue); keep
    # the single fluent-builder form so no unreachable code remains.
    return WorkflowBuilder(start_executor=step1).add_edge(step1, step2).add_edge(step2, step3).build()


async def run_with_cancellation() -> None:
Expand All @@ -64,7 +59,7 @@ async def run_with_cancellation() -> None:
workflow = build_workflow()

# Wrap workflow.run() in a task to enable cancellation
task = asyncio.create_task(workflow.run("hello world"))
task = asyncio.ensure_future(workflow.run("hello world"))

# Wait 3 seconds (Step1 completes, Step2 is mid-execution), then cancel
await asyncio.sleep(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async def main() -> None:
)

# Load workflow from YAML
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent.parent
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent
workflow_path = samples_root / "workflow-samples" / "DeepResearch.yaml"
if not workflow_path.exists():
# Fall back to local copy if workflow-samples doesn't exist
Expand Down
33 changes: 11 additions & 22 deletions python/samples/03-workflows/declarative/human_in_loop/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
from pathlib import Path
from typing import cast

from agent_framework import Workflow
from agent_framework.declarative import ExternalInputRequest, WorkflowFactory
Expand All @@ -31,27 +32,18 @@ async def run_with_streaming(workflow: Workflow) -> None:
data = event.data
if isinstance(data, TextOutputEvent):
print(f"[Bot]: {data.text}")
elif isinstance(data, ExternalInputRequest):
# In a real scenario, you would:
# 1. Display the prompt to the user
# 2. Wait for their response
# 3. Use the response to continue the workflow
output_property = data.metadata.get("output_property", "unknown")
print(f"[System] Input requested for: {output_property}")
if data.message:
print(f"[System] Prompt: {data.message}")
else:
print(f"[Output]: {data}")


async def run_with_result(workflow: Workflow) -> None:
"""Demonstrate batch workflow execution with run()."""
print("\n=== Batch Execution (run) ===")
print("-" * 40)

result = await workflow.run({})
for output in result.get_outputs():
print(f" Output: {output}")
elif event.type == "request_info":
request = cast(ExternalInputRequest, event.data)
# In a real scenario, you would:
# 1. Display the prompt to the user
# 2. Wait for their response
# 3. Use the response to continue the workflow
output_property = request.metadata.get("output_property", "unknown")
print(f"[System] Input requested for: {output_property}")
if request.message:
print(f"[System] Prompt: {request.message}")


async def main() -> None:
Expand All @@ -70,9 +62,6 @@ async def main() -> None:
# Demonstrate streaming execution
await run_with_streaming(workflow)

# Demonstrate batch execution
# await run_with_result(workflow)

print("\n" + "-" * 40)
print("=== Workflow Complete ===")
print()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing_extensions import Never

"""
Sample: AzureOpenAI Chat Agents in workflow with human feedback
Sample: Azure AI Agents in workflow with human feedback

Pipeline layout:
writer_agent -> Coordinator -> writer_agent -> Coordinator -> final_editor_agent -> Coordinator -> output
Expand Down
1 change: 1 addition & 0 deletions python/samples/03-workflows/orchestrations/magentic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


"""
Sample: Magentic Orchestration (multi-agent)

Expand Down
Loading