Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,17 @@

# Action kinds that terminate control flow (no fall-through to successor)
# These actions transfer control elsewhere and should not have sequential edges to the next action
TERMINATOR_ACTIONS = frozenset({"Goto", "GotoAction", "BreakLoop", "ContinueLoop", "EndWorkflow", "EndDialog"})
TERMINATOR_ACTIONS = frozenset({
"Goto",
"GotoAction",
"BreakLoop",
"ContinueLoop",
"EndWorkflow",
"EndDialog",
"EndConversation",
"CancelDialog",
"CancelAllDialogs",
})

# Required fields for specific action kinds (schema validation)
# Each action needs at least one of the listed fields (checked with alternates)
Expand Down Expand Up @@ -944,6 +954,11 @@ def _get_branch_exit(self, branch_entry: Any) -> Any | None:

last_executor = chain[-1]

# Skip terminators — they handle their own control flow
action_def = getattr(last_executor, "_action_def", {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works, but could be improved, I think, if we add something like an is_terminator property on the base class. Just cause I see us reaching to the "private" _action_def attribute.

if isinstance(action_def, dict) and action_def.get("kind", "") in TERMINATOR_ACTIONS:
return None

# Check if last executor is a structure with branch_exits
# In that case, we return the structure so its exits can be collected
if hasattr(last_executor, "branch_exits"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,17 +442,27 @@ def _get_agent_name(self, state: Any) -> str | None:
agent_config = self._action_def.get("agent")

if isinstance(agent_config, str):
if agent_config.startswith("="):
evaluated = state.eval_if_expression(agent_config)
return str(evaluated) if evaluated is not None else None
return agent_config

if isinstance(agent_config, dict):
agent_dict = cast(dict[str, Any], agent_config)
name = agent_dict.get("name")
if name is not None and isinstance(name, str):
# Support dynamic agent name from expression (would need async eval)
if name.startswith("="):
evaluated = state.eval_if_expression(name)
return str(evaluated) if evaluated is not None else None
return str(name)

agent_name = self._action_def.get("agentName")
return str(agent_name) if isinstance(agent_name, str) else None
if isinstance(agent_name, str):
if agent_name.startswith("="):
evaluated = state.eval_if_expression(agent_name)
return str(evaluated) if evaluated is not None else None
return agent_name
return None

def _get_input_config(self) -> tuple[dict[str, Any], Any, str | None, int]:
"""Parse input configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ async def handle_action(
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Simply pass through to continue the workflow."""
await self._ensure_state_initialized(ctx, trigger)
await ctx.send_message(ActionComplete())


Expand Down
167 changes: 167 additions & 0 deletions python/packages/declarative/tests/test_graph_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,90 @@ async def test_agent_executor_get_agent_name_legacy(self, mock_context, mock_sta
name = executor._get_agent_name(state)
assert name == "LegacyAgent"

async def test_agent_executor_get_agent_name_string_expression(self, mock_context, mock_state):
"""Test agent name extraction from simple string expression."""
from unittest.mock import patch

from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)

action_def = {
"kind": "InvokeAzureAgent",
"agent": "=Local.SelectedAgent",
}
executor = InvokeAzureAgentExecutor(action_def)

state = DeclarativeWorkflowState(mock_state)
state.initialize()

with patch.object(state, "eval_if_expression", return_value="DynamicAgent"):
name = executor._get_agent_name(state)
assert name == "DynamicAgent"

async def test_agent_executor_get_agent_name_dict_expression(self, mock_context, mock_state):
"""Test agent name extraction from nested dict with expression."""
from unittest.mock import patch

from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)

action_def = {
"kind": "InvokeAzureAgent",
"agent": {"name": "=Local.ManagerResult.next_speaker.answer"},
}
executor = InvokeAzureAgentExecutor(action_def)

state = DeclarativeWorkflowState(mock_state)
state.initialize()

with patch.object(state, "eval_if_expression", return_value="WeatherAgent"):
name = executor._get_agent_name(state)
assert name == "WeatherAgent"

async def test_agent_executor_get_agent_name_legacy_expression(self, mock_context, mock_state):
"""Test agent name extraction from legacy agentName with expression."""
from unittest.mock import patch

from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)

action_def = {
"kind": "InvokeAzureAgent",
"agentName": "=Local.NextAgent",
}
executor = InvokeAzureAgentExecutor(action_def)

state = DeclarativeWorkflowState(mock_state)
state.initialize()

with patch.object(state, "eval_if_expression", return_value="ResolvedAgent"):
name = executor._get_agent_name(state)
assert name == "ResolvedAgent"

async def test_agent_executor_get_agent_name_expression_returns_none(self, mock_context, mock_state):
"""Test agent name returns None when expression evaluates to None."""
from unittest.mock import patch

from agent_framework_declarative._workflows._executors_agents import (
InvokeAzureAgentExecutor,
)

action_def = {
"kind": "InvokeAzureAgent",
"agent": {"name": "=Local.UndefinedVar"},
}
executor = InvokeAzureAgentExecutor(action_def)

state = DeclarativeWorkflowState(mock_state)
state.initialize()

with patch.object(state, "eval_if_expression", return_value=None):
name = executor._get_agent_name(state)
assert name is None

async def test_agent_executor_get_input_config_simple(self, mock_context, mock_state):
"""Test input config parsing with simple non-dict input."""
from agent_framework_declarative._workflows._executors_agents import (
Expand Down Expand Up @@ -2336,6 +2420,89 @@ def test_get_branch_exit_none(self):
exit_exec = graph_builder._get_branch_exit(None)
assert exit_exec is None

def test_get_branch_exit_returns_none_for_goto_terminator(self):
"""Test that _get_branch_exit returns None when branch ends with GotoAction.

GotoAction is a terminator that handles its own control flow (jumping to
the target action). It should NOT be returned as a branch exit, because
that would cause the parent ConditionGroup to wire it to the next
sequential action, creating a dual-edge where both the goto target and
the next action receive messages.
"""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor

yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)

# GotoAction executor is a JoinExecutor with a GotoAction action_def
goto_executor = JoinExecutor(
{"kind": "GotoAction", "id": "goto_summary", "actionId": "invoke_summary"},
id="goto_summary",
)

# Simulate a single-action branch chain
goto_executor._chain_executors = [goto_executor] # type: ignore[attr-defined]

exit_exec = graph_builder._get_branch_exit(goto_executor)
assert exit_exec is None

def test_get_branch_exit_returns_none_for_end_workflow_terminator(self):
"""Test that _get_branch_exit returns None when branch ends with EndWorkflow."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test looks like it's using the incorrect executor type. The true path is: EndWorkflow creates an EndWorkflowExecutor. This test seems to be working right now because both store _action_def, but we aren't validating the actual "prod path." How about:

from agent_framework_declarative._workflows._executors_control_flow import EndWorkflowExecutor

end_executor = EndWorkflowExecutor({"kind": "EndWorkflow", "id": "end"}, id="end")
end_executor._chain_executors = [end_executor]
exit_exec = graph_builder._get_branch_exit(end_executor)
assert exit_exec is None

from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor

yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)

end_executor = JoinExecutor(
{"kind": "EndWorkflow", "id": "end"},
id="end",
)
end_executor._chain_executors = [end_executor] # type: ignore[attr-defined]

exit_exec = graph_builder._get_branch_exit(end_executor)
assert exit_exec is None

def test_get_branch_exit_returns_none_for_goto_in_chain(self):
"""Test that _get_branch_exit returns None when chain ends with GotoAction.

Even when a branch has multiple actions before the GotoAction,
the branch exit should be None because the last action is a terminator.
"""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor
from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor

yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)

# A branch with: SendActivity -> GotoAction
activity = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "msg"}}, id="msg")
goto = JoinExecutor(
{"kind": "GotoAction", "id": "goto_target", "actionId": "some_target"},
id="goto_target",
)
activity._chain_executors = [activity, goto] # type: ignore[attr-defined]

exit_exec = graph_builder._get_branch_exit(activity)
assert exit_exec is None

def test_get_branch_exit_returns_executor_for_non_terminator(self):
"""Test that _get_branch_exit still returns the exit for non-terminator branches."""
from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder
from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor

yaml_def = {"name": "test_workflow", "actions": []}
graph_builder = DeclarativeWorkflowBuilder(yaml_def)

exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1")
exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2")
exec1._chain_executors = [exec1, exec2] # type: ignore[attr-defined]

exit_exec = graph_builder._get_branch_exit(exec1)
assert exit_exec == exec2


# ---------------------------------------------------------------------------
# Agent executor external loop response handler tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,7 @@ async def main() -> None:
)

# Load workflow from YAML
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent
workflow_path = samples_root / "workflow-samples" / "DeepResearch.yaml"
if not workflow_path.exists():
# Fall back to local copy if workflow-samples doesn't exist
workflow_path = Path(__file__).parent / "workflow.yaml"
workflow_path = Path(__file__).parent / "workflow.yaml"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the yaml workflow is supposed to be shared between Python and .Net. But let's wait for @moonbox3 's confirmation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should be using the DeepResearch.yaml sample (the same one that Dotnet uses from MAF's root) as configured previously.


workflow = factory.create_workflow_from_yaml_path(workflow_path)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# DeepResearch workflow (Magentic orchestration pattern).
kind: Workflow
trigger:

kind: OnConversationStart
id: deep_research_workflow
actions:

# Analyze task and correlate facts
- kind: InvokeAzureAgent
id: invoke_research
conversationId: =System.ConversationId
agent:
name: ResearchAgent

# Devise a plan
- kind: InvokeAzureAgent
id: invoke_planner
conversationId: =System.ConversationId
agent:
name: PlannerAgent

- kind: SetVariable
id: init_counter
variable: Local.TurnCount
value: =0

# Manager evaluates and delegates (loop entry)
- kind: InvokeAzureAgent
id: invoke_manager
conversationId: =System.ConversationId
agent:
name: ManagerAgent
output:
responseObject: Local.ManagerResult

# If satisfied, skip to summary
- kind: ConditionGroup
id: check_satisfied
conditions:
- id: is_satisfied
condition: =Local.ManagerResult.is_request_satisfied.answer
actions:
- kind: GotoAction
id: goto_summary
actionId: invoke_summary

# Invoke the worker the manager selected
- kind: InvokeAzureAgent
id: invoke_worker
conversationId: =System.ConversationId
agent:
name: =Local.ManagerResult.next_speaker.answer

# Increment and loop back if under limit
- kind: SetVariable
id: increment_counter
variable: Local.TurnCount
value: =Local.TurnCount + 1

- kind: ConditionGroup
id: check_limit
conditions:
- id: can_continue
condition: =Local.TurnCount < 10
actions:
- kind: GotoAction
id: loop_back
actionId: invoke_manager

# Synthesize final response
- kind: InvokeAzureAgent
id: invoke_summary
conversationId: =System.ConversationId
agent:
name: SummaryAgent