Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions python/packages/core/agent_framework/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2164,15 +2164,11 @@ async def _get_response() -> ChatResponse:
# Error threshold reached: force a final non-tool turn so
# function_call_output items are submitted before exit.
mutable_options["tool_choice"] = "none"
elif (
max_function_calls is not None
and total_function_calls >= max_function_calls
):
elif max_function_calls is not None and total_function_calls >= max_function_calls:
# Best-effort limit: checked after each batch of parallel calls completes,
# so the current batch always runs to completion even if it overshoots.
logger.info(
"Maximum function calls reached (%d/%d). "
"Stopping further function calls for this request.",
"Maximum function calls reached (%d/%d). Stopping further function calls for this request.",
total_function_calls,
max_function_calls,
)
Expand Down Expand Up @@ -2302,15 +2298,11 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
mutable_options["tool_choice"] = "none"
elif result["action"] != "continue":
return
elif (
max_function_calls is not None
and total_function_calls >= max_function_calls
):
elif max_function_calls is not None and total_function_calls >= max_function_calls:
# Best-effort limit: checked after each batch of parallel calls completes,
# so the current batch always runs to completion even if it overshoots.
logger.info(
"Maximum function calls reached (%d/%d). "
"Stopping further function calls for this request.",
"Maximum function calls reached (%d/%d). Stopping further function calls for this request.",
total_function_calls,
max_function_calls,
)
Expand Down
3 changes: 2 additions & 1 deletion python/packages/core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ dependencies = [
# connectors and functions
"openai>=1.99.0",
"azure-identity>=1,<2",
"azure-ai-projects >= 2.0.0b3",
# Pinned to 2.0.0b3 - breaking changes in 2.0.0b4, unpin once upgrades complete
"azure-ai-projects == 2.0.0b3",
"mcp[ws]>=1.24.0,<2",
"packaging>=24.1",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
AgentResult,
ExternalLoopState,
InvokeAzureAgentExecutor,
InvokeToolExecutor,
)
from ._executors_basic import (
BASIC_ACTION_EXECUTORS,
Expand Down Expand Up @@ -68,6 +67,17 @@
RequestExternalInputExecutor,
WaitForInputExecutor,
)
from ._executors_tools import (
FUNCTION_TOOL_REGISTRY_KEY,
TOOL_ACTION_EXECUTORS,
TOOL_APPROVAL_STATE_KEY,
BaseToolExecutor,
InvokeFunctionToolExecutor,
ToolApprovalRequest,
ToolApprovalResponse,
ToolApprovalState,
ToolInvocationResult,
)
from ._factory import DeclarativeWorkflowError, WorkflowFactory
from ._state import WorkflowState

Expand All @@ -79,13 +89,17 @@
"CONTROL_FLOW_EXECUTORS",
"DECLARATIVE_STATE_KEY",
"EXTERNAL_INPUT_EXECUTORS",
"FUNCTION_TOOL_REGISTRY_KEY",
"TOOL_ACTION_EXECUTORS",
"TOOL_APPROVAL_STATE_KEY",
"TOOL_REGISTRY_KEY",
"ActionComplete",
"ActionTrigger",
"AgentExternalInputRequest",
"AgentExternalInputResponse",
"AgentResult",
"AppendValueExecutor",
"BaseToolExecutor",
"BreakLoopExecutor",
"ClearAllVariablesExecutor",
"ConfirmationExecutor",
Expand All @@ -107,7 +121,7 @@
"ForeachInitExecutor",
"ForeachNextExecutor",
"InvokeAzureAgentExecutor",
"InvokeToolExecutor",
"InvokeFunctionToolExecutor",
"JoinExecutor",
"LoopControl",
"LoopIterationResult",
Expand All @@ -119,6 +133,10 @@
"SetTextVariableExecutor",
"SetValueExecutor",
"SetVariableExecutor",
"ToolApprovalRequest",
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WaitForInputExecutor",
"WorkflowFactory",
"WorkflowState",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from __future__ import annotations

import logging
from typing import Any

from agent_framework import (
Expand All @@ -38,13 +39,18 @@
SwitchEvaluatorExecutor,
)
from ._executors_external_input import EXTERNAL_INPUT_EXECUTORS
from ._executors_tools import TOOL_ACTION_EXECUTORS, InvokeFunctionToolExecutor

logger = logging.getLogger(__name__)


# Combined mapping of all action kinds to executor classes
ALL_ACTION_EXECUTORS = {
**BASIC_ACTION_EXECUTORS,
**CONTROL_FLOW_EXECUTORS,
**AGENT_ACTION_EXECUTORS,
**EXTERNAL_INPUT_EXECUTORS,
**TOOL_ACTION_EXECUTORS,
}

# Action kinds that terminate control flow (no fall-through to successor)
Expand Down Expand Up @@ -78,6 +84,7 @@
"RequestHumanInput": ["variable"],
"WaitForHumanInput": ["variable"],
"EmitEvent": ["event"],
"InvokeFunctionTool": ["functionName"],
}

# Alternate field names that satisfy required field requirements
Expand Down Expand Up @@ -118,6 +125,7 @@ def __init__(
yaml_definition: dict[str, Any],
workflow_id: str | None = None,
agents: dict[str, Any] | None = None,
tools: dict[str, Any] | None = None,
checkpoint_storage: Any | None = None,
validate: bool = True,
max_iterations: int | None = None,
Expand All @@ -128,6 +136,7 @@ def __init__(
yaml_definition: The parsed YAML workflow definition
workflow_id: Optional ID for the workflow (defaults to name from YAML)
agents: Registry of agent instances by name (for InvokeAzureAgent actions)
tools: Registry of tool/function instances by name (for InvokeFunctionTool actions)
checkpoint_storage: Optional checkpoint storage for pause/resume support
validate: Whether to validate the workflow definition before building (default: True)
max_iterations: Maximum runner supersteps. Falls back to the YAML ``maxTurns``
Expand All @@ -138,6 +147,7 @@ def __init__(
self._executors: dict[str, Any] = {} # id -> executor
self._action_index = 0 # Counter for generating unique IDs
self._agents = agents or {} # Agent registry for agent executors
self._tools = tools or {} # Tool registry for tool executors
self._checkpoint_storage = checkpoint_storage
self._pending_gotos: list[tuple[Any, str]] = [] # (goto_executor, target_id)
self._validate = validate
Expand Down Expand Up @@ -423,8 +433,13 @@ def _create_executor_for_action(
executor_class = ALL_ACTION_EXECUTORS.get(kind)

if executor_class is None:
# Unknown action type - skip with warning
# In production, might want to log this
# Unknown action type - log warning and skip
logger.warning(
"Unknown action kind '%s' encountered at index %d - action will be skipped. Available action kinds: %s",
kind,
self._action_index,
list(ALL_ACTION_EXECUTORS.keys()),
)
return None

# Create the executor with ID
Expand All @@ -437,10 +452,12 @@ def _create_executor_for_action(
action_id = f"{parent_id}_{kind}_{self._action_index}" if parent_id else f"{kind}_{self._action_index}"
self._action_index += 1

# Pass agents to agent-related executors
# Pass agents/tools to specialized executors
executor: Any
if kind in ("InvokeAzureAgent",):
executor = InvokeAzureAgentExecutor(action_def, id=action_id, agents=self._agents)
elif kind == "InvokeFunctionTool":
executor = InvokeFunctionToolExecutor(action_def, id=action_id, tools=self._tools)
else:
executor = executor_class(action_def, id=action_id)
self._executors[action_id] = executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,75 +1019,7 @@ async def handle_external_input_response(
await ctx.send_message(ActionComplete())


class InvokeToolExecutor(DeclarativeActionExecutor):
"""Executor that invokes a registered tool/function.

Tools are simpler than agents - they take input, perform an action,
and return a result synchronously (or with a simple async call).
"""

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Handle the tool invocation."""
state = await self._ensure_state_initialized(ctx, trigger)

tool_name = self._action_def.get("tool") or self._action_def.get("toolName", "")
input_expr = self._action_def.get("input")
output_property = self._action_def.get("output", {}).get("property") or self._action_def.get("resultProperty")
parameters = self._action_def.get("parameters", {})

# Get tools registry
try:
tool_registry: dict[str, Any] | None = ctx.state.get(TOOL_REGISTRY_KEY)
except KeyError:
tool_registry = {}

tool: Any = tool_registry.get(tool_name) if tool_registry else None

if tool is None:
error_msg = f"Tool '{tool_name}' not found in registry"
if output_property:
state.set(output_property, {"error": error_msg})
await ctx.send_message(ActionComplete())
return

# Build parameters
params: dict[str, Any] = {}
for param_name, param_expression in parameters.items():
params[param_name] = state.eval_if_expression(param_expression)

# Add main input if specified
if input_expr:
params["input"] = state.eval_if_expression(input_expr)

try:
# Invoke the tool
if callable(tool):
from inspect import isawaitable

result = tool(**params)
if isawaitable(result):
result = await result

# Store result
if output_property:
state.set(output_property, result)

except Exception as e:
if output_property:
state.set(output_property, {"error": str(e)})
await ctx.send_message(ActionComplete())
return

await ctx.send_message(ActionComplete())


# Mapping of agent action kinds to executor classes
AGENT_ACTION_EXECUTORS: dict[str, type[DeclarativeActionExecutor]] = {
"InvokeAzureAgent": InvokeAzureAgentExecutor,
"InvokeTool": InvokeToolExecutor,
}
Loading