diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py index c4f9ecff59..cb7acd2c4b 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py @@ -49,7 +49,17 @@ # Action kinds that terminate control flow (no fall-through to successor) # These actions transfer control elsewhere and should not have sequential edges to the next action -TERMINATOR_ACTIONS = frozenset({"Goto", "GotoAction", "BreakLoop", "ContinueLoop", "EndWorkflow", "EndDialog"}) +TERMINATOR_ACTIONS = frozenset({ + "Goto", + "GotoAction", + "BreakLoop", + "ContinueLoop", + "EndWorkflow", + "EndDialog", + "EndConversation", + "CancelDialog", + "CancelAllDialogs", +}) # Required fields for specific action kinds (schema validation) # Each action needs at least one of the listed fields (checked with alternates) @@ -944,6 +954,11 @@ def _get_branch_exit(self, branch_entry: Any) -> Any | None: last_executor = chain[-1] + # Skip terminators — they handle their own control flow + action_def = getattr(last_executor, "_action_def", {}) + if isinstance(action_def, dict) and action_def.get("kind", "") in TERMINATOR_ACTIONS: + return None + # Check if last executor is a structure with branch_exits # In that case, we return the structure so its exits can be collected if hasattr(last_executor, "branch_exits"): diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py index 44c9e958c2..97e9e0f946 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py @@ -442,17 +442,27 @@ def _get_agent_name(self, state: Any) -> str | None: agent_config = self._action_def.get("agent") if isinstance(agent_config, str): + if agent_config.startswith("="): + evaluated = state.eval_if_expression(agent_config) + return str(evaluated) if evaluated is not None else None return agent_config if isinstance(agent_config, dict): agent_dict = cast(dict[str, Any], agent_config) name = agent_dict.get("name") if name is not None and isinstance(name, str): - # Support dynamic agent name from expression (would need async eval) + if name.startswith("="): + evaluated = state.eval_if_expression(name) + return str(evaluated) if evaluated is not None else None return str(name) agent_name = self._action_def.get("agentName") - return str(agent_name) if isinstance(agent_name, str) else None + if isinstance(agent_name, str): + if agent_name.startswith("="): + evaluated = state.eval_if_expression(agent_name) + return str(evaluated) if evaluated is not None else None + return agent_name + return None def _get_input_config(self) -> tuple[dict[str, Any], Any, str | None, int]: """Parse input configuration. diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py index f63e3ada50..da1118e3ec 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_control_flow.py @@ -496,6 +496,7 @@ async def handle_action( ctx: WorkflowContext[ActionComplete], ) -> None: """Simply pass through to continue the workflow.""" + await self._ensure_state_initialized(ctx, trigger) await ctx.send_message(ActionComplete()) diff --git a/python/packages/declarative/tests/test_graph_coverage.py b/python/packages/declarative/tests/test_graph_coverage.py index cf622f6467..a924c6077d 100644 --- a/python/packages/declarative/tests/test_graph_coverage.py +++ b/python/packages/declarative/tests/test_graph_coverage.py @@ -740,6 +740,90 @@ async def test_agent_executor_get_agent_name_legacy(self, mock_context, mock_sta name = executor._get_agent_name(state) assert name == "LegacyAgent" + async def test_agent_executor_get_agent_name_string_expression(self, mock_context, mock_state): + """Test agent name extraction from simple string expression.""" + from unittest.mock import patch + + from agent_framework_declarative._workflows._executors_agents import ( + InvokeAzureAgentExecutor, + ) + + action_def = { + "kind": "InvokeAzureAgent", + "agent": "=Local.SelectedAgent", + } + executor = InvokeAzureAgentExecutor(action_def) + + state = DeclarativeWorkflowState(mock_state) + state.initialize() + + with patch.object(state, "eval_if_expression", return_value="DynamicAgent"): + name = executor._get_agent_name(state) + assert name == "DynamicAgent" + + async def test_agent_executor_get_agent_name_dict_expression(self, mock_context, mock_state): + """Test agent name extraction from nested dict with expression.""" + from unittest.mock import patch + + from agent_framework_declarative._workflows._executors_agents import ( + InvokeAzureAgentExecutor, + ) + + action_def = { + "kind": "InvokeAzureAgent", + "agent": {"name": "=Local.ManagerResult.next_speaker.answer"}, + } + executor = InvokeAzureAgentExecutor(action_def) + + state = DeclarativeWorkflowState(mock_state) + state.initialize() + + with patch.object(state, "eval_if_expression", return_value="WeatherAgent"): + name = executor._get_agent_name(state) + assert name == "WeatherAgent" + + async def test_agent_executor_get_agent_name_legacy_expression(self, mock_context, mock_state): + """Test agent name extraction from legacy agentName with expression.""" + from unittest.mock import patch + + from agent_framework_declarative._workflows._executors_agents import ( + InvokeAzureAgentExecutor, + ) + + action_def = { + "kind": "InvokeAzureAgent", + "agentName": "=Local.NextAgent", + } + executor = InvokeAzureAgentExecutor(action_def) + + state = DeclarativeWorkflowState(mock_state) + state.initialize() + + with patch.object(state, "eval_if_expression", return_value="ResolvedAgent"): + name = executor._get_agent_name(state) + assert name == "ResolvedAgent" + + async def test_agent_executor_get_agent_name_expression_returns_none(self, mock_context, mock_state): + """Test agent name returns None when expression evaluates to None.""" + from unittest.mock import patch + + from agent_framework_declarative._workflows._executors_agents import ( + InvokeAzureAgentExecutor, + ) + + action_def = { + "kind": "InvokeAzureAgent", + "agent": {"name": "=Local.UndefinedVar"}, + } + executor = InvokeAzureAgentExecutor(action_def) + + state = DeclarativeWorkflowState(mock_state) + state.initialize() + + with patch.object(state, "eval_if_expression", return_value=None): + name = executor._get_agent_name(state) + assert name is None + async def test_agent_executor_get_input_config_simple(self, mock_context, mock_state): """Test input config parsing with simple non-dict input.""" from agent_framework_declarative._workflows._executors_agents import ( @@ -2336,6 +2420,89 @@ def test_get_branch_exit_none(self): exit_exec = graph_builder._get_branch_exit(None) assert exit_exec is None + def test_get_branch_exit_returns_none_for_goto_terminator(self): + """Test that _get_branch_exit returns None when branch ends with GotoAction. + + GotoAction is a terminator that handles its own control flow (jumping to + the target action). It should NOT be returned as a branch exit, because + that would cause the parent ConditionGroup to wire it to the next + sequential action, creating a dual-edge where both the goto target and + the next action receive messages. + """ + from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder + from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor + + yaml_def = {"name": "test_workflow", "actions": []} + graph_builder = DeclarativeWorkflowBuilder(yaml_def) + + # GotoAction executor is a JoinExecutor with a GotoAction action_def + goto_executor = JoinExecutor( + {"kind": "GotoAction", "id": "goto_summary", "actionId": "invoke_summary"}, + id="goto_summary", + ) + + # Simulate a single-action branch chain + goto_executor._chain_executors = [goto_executor] # type: ignore[attr-defined] + + exit_exec = graph_builder._get_branch_exit(goto_executor) + assert exit_exec is None + + def test_get_branch_exit_returns_none_for_end_workflow_terminator(self): + """Test that _get_branch_exit returns None when branch ends with EndWorkflow.""" + from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder + from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor + + yaml_def = {"name": "test_workflow", "actions": []} + graph_builder = DeclarativeWorkflowBuilder(yaml_def) + + end_executor = JoinExecutor( + {"kind": "EndWorkflow", "id": "end"}, + id="end", + ) + end_executor._chain_executors = [end_executor] # type: ignore[attr-defined] + + exit_exec = graph_builder._get_branch_exit(end_executor) + assert exit_exec is None + + def test_get_branch_exit_returns_none_for_goto_in_chain(self): + """Test that _get_branch_exit returns None when chain ends with GotoAction. + + Even when a branch has multiple actions before the GotoAction, + the branch exit should be None because the last action is a terminator. + """ + from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder + from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor + from agent_framework_declarative._workflows._executors_control_flow import JoinExecutor + + yaml_def = {"name": "test_workflow", "actions": []} + graph_builder = DeclarativeWorkflowBuilder(yaml_def) + + # A branch with: SendActivity -> GotoAction + activity = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "msg"}}, id="msg") + goto = JoinExecutor( + {"kind": "GotoAction", "id": "goto_target", "actionId": "some_target"}, + id="goto_target", + ) + activity._chain_executors = [activity, goto] # type: ignore[attr-defined] + + exit_exec = graph_builder._get_branch_exit(activity) + assert exit_exec is None + + def test_get_branch_exit_returns_executor_for_non_terminator(self): + """Test that _get_branch_exit still returns the exit for non-terminator branches.""" + from agent_framework_declarative._workflows._declarative_builder import DeclarativeWorkflowBuilder + from agent_framework_declarative._workflows._executors_basic import SendActivityExecutor + + yaml_def = {"name": "test_workflow", "actions": []} + graph_builder = DeclarativeWorkflowBuilder(yaml_def) + + exec1 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "1"}}, id="e1") + exec2 = SendActivityExecutor({"kind": "SendActivity", "activity": {"text": "2"}}, id="e2") + exec1._chain_executors = [exec1, exec2] # type: ignore[attr-defined] + + exit_exec = graph_builder._get_branch_exit(exec1) + assert exit_exec == exec2 + # --------------------------------------------------------------------------- # Agent executor external loop response handler tests diff --git a/python/samples/03-workflows/declarative/deep_research/main.py b/python/samples/03-workflows/declarative/deep_research/main.py index bb3dcc6f0d..9e6756e340 100644 --- a/python/samples/03-workflows/declarative/deep_research/main.py +++ b/python/samples/03-workflows/declarative/deep_research/main.py @@ -180,11 +180,7 @@ async def main() -> None: ) # Load workflow from YAML - samples_root = Path(__file__).parent.parent.parent.parent.parent.parent - workflow_path = samples_root / "workflow-samples" / "DeepResearch.yaml" - if not workflow_path.exists(): - # Fall back to local copy if workflow-samples doesn't exist - workflow_path = Path(__file__).parent / "workflow.yaml" + workflow_path = Path(__file__).parent / "workflow.yaml" workflow = factory.create_workflow_from_yaml_path(workflow_path) diff --git a/python/samples/03-workflows/declarative/deep_research/workflow.yaml b/python/samples/03-workflows/declarative/deep_research/workflow.yaml new file mode 100644 index 0000000000..fed959ab13 --- /dev/null +++ b/python/samples/03-workflows/declarative/deep_research/workflow.yaml @@ -0,0 +1,76 @@ +# DeepResearch workflow (Magentic orchestration pattern). +kind: Workflow +trigger: + + kind: OnConversationStart + id: deep_research_workflow + actions: + + # Analyze task and correlate facts + - kind: InvokeAzureAgent + id: invoke_research + conversationId: =System.ConversationId + agent: + name: ResearchAgent + + # Devise a plan + - kind: InvokeAzureAgent + id: invoke_planner + conversationId: =System.ConversationId + agent: + name: PlannerAgent + + - kind: SetVariable + id: init_counter + variable: Local.TurnCount + value: =0 + + # Manager evaluates and delegates (loop entry) + - kind: InvokeAzureAgent + id: invoke_manager + conversationId: =System.ConversationId + agent: + name: ManagerAgent + output: + responseObject: Local.ManagerResult + + # If satisfied, skip to summary + - kind: ConditionGroup + id: check_satisfied + conditions: + - id: is_satisfied + condition: =Local.ManagerResult.is_request_satisfied.answer + actions: + - kind: GotoAction + id: goto_summary + actionId: invoke_summary + + # Invoke the worker the manager selected + - kind: InvokeAzureAgent + id: invoke_worker + conversationId: =System.ConversationId + agent: + name: =Local.ManagerResult.next_speaker.answer + + # Increment and loop back if under limit + - kind: SetVariable + id: increment_counter + variable: Local.TurnCount + value: =Local.TurnCount + 1 + + - kind: ConditionGroup + id: check_limit + conditions: + - id: can_continue + condition: =Local.TurnCount < 10 + actions: + - kind: GotoAction + id: loop_back + actionId: invoke_manager + + # Synthesize final response + - kind: InvokeAzureAgent + id: invoke_summary + conversationId: =System.ConversationId + agent: + name: SummaryAgent