From 2015f6f7e77562eda921badbaadeedba753301bc Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Sat, 31 Jan 2026 21:51:40 +0100 Subject: [PATCH 01/14] Add bulk executor registration method to WorkflowBuilder --- .../_workflows/_workflow_builder.py | 48 +++++++++++++++++++ .../tests/workflow/test_workflow_builder.py | 18 +++++++ 2 files changed, 66 insertions(+) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 14cabc219b..147a4503aa 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -332,6 +332,54 @@ async def log(self, message: str, ctx: WorkflowContext) -> None: return self + def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: + """Register multiple executor factory functions for lazy initialization. + + This method allows you to register multiple factory functions at once. Each + executor is instantiated only when the workflow is built, enabling deferred + initialization and reducing builder boilerplate. + + Args: + executor_factories: A mapping of executor names to factory callables that + return Executor instances when called. + + Example: + .. code-block:: python + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + + class UpperCaseExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(text.upper()) + + + class ReverseExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text[::-1]) + + + workflow = ( + WorkflowBuilder() + .register_executors({ + "UpperCase": lambda: UpperCaseExecutor(id="upper"), + "Reverse": lambda: ReverseExecutor(id="reverse"), + }) + .set_start_executor("UpperCase") + .add_edge("UpperCase", "Reverse") + .build() + ) + """ + for name in executor_factories: + if name in self._executor_registry: + raise ValueError(f"An executor factory with the name '{name}' is already registered.") + + for name, factory_function in executor_factories.items(): + self._executor_registry[name] = factory_function + + return self + def register_agent( self, factory_func: Callable[[], AgentProtocol], diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index ef572ba82b..c0c7b1b040 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -259,6 +259,24 @@ def test_register_multiple_executors(): assert workflow.start_executor_id == "ExecutorA" +def test_register_executors_bulk(): + """Test bulk executor registration with a mapping of factories.""" + builder = WorkflowBuilder() + + result = builder.register_executors({ + "ExecutorA": lambda: MockExecutor(id="ExecutorA"), + "ExecutorB": lambda: MockExecutor(id="ExecutorB"), + }) + + assert result is builder + + workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() + + assert "ExecutorA" in workflow.executors + assert "ExecutorB" in workflow.executors + assert workflow.start_executor_id == "ExecutorA" + + def test_register_with_multiple_names(): """Test registering the same factory function under multiple names.""" builder = WorkflowBuilder() From 1d5f150e586e7847faa8b03249b2b70f13679ce1 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Thu, 5 Feb 2026 09:23:51 +0100 Subject: [PATCH 02/14] Improve robustness against empty and invalid executor registrations --- .../_workflows/_workflow_builder.py | 13 ++++++++++++- .../core/tests/workflow/test_workflow_builder.py | 14 ++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 147a4503aa..4087c5efb8 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -343,6 +343,10 @@ def register_executors(self, executor_factories: dict[str, Callable[[], Executor executor_factories: A mapping of executor names to factory callables that return Executor instances when called. + Raises: + ValueError: If executor_factories is empty or contains empty names, or if a name is already registered. + TypeError: If an executor factory is not callable. + Example: .. code-block:: python from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler @@ -371,7 +375,14 @@ async def process(self, text: str, ctx: WorkflowContext[str]) -> None: .build() ) """ - for name in executor_factories: + if not executor_factories: + raise ValueError("Executor factories cannot be empty.") + + for name, factory_function in executor_factories.items(): + if not name or not name.strip(): + raise ValueError("Executor factory name cannot be empty.") + if not callable(factory_function): + raise TypeError(f"Executor factory for '{name}' must be callable.") if name in self._executor_registry: raise ValueError(f"An executor factory with the name '{name}' is already registered.") diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index c0c7b1b040..61111ac0ae 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -277,6 +277,20 @@ def test_register_executors_bulk(): assert workflow.start_executor_id == "ExecutorA" +def test_register_executors_rejects_empty_inputs(): + """Test that empty executor mappings and entries are rejected.""" + builder = WorkflowBuilder() + + with pytest.raises(ValueError, match="cannot be empty"): + builder.register_executors({}) + + with pytest.raises(ValueError, match="name cannot be empty"): + builder.register_executors({"": lambda: MockExecutor(id="ExecutorA")}) + + with pytest.raises(TypeError, match="must be callable"): + builder.register_executors({"ExecutorA": None}) # type: ignore[arg-type] + + def test_register_with_multiple_names(): """Test registering the same factory function under multiple names.""" builder = WorkflowBuilder() From 194c247a34e297c1acaa69dde8d1eac016b20334 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Thu, 5 Feb 2026 09:41:13 +0100 Subject: [PATCH 03/14] Refactor to dictionary-based register_executors in code and docs --- .../_workflows/_workflow_builder.py | 164 ++++++------------ .../tests/workflow/test_workflow_agent.py | 4 +- .../tests/workflow/test_workflow_builder.py | 93 +++++----- .../_start-here/step4_using_factories.py | 10 +- .../azure_ai_agents_with_shared_thread.py | 5 +- .../azure_chat_agents_function_bridge.py | 2 +- ...re_chat_agents_tool_calls_with_feedback.py | 15 +- .../agents/custom_agent_executors.py | 3 +- .../agents/mixed_agents_and_executors.py | 2 +- .../workflow_as_agent_human_in_the_loop.py | 18 +- .../workflow_as_agent_reflection_pattern.py | 12 +- .../checkpoint_with_human_in_the_loop.py | 8 +- .../checkpoint/checkpoint_with_resume.py | 8 +- .../checkpoint/sub_workflow_checkpoint.py | 7 +- .../composition/sub_workflow_basics.py | 10 +- .../sub_workflow_parallel_requests.py | 37 ++-- .../sub_workflow_request_interception.py | 29 ++-- .../workflows/control-flow/edge_condition.py | 10 +- .../multi_selection_edge_group.py | 22 ++- .../control-flow/sequential_executors.py | 8 +- .../control-flow/sequential_streaming.py | 8 +- .../workflows/control-flow/simple_loop.py | 10 +- .../control-flow/switch_case_edge_group.py | 16 +- .../control-flow/workflow_cancellation.py | 10 +- .../agents_with_approval_requests.py | 9 +- .../guessing_game_with_human_input.py | 2 +- .../aggregate_results_of_different_types.py | 12 +- .../parallelism/fan_out_fan_in_edges.py | 8 +- .../map_reduce_and_visualization.py | 34 ++-- .../shared_states_with_agents.py | 14 +- .../concurrent_with_visualization.py | 8 +- 31 files changed, 301 insertions(+), 297 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 4087c5efb8..a6ee707766 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -140,8 +140,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") - .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") + .register_executors({ + "UpperCase": lambda: UpperCaseExecutor(id="upper"), + "Reverse": lambda: ReverseExecutor(id="reverse"), + }) .add_edge("UpperCase", "Reverse") .set_start_executor("UpperCase") .build() @@ -260,78 +262,6 @@ def _maybe_wrap_agent( f"WorkflowBuilder expected an Executor or AgentProtocol instance; got {type(candidate).__name__}." ) - def register_executor(self, factory_func: Callable[[], Executor], name: str | list[str]) -> Self: - """Register an executor factory function for lazy initialization. - - This method allows you to register a factory function that creates an executor. - The executor will be instantiated only when the workflow is built, enabling - deferred initialization and potentially reducing startup time. - - Args: - factory_func: A callable that returns an Executor instance when called. - name: The name(s) of the registered executor factory. This doesn't have to match - the executor's ID, but it must be unique within the workflow. - - Example: - .. code-block:: python - from typing_extensions import Never - from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler - - - class UpperCaseExecutor(Executor): - @handler - async def process(self, text: str, ctx: WorkflowContext[str]) -> None: - await ctx.send_message(text.upper()) - - - class ReverseExecutor(Executor): - @handler - async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: - await ctx.yield_output(text[::-1]) - - - # Build a workflow - workflow = ( - WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") - .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") - .set_start_executor("UpperCase") - .add_edge("UpperCase", "Reverse") - .build() - ) - - If multiple names are provided, the same factory function will be registered under each name. - - .. code-block:: python - - from agent_framework import WorkflowBuilder, Executor, WorkflowContext, handler - - - class LoggerExecutor(Executor): - @handler - async def log(self, message: str, ctx: WorkflowContext) -> None: - print(f"Log: {message}") - - - # Register the same executor factory under multiple names - workflow = ( - WorkflowBuilder() - .register_executor(lambda: LoggerExecutor(id="logger"), name=["ExecutorA", "ExecutorB"]) - .set_start_executor("ExecutorA") - .add_edge("ExecutorA", "ExecutorB") - .build() - """ - names = [name] if isinstance(name, str) else name - - for n in names: - if n in self._executor_registry: - raise ValueError(f"An executor factory with the name '{n}' is already registered.") - - for n in names: - self._executor_registry[n] = factory_func - - return self - def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: """Register multiple executor factory functions for lazy initialization. @@ -422,7 +352,7 @@ def register_agent( # Build a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: ..., name="SomeOtherExecutor") + .register_executors({"SomeOtherExecutor": lambda: ...}) .register_agent( lambda: AnthropicAgent(name="writer", model="claude-3-5-sonnet-20241022"), name="WriterAgent", @@ -524,7 +454,7 @@ def add_edge( Note: If instances are provided for both source and target, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Returns: @@ -552,8 +482,10 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: # Connect executors with an edge workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="a"), + "ProcessorB": lambda: ProcessorB(id="b"), + }) .add_edge("ProcessorA", "ProcessorB") .set_start_executor("ProcessorA") .build() @@ -561,8 +493,10 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="a"), + "ProcessorB": lambda: ProcessorB(id="b"), + }) .add_edge("ProcessorA", "ProcessorB", condition=only_large_numbers) .set_start_executor("ProcessorA") .build() @@ -608,7 +542,7 @@ def add_fan_out_edges( Note: If instances are provided for source and targets, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -639,9 +573,11 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None: # Broadcast to multiple validators workflow = ( WorkflowBuilder() - .register_executor(lambda: DataSource(id="source"), name="DataSource") - .register_executor(lambda: ValidatorA(id="val_a"), name="ValidatorA") - .register_executor(lambda: ValidatorB(id="val_b"), name="ValidatorB") + .register_executors({ + "DataSource": lambda: DataSource(id="source"), + "ValidatorA": lambda: ValidatorA(id="val_a"), + "ValidatorB": lambda: ValidatorB(id="val_b"), + }) .add_fan_out_edges("DataSource", ["ValidatorA", "ValidatorB"]) .set_start_executor("DataSource") .build() @@ -696,7 +632,7 @@ def add_switch_case_edge_group( Note: If instances are provided for source and case targets, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -732,9 +668,11 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: # Route based on score value workflow = ( WorkflowBuilder() - .register_executor(lambda: Evaluator(id="eval"), name="Evaluator") - .register_executor(lambda: HighScoreHandler(id="high"), name="HighScoreHandler") - .register_executor(lambda: LowScoreHandler(id="low"), name="LowScoreHandler") + .register_executors({ + "Evaluator": lambda: Evaluator(id="eval"), + "HighScoreHandler": lambda: HighScoreHandler(id="high"), + "LowScoreHandler": lambda: LowScoreHandler(id="low"), + }) .add_switch_case_edge_group( "Evaluator", [ @@ -802,7 +740,7 @@ def add_multi_selection_edge_group( Note: If instances are provided for source and targets, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -846,9 +784,11 @@ def select_workers(task: Task, available: list[str]) -> list[str]: workflow = ( WorkflowBuilder() - .register_executor(lambda: TaskDispatcher(id="dispatcher"), name="TaskDispatcher") - .register_executor(lambda: WorkerA(id="worker_a"), name="WorkerA") - .register_executor(lambda: WorkerB(id="worker_b"), name="WorkerB") + .register_executors({ + "TaskDispatcher": lambda: TaskDispatcher(id="dispatcher"), + "WorkerA": lambda: WorkerA(id="worker_a"), + "WorkerB": lambda: WorkerB(id="worker_b"), + }) .add_multi_selection_edge_group( "TaskDispatcher", ["WorkerA", "WorkerB"], @@ -909,7 +849,7 @@ def add_fan_in_edges( Note: If instances are provided for sources and target, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -935,9 +875,11 @@ async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str]) # Collect results from multiple producers workflow = ( WorkflowBuilder() - .register_executor(lambda: Producer(id="prod_1"), name="Producer1") - .register_executor(lambda: Producer(id="prod_2"), name="Producer2") - .register_executor(lambda: Aggregator(id="agg"), name="Aggregator") + .register_executors({ + "Producer1": lambda: Producer(id="prod_1"), + "Producer2": lambda: Producer(id="prod_2"), + "Aggregator": lambda: Aggregator(id="agg"), + }) .add_fan_in_edges(["Producer1", "Producer2"], "Aggregator") .set_start_executor("Producer1") .build() @@ -981,7 +923,7 @@ def add_chain(self, executors: Sequence[Executor | AgentProtocol | str]) -> Self Note: If executor instances are provided, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider registering the executors and agents using - `register_executor` and `register_agent` and referencing them by factory name for lazy + `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -1012,9 +954,11 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Chain executors in sequence workflow = ( WorkflowBuilder() - .register_executor(lambda: Step1(id="step1"), name="step1") - .register_executor(lambda: Step2(id="step2"), name="step2") - .register_executor(lambda: Step3(id="step3"), name="step3") + .register_executors({ + "step1": lambda: Step1(id="step1"), + "step2": lambda: Step2(id="step2"), + "step3": lambda: Step3(id="step3"), + }) .add_chain(["step1", "step2", "step3"]) .set_start_executor("step1") .build() @@ -1076,8 +1020,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: workflow = ( WorkflowBuilder() - .register_executor(lambda: EntryPoint(id="entry"), name="EntryPoint") - .register_executor(lambda: Processor(id="proc"), name="Processor") + .register_executors({ + "EntryPoint": lambda: EntryPoint(id="entry"), + "Processor": lambda: Processor(id="proc"), + }) .add_edge("EntryPoint", "Processor") .set_start_executor("EntryPoint") .build() @@ -1134,8 +1080,10 @@ async def process(self, count: int, ctx: WorkflowContext[int]) -> None: workflow = ( WorkflowBuilder() .set_max_iterations(500) - .register_executor(lambda: StepA(id="step_a"), name="StepA") - .register_executor(lambda: StepB(id="step_b"), name="StepB") + .register_executors({ + "StepA": lambda: StepA(id="step_a"), + "StepB": lambda: StepB(id="step_b"), + }) .add_edge("StepA", "StepB") .add_edge("StepB", "StepA") # Cycle .set_start_executor("StepA") @@ -1184,8 +1132,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: storage = FileCheckpointStorage("./checkpoints") workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="proc_a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="proc_b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="proc_a"), + "ProcessorB": lambda: ProcessorB(id="proc_b"), + }) .add_edge("ProcessorA", "ProcessorB") .set_start_executor("ProcessorA") .with_checkpointing(storage) @@ -1309,7 +1259,7 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build and execute a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: MyExecutor(id="executor"), name="MyExecutor") + .register_executors({"MyExecutor": lambda: MyExecutor(id="executor")}) .set_start_executor("MyExecutor") .build() ) diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index 9514efdf74..9375544f07 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -603,7 +603,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> N # Build workflow: start -> agent1 (no output) -> agent2 (output_response=True) workflow = ( WorkflowBuilder() - .register_executor(lambda: start_executor, "start") + .register_executors({"start": lambda: start_executor}) .register_agent(lambda: MockAgent("agent1", "Agent1 output - should NOT appear"), "agent1") .register_agent( lambda: MockAgent("agent2", "Agent2 output - SHOULD appear"), "agent2", output_response=True @@ -675,7 +675,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> N # Build workflow with single agent that has output_response=True workflow = ( WorkflowBuilder() - .register_executor(lambda: start_executor, "start") + .register_executors({"start": lambda: start_executor}) .register_agent(lambda: MockAgent("agent", "Unique response text"), "agent", output_response=True) .set_start_executor("start") .add_edge("start", "agent") diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 61111ac0ae..92d3210c52 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -218,12 +218,12 @@ def test_add_agent_duplicate_id_raises_error(): # Tests for new executor registration patterns -def test_register_executor_basic(): +def test_register_executors_basic(): """Test basic executor registration with lazy initialization.""" builder = WorkflowBuilder() # Register an executor factory - ID must match the registered name - result = builder.register_executor(lambda: MockExecutor(id="TestExecutor"), name="TestExecutor") + result = builder.register_executors({"TestExecutor": lambda: MockExecutor(id="TestExecutor")}) # Verify that register returns the builder for chaining assert result is builder @@ -235,15 +235,18 @@ def test_register_executor_basic(): def test_register_multiple_executors(): - """Test registering multiple executors and connecting them with edges.""" + """Test registering multiple executors with a mapping of factories and connecting them with edges.""" builder = WorkflowBuilder() - # Register multiple executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorA") - builder.register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorB") - builder.register_executor(lambda: MockExecutor(id="ExecutorC"), name="ExecutorC") + # IDs must match registered names + result = builder.register_executors({ + "ExecutorA": lambda: MockExecutor(id="ExecutorA"), + "ExecutorB": lambda: MockExecutor(id="ExecutorB"), + "ExecutorC": lambda: MockExecutor(id="ExecutorC"), + }) + + assert result is builder - # Build workflow with edges using registered names workflow = ( builder .set_start_executor("ExecutorA") @@ -252,31 +255,12 @@ def test_register_multiple_executors(): .build() ) - # Verify all executors are present assert "ExecutorA" in workflow.executors assert "ExecutorB" in workflow.executors assert "ExecutorC" in workflow.executors assert workflow.start_executor_id == "ExecutorA" -def test_register_executors_bulk(): - """Test bulk executor registration with a mapping of factories.""" - builder = WorkflowBuilder() - - result = builder.register_executors({ - "ExecutorA": lambda: MockExecutor(id="ExecutorA"), - "ExecutorB": lambda: MockExecutor(id="ExecutorB"), - }) - - assert result is builder - - workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() - - assert "ExecutorA" in workflow.executors - assert "ExecutorB" in workflow.executors - assert workflow.start_executor_id == "ExecutorA" - - def test_register_executors_rejects_empty_inputs(): """Test that empty executor mappings and entries are rejected.""" builder = WorkflowBuilder() @@ -303,7 +287,7 @@ def make_executor(): counter["val"] += 1 return MockExecutor(id="ExecutorA" if counter["val"] == 1 else "ExecutorB") - builder.register_executor(make_executor, name=["ExecutorA", "ExecutorB"]) + builder.register_executors({"ExecutorA": make_executor, "ExecutorB": make_executor}) # Set up workflow workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() @@ -319,20 +303,19 @@ def test_register_duplicate_name_raises_error(): builder = WorkflowBuilder() # Register first executor - builder.register_executor(lambda: MockExecutor(id="executor_1"), name="MyExecutor") + builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_1")}) # Registering second executor with same name should raise ValueError with pytest.raises(ValueError, match="already registered"): - builder.register_executor(lambda: MockExecutor(id="executor_2"), name="MyExecutor") + builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_2")}) def test_register_duplicate_id_raises_error(): """Test that registering duplicate id raises an error.""" - builder = WorkflowBuilder() - - # Register first executor - builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor1") - builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor2") + builder = WorkflowBuilder().register_executors({ + "MyExecutor1": lambda: MockExecutor(id="executor"), + "MyExecutor2": lambda: MockExecutor(id="executor"), + }) builder.set_start_executor("MyExecutor1") # Registering second executor with same ID should raise ValueError @@ -399,8 +382,10 @@ def test_register_and_add_edge_with_strings(): builder = WorkflowBuilder() # Register executors - builder.register_executor(lambda: MockExecutor(id="source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="target"), name="Target") + builder.register_executors({ + "Source": lambda: MockExecutor(id="source"), + "Target": lambda: MockExecutor(id="target"), + }) # Add edge using string names workflow = builder.set_start_executor("Source").add_edge("Source", "Target").build() @@ -434,9 +419,11 @@ def test_register_with_fan_out_edges(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="Target1"), name="Target1") - builder.register_executor(lambda: MockExecutor(id="Target2"), name="Target2") + builder.register_executors({ + "Source": lambda: MockExecutor(id="Source"), + "Target1": lambda: MockExecutor(id="Target1"), + "Target2": lambda: MockExecutor(id="Target2"), + }) # Add fan-out edges using registered names workflow = builder.set_start_executor("Source").add_fan_out_edges("Source", ["Target1", "Target2"]).build() @@ -452,9 +439,11 @@ def test_register_with_fan_in_edges(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source1"), name="Source1") - builder.register_executor(lambda: MockExecutor(id="Source2"), name="Source2") - builder.register_executor(lambda: MockAggregator(id="Aggregator"), name="Aggregator") + builder.register_executors({ + "Source1": lambda: MockExecutor(id="Source1"), + "Source2": lambda: MockExecutor(id="Source2"), + "Aggregator": lambda: MockAggregator(id="Aggregator"), + }) # Add fan-in edges using registered names # Both Source1 and Source2 need to be reachable, so connect Source1 to Source2 @@ -477,9 +466,11 @@ def test_register_with_chain(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Step1"), name="Step1") - builder.register_executor(lambda: MockExecutor(id="Step2"), name="Step2") - builder.register_executor(lambda: MockExecutor(id="Step3"), name="Step3") + builder.register_executors({ + "Step1": lambda: MockExecutor(id="Step1"), + "Step2": lambda: MockExecutor(id="Step2"), + "Step3": lambda: MockExecutor(id="Step3"), + }) # Add chain using registered names workflow = builder.add_chain(["Step1", "Step2", "Step3"]).set_start_executor("Step1").build() @@ -501,7 +492,7 @@ def factory(): return MockExecutor(id="Test") builder = WorkflowBuilder() - builder.register_executor(factory, name="Test") + builder.register_executors({"Test": factory}) # Factory should not be called yet assert call_count == 0 @@ -528,7 +519,7 @@ def test_mixing_eager_and_lazy_initialization_error(): eager_executor = MockExecutor(id="eager") # Register a lazy executor - builder.register_executor(lambda: MockExecutor(id="Lazy"), name="Lazy") + builder.register_executors({"Lazy": lambda: MockExecutor(id="Lazy")}) # Mixing eager and lazy should raise an error during add_edge with pytest.raises( @@ -549,8 +540,10 @@ def condition_func(msg: MockMessage) -> bool: return msg.data > 0 # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="Target"), name="Target") + builder.register_executors({ + "Source": lambda: MockExecutor(id="Source"), + "Target": lambda: MockExecutor(id="Target"), + }) # Add edge with condition workflow = builder.set_start_executor("Source").add_edge("Source", "Target", condition=condition_func).build() diff --git a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py index f9d4f2b971..4f81478405 100644 --- a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py +++ b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py @@ -69,15 +69,19 @@ def create_agent() -> ChatAgent: async def main(): """Build and run a simple 2-step workflow using the fluent builder API.""" # Build the workflow using a fluent pattern: - # 1) register_executor(factory, name) registers an executor factory + # 1) register_executors({name: factory}) registers executor factories # 2) register_agent(factory, name) registers an agent factory # 3) add_chain([node_names]) adds a sequence of nodes to the workflow # 4) set_start_executor(node) declares the entry point # 5) build() finalizes and returns an immutable Workflow object workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase") - .register_executor(lambda: reverse_text, name="ReverseText") + .register_executors( + { + "UpperCase": lambda: UpperCase(id="upper_case_executor"), + "ReverseText": lambda: reverse_text, + } + ) .register_agent(create_agent, name="DecoderAgent", output_response=True) .add_chain(["UpperCase", "ReverseText", "DecoderAgent"]) .set_start_executor("UpperCase") diff --git a/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py b/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py index 874cb2956a..b695ebd93d 100644 --- a/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py +++ b/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py @@ -74,10 +74,7 @@ async def main() -> None: WorkflowBuilder() .register_agent(factory_func=lambda: writer, name="writer", agent_thread=shared_thread) .register_agent(factory_func=lambda: reviewer, name="reviewer", agent_thread=shared_thread) - .register_executor( - factory_func=lambda: intercept_agent_response, - name="intercept_agent_response", - ) + .register_executors({"intercept_agent_response": lambda: intercept_agent_response}) .add_chain(["writer", "intercept_agent_response", "reviewer"]) .set_start_executor("writer") .build() diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py index 11bac9f2c9..1c94fcd119 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py @@ -112,7 +112,7 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_research_agent, name="research_agent") .register_agent(create_final_editor_agent, name="final_editor_agent") - .register_executor(lambda: enrich_with_references, name="enrich_with_references") + .register_executors({"enrich_with_references": lambda: enrich_with_references}) .set_start_executor("research_agent") .add_edge("research_agent", "enrich_with_references") .add_edge("enrich_with_references", "final_editor_agent") diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 1b97677374..2b72677ccb 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -246,13 +246,14 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_writer_agent, name="writer_agent") .register_agent(create_final_editor_agent, name="final_editor_agent") - .register_executor( - lambda: Coordinator( - id="coordinator", - writer_id="writer_agent", - final_editor_id="final_editor_agent", - ), - name="coordinator", + .register_executors( + { + "coordinator": lambda: Coordinator( + id="coordinator", + writer_id="writer_agent", + final_editor_id="final_editor_agent", + ) + } ) .set_start_executor("writer_agent") .add_edge("writer_agent", "coordinator") diff --git a/python/samples/getting_started/workflows/agents/custom_agent_executors.py b/python/samples/getting_started/workflows/agents/custom_agent_executors.py index 3f95aab0e4..dc74fb52b9 100644 --- a/python/samples/getting_started/workflows/agents/custom_agent_executors.py +++ b/python/samples/getting_started/workflows/agents/custom_agent_executors.py @@ -111,8 +111,7 @@ async def main(): # Set the start node and connect an edge from writer to reviewer. workflow = ( WorkflowBuilder() - .register_executor(Writer, name="writer") - .register_executor(Reviewer, name="reviewer") + .register_executors({"writer": Writer, "reviewer": Reviewer}) .set_start_executor("writer") .add_edge("writer", "reviewer") .build() diff --git a/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py b/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py index ab36cf3962..593d084ab3 100644 --- a/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py +++ b/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py @@ -100,7 +100,7 @@ async def main(): workflow = ( WorkflowBuilder() .register_agent(lambda: create_coding_agent(chat_client), name="coding_agent") - .register_executor(lambda: Evaluator(id="evaluator"), name="evaluator") + .register_executors({"evaluator": lambda: Evaluator(id="evaluator")}) .set_start_executor("coding_agent") .add_edge("coding_agent", "evaluator") .build() diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py index 118800765d..4365d0dd63 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py @@ -103,16 +103,14 @@ async def main() -> None: # and escalation paths for human review. agent = ( WorkflowBuilder() - .register_executor( - lambda: Worker( - id="sub-worker", - chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), - ), - name="worker", - ) - .register_executor( - lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), - name="reviewer", + .register_executors( + { + "worker": lambda: Worker( + id="sub-worker", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ), + "reviewer": lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), + } ) .add_edge("worker", "reviewer") # Worker sends requests to Reviewer .add_edge("reviewer", "worker") # Reviewer sends feedback to Worker diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py index 9aa98f7b96..901edcf707 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py @@ -199,13 +199,11 @@ async def main() -> None: print("Building workflow with Worker ↔ Reviewer cycle...") agent = ( WorkflowBuilder() - .register_executor( - lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), - name="worker", - ) - .register_executor( - lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), - name="reviewer", + .register_executors( + { + "worker": lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), + "reviewer": lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), + } ) .add_edge("worker", "reviewer") # Worker sends responses to Reviewer .add_edge("reviewer", "worker") # Reviewer provides feedback to Worker diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py index a2628592ea..257cb45cd1 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py @@ -186,8 +186,12 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow: ), name="writer", ) - .register_executor(lambda: ReviewGateway(id="review_gateway", writer_id="writer"), name="review_gateway") - .register_executor(lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), name="prepare_brief") + .register_executors( + { + "review_gateway": lambda: ReviewGateway(id="review_gateway", writer_id="writer"), + "prepare_brief": lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), + } + ) .set_start_executor("prepare_brief") .add_edge("prepare_brief", "writer") .add_edge("writer", "review_gateway") diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py index bfa2484d63..4de3e7a6a7 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py @@ -103,8 +103,12 @@ async def main(): # Build workflow with checkpointing enabled workflow_builder = ( WorkflowBuilder() - .register_executor(lambda: StartExecutor(id="start"), name="start") - .register_executor(lambda: WorkerExecutor(id="worker"), name="worker") + .register_executors( + { + "start": lambda: StartExecutor(id="start"), + "worker": lambda: WorkerExecutor(id="worker"), + } + ) .set_start_executor("start") .add_edge("start", "worker") .add_edge("worker", "worker") # Self-loop for iterative processing diff --git a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py index d35fd5e41f..93384d81a9 100644 --- a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py +++ b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py @@ -296,9 +296,7 @@ def build_sub_workflow() -> WorkflowExecutor: """Assemble the sub-workflow used by the parent workflow executor.""" sub_workflow = ( WorkflowBuilder() - .register_executor(DraftWriter, name="writer") - .register_executor(DraftReviewRouter, name="router") - .register_executor(DraftFinaliser, name="finaliser") + .register_executors({"writer": DraftWriter, "router": DraftReviewRouter, "finaliser": DraftFinaliser}) .set_start_executor("writer") .add_edge("writer", "router") .add_edge("router", "finaliser") @@ -313,8 +311,7 @@ def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow: """Assemble the parent workflow that embeds the sub-workflow.""" return ( WorkflowBuilder() - .register_executor(LaunchCoordinator, name="coordinator") - .register_executor(build_sub_workflow, name="sub_executor") + .register_executors({"coordinator": LaunchCoordinator, "sub_executor": build_sub_workflow}) .set_start_executor("coordinator") .add_edge("coordinator", "sub_executor") .add_edge("sub_executor", "coordinator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py index cb789850c4..e88d5b4c23 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py @@ -143,7 +143,7 @@ def create_sub_workflow() -> WorkflowExecutor: processing_workflow = ( WorkflowBuilder() - .register_executor(TextProcessor, name="text_processor") + .register_executors({"text_processor": TextProcessor}) .set_start_executor("text_processor") .build() ) @@ -157,8 +157,12 @@ async def main(): # Step 1: Create the parent workflow main_workflow = ( WorkflowBuilder() - .register_executor(TextProcessingOrchestrator, name="text_orchestrator") - .register_executor(create_sub_workflow, name="text_processor_workflow") + .register_executors( + { + "text_orchestrator": TextProcessingOrchestrator, + "text_processor_workflow": create_sub_workflow, + } + ) .set_start_executor("text_orchestrator") .add_edge("text_orchestrator", "text_processor_workflow") .add_edge("text_processor_workflow", "text_orchestrator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py index dadb4325d9..e9162df5ef 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py @@ -172,10 +172,14 @@ async def collect(self, response: ResourceResponse | PolicyResponse, ctx: Workfl return ( WorkflowBuilder() - .register_executor(lambda: RequestDistribution("orchestrator"), name="orchestrator") - .register_executor(lambda: ResourceRequester("resource_requester"), name="resource_requester") - .register_executor(lambda: PolicyChecker("policy_checker"), name="policy_checker") - .register_executor(lambda: ResultCollector("result_collector"), name="result_collector") + .register_executors( + { + "orchestrator": lambda: RequestDistribution("orchestrator"), + "resource_requester": lambda: ResourceRequester("resource_requester"), + "policy_checker": lambda: PolicyChecker("policy_checker"), + "result_collector": lambda: ResultCollector("result_collector"), + } + ) .set_start_executor("orchestrator") .add_edge("orchestrator", "resource_requester") .add_edge("orchestrator", "policy_checker") @@ -291,18 +295,19 @@ async def main() -> None: # Build the main workflow main_workflow = ( WorkflowBuilder() - .register_executor(lambda: ResourceAllocator("resource_allocator"), name="resource_allocator") - .register_executor(lambda: PolicyEngine("policy_engine"), name="policy_engine") - .register_executor( - lambda: WorkflowExecutor( - build_resource_request_distribution_workflow(), - "sub_workflow_executor", - # Setting allow_direct_output=True to let the sub-workflow output directly. - # This is because the sub-workflow is the both the entry point and the exit - # point of the main workflow. - allow_direct_output=True, - ), - name="sub_workflow_executor", + .register_executors( + { + "resource_allocator": lambda: ResourceAllocator("resource_allocator"), + "policy_engine": lambda: PolicyEngine("policy_engine"), + "sub_workflow_executor": lambda: WorkflowExecutor( + build_resource_request_distribution_workflow(), + "sub_workflow_executor", + # Setting allow_direct_output=True to let the sub-workflow output directly. + # This is because the sub-workflow is the both the entry point and the exit + # point of the main workflow. + allow_direct_output=True, + ), + } ) .set_start_executor("sub_workflow_executor") .add_edge("sub_workflow_executor", "resource_allocator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py index e21c74039a..defba8fd77 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py @@ -157,9 +157,13 @@ async def handle_domain_validation_response( # Build the workflow return ( WorkflowBuilder() - .register_executor(lambda: EmailSanitizer(id="email_sanitizer"), name="email_sanitizer") - .register_executor(lambda: EmailFormatValidator(id="email_format_validator"), name="email_format_validator") - .register_executor(lambda: DomainValidator(id="domain_validator"), name="domain_validator") + .register_executors( + { + "email_sanitizer": lambda: EmailSanitizer(id="email_sanitizer"), + "email_format_validator": lambda: EmailFormatValidator(id="email_format_validator"), + "domain_validator": lambda: DomainValidator(id="domain_validator"), + } + ) .set_start_executor("email_sanitizer") .add_edge("email_sanitizer", "email_format_validator") .add_edge("email_format_validator", "domain_validator") @@ -273,14 +277,17 @@ async def main() -> None: # Build the main workflow workflow = ( WorkflowBuilder() - .register_executor( - lambda: SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains), - name="smart_email_orchestrator", - ) - .register_executor(lambda: EmailDelivery(id="email_delivery"), name="email_delivery") - .register_executor( - lambda: WorkflowExecutor(build_email_address_validation_workflow(), id="email_validation_workflow"), - name="email_validation_workflow", + .register_executors( + { + "smart_email_orchestrator": lambda: SmartEmailOrchestrator( + id="smart_email_orchestrator", approved_domains=approved_domains + ), + "email_delivery": lambda: EmailDelivery(id="email_delivery"), + "email_validation_workflow": lambda: WorkflowExecutor( + build_email_address_validation_workflow(), + id="email_validation_workflow", + ), + } ) .set_start_executor("smart_email_orchestrator") .add_edge("smart_email_orchestrator", "email_validation_workflow") diff --git a/python/samples/getting_started/workflows/control-flow/edge_condition.py b/python/samples/getting_started/workflows/control-flow/edge_condition.py index 6d1a8ffb0f..6d58dc4382 100644 --- a/python/samples/getting_started/workflows/control-flow/edge_condition.py +++ b/python/samples/getting_started/workflows/control-flow/edge_condition.py @@ -167,9 +167,13 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_spam_detector_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: to_email_assistant_request, name="to_email_assistant_request") - .register_executor(lambda: handle_email_response, name="send_email") - .register_executor(lambda: handle_spam_classifier_response, name="handle_spam") + .register_executors( + { + "to_email_assistant_request": lambda: to_email_assistant_request, + "send_email": lambda: handle_email_response, + "handle_spam": lambda: handle_spam_classifier_response, + } + ) .set_start_executor("spam_detection_agent") # Not spam path: transform response -> request for assistant -> assistant -> send email .add_edge("spam_detection_agent", "to_email_assistant_request", condition=get_condition(False)) diff --git a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py index 44385bffca..6967de717a 100644 --- a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py +++ b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py @@ -232,15 +232,19 @@ def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str] .register_agent(create_email_analysis_agent, name="email_analysis_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") .register_agent(create_email_summary_agent, name="email_summary_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_analysis_result, name="to_analysis_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: summarize_email, name="summarize_email") - .register_executor(lambda: merge_summary, name="merge_summary") - .register_executor(lambda: handle_spam, name="handle_spam") - .register_executor(lambda: handle_uncertain, name="handle_uncertain") - .register_executor(lambda: database_access, name="database_access") + .register_executors( + { + "store_email": lambda: store_email, + "to_analysis_result": lambda: to_analysis_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": lambda: finalize_and_send, + "summarize_email": lambda: summarize_email, + "merge_summary": lambda: merge_summary, + "handle_spam": lambda: handle_spam, + "handle_uncertain": lambda: handle_uncertain, + "database_access": lambda: database_access, + } + ) ) workflow = ( diff --git a/python/samples/getting_started/workflows/control-flow/sequential_executors.py b/python/samples/getting_started/workflows/control-flow/sequential_executors.py index 0fedfcf1cd..582d9254e5 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_executors.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_executors.py @@ -66,8 +66,12 @@ async def main() -> None: # Order matters. We connect upper_case_executor -> reverse_text_executor and set the start. workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper_case_executor"), name="upper_case_executor") - .register_executor(lambda: ReverseTextExecutor(id="reverse_text_executor"), name="reverse_text_executor") + .register_executors( + { + "upper_case_executor": lambda: UpperCaseExecutor(id="upper_case_executor"), + "reverse_text_executor": lambda: ReverseTextExecutor(id="reverse_text_executor"), + } + ) .add_edge("upper_case_executor", "reverse_text_executor") .set_start_executor("upper_case_executor") .build() diff --git a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py index ce7bc92758..2399b8d5a4 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py @@ -56,8 +56,12 @@ async def main(): # Order matters. upper_case_executor runs first, then reverse_text_executor. workflow = ( WorkflowBuilder() - .register_executor(lambda: to_upper_case, name="upper_case_executor") - .register_executor(lambda: reverse_text, name="reverse_text_executor") + .register_executors( + { + "upper_case_executor": lambda: to_upper_case, + "reverse_text_executor": lambda: reverse_text, + } + ) .add_edge("upper_case_executor", "reverse_text_executor") .set_start_executor("upper_case_executor") .build() diff --git a/python/samples/getting_started/workflows/control-flow/simple_loop.py b/python/samples/getting_started/workflows/control-flow/simple_loop.py index d458589123..35a5cbc450 100644 --- a/python/samples/getting_started/workflows/control-flow/simple_loop.py +++ b/python/samples/getting_started/workflows/control-flow/simple_loop.py @@ -130,10 +130,14 @@ async def main(): # This time we are creating a loop in the workflow. workflow = ( WorkflowBuilder() - .register_executor(lambda: GuessNumberExecutor((1, 100), "guess_number"), name="guess_number") + .register_executors( + { + "guess_number": lambda: GuessNumberExecutor((1, 100), "guess_number"), + "submit_judge": lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), + "parse_judge": lambda: ParseJudgeResponse(id="parse_judge"), + } + ) .register_agent(create_judge_agent, name="judge_agent") - .register_executor(lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), name="submit_judge") - .register_executor(lambda: ParseJudgeResponse(id="parse_judge"), name="parse_judge") .add_edge("guess_number", "submit_judge") .add_edge("submit_judge", "judge_agent") .add_edge("judge_agent", "parse_judge") diff --git a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py index f2090e4acc..26f9cb35e2 100644 --- a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py +++ b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py @@ -184,12 +184,16 @@ async def main(): WorkflowBuilder() .register_agent(create_spam_detection_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_detection_result, name="to_detection_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: handle_spam, name="handle_spam") - .register_executor(lambda: handle_uncertain, name="handle_uncertain") + .register_executors( + { + "store_email": lambda: store_email, + "to_detection_result": lambda: to_detection_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": lambda: finalize_and_send, + "handle_spam": lambda: handle_spam, + "handle_uncertain": lambda: handle_uncertain, + } + ) .set_start_executor("store_email") .add_edge("store_email", "spam_detection_agent") .add_edge("spam_detection_agent", "to_detection_result") diff --git a/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py b/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py index 2ebd5bd128..1c49dea346 100644 --- a/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py +++ b/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py @@ -52,9 +52,13 @@ def build_workflow(): """Build a simple 3-step sequential workflow (~6 seconds total).""" return ( WorkflowBuilder() - .register_executor(lambda: step1, name="step1") - .register_executor(lambda: step2, name="step2") - .register_executor(lambda: step3, name="step3") + .register_executors( + { + "step1": lambda: step1, + "step2": lambda: step2, + "step3": lambda: step3, + } + ) .add_edge("step1", "step2") .add_edge("step2", "step3") .set_start_executor("step1") diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index c49d2c1308..eab2a70a8f 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -232,11 +232,12 @@ async def main() -> None: workflow = ( WorkflowBuilder() .register_agent(create_email_writer_agent, name="email_writer") - .register_executor( - lambda: EmailPreprocessor(special_email_addresses={"mike@contoso.com"}), - name="email_preprocessor", + .register_executors( + { + "email_preprocessor": lambda: EmailPreprocessor(special_email_addresses={"mike@contoso.com"}), + "conclude_workflow": lambda: conclude_workflow, + } ) - .register_executor(lambda: conclude_workflow, name="conclude_workflow") .set_start_executor("email_preprocessor") .add_edge("email_preprocessor", "email_writer") .add_edge("email_writer", "conclude_workflow") diff --git a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py index 52a9d72901..7c46acbce8 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py @@ -166,7 +166,7 @@ async def main() -> None: workflow = ( WorkflowBuilder() .register_agent(create_guessing_agent, name="guessing_agent") - .register_executor(lambda: TurnManager(id="turn_manager"), name="turn_manager") + .register_executors({"turn_manager": lambda: TurnManager(id="turn_manager")}) .set_start_executor("turn_manager") .add_edge("turn_manager", "guessing_agent") # Ask agent to make/adjust a guess .add_edge("guessing_agent", "turn_manager") # Agent's response comes back to coordinator diff --git a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py index f59b1ea0c8..f991c4b6fe 100644 --- a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py +++ b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py @@ -75,10 +75,14 @@ async def main() -> None: # 1) Build a simple fan out and fan in workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: Dispatcher(id="dispatcher"), name="dispatcher") - .register_executor(lambda: Average(id="average"), name="average") - .register_executor(lambda: Sum(id="summation"), name="summation") - .register_executor(lambda: Aggregator(id="aggregator"), name="aggregator") + .register_executors( + { + "dispatcher": lambda: Dispatcher(id="dispatcher"), + "average": lambda: Average(id="average"), + "summation": lambda: Sum(id="summation"), + "aggregator": lambda: Aggregator(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["average", "summation"]) .add_fan_in_edges(["average", "summation"], "aggregator") diff --git a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py index 36c2ca24f6..3a3eb85789 100644 --- a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py +++ b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py @@ -133,8 +133,12 @@ async def main() -> None: .register_agent(create_researcher_agent, name="researcher") .register_agent(create_marketer_agent, name="marketer") .register_agent(create_legal_agent, name="legal") - .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") - .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .register_executors( + { + "dispatcher": lambda: DispatchToExperts(id="dispatcher"), + "aggregator": lambda: AggregateInsights(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) # Parallel branches .add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") # Join at the aggregator diff --git a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py index d98c6cb78b..a7e9d1ab68 100644 --- a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py +++ b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py @@ -262,27 +262,25 @@ async def main(): """Construct the map reduce workflow, visualize it, then run it over a sample file.""" # Step 1: Create the workflow builder and register executors. - workflow_builder = ( - WorkflowBuilder() - .register_executor(lambda: Map(id="map_executor_0"), name="map_executor_0") - .register_executor(lambda: Map(id="map_executor_1"), name="map_executor_1") - .register_executor(lambda: Map(id="map_executor_2"), name="map_executor_2") - .register_executor( - lambda: Split(["map_executor_0", "map_executor_1", "map_executor_2"], id="split_data_executor"), - name="split_data_executor", - ) - .register_executor(lambda: Reduce(id="reduce_executor_0"), name="reduce_executor_0") - .register_executor(lambda: Reduce(id="reduce_executor_1"), name="reduce_executor_1") - .register_executor(lambda: Reduce(id="reduce_executor_2"), name="reduce_executor_2") - .register_executor(lambda: Reduce(id="reduce_executor_3"), name="reduce_executor_3") - .register_executor( - lambda: Shuffle( + workflow_builder = WorkflowBuilder().register_executors( + { + "map_executor_0": lambda: Map(id="map_executor_0"), + "map_executor_1": lambda: Map(id="map_executor_1"), + "map_executor_2": lambda: Map(id="map_executor_2"), + "split_data_executor": lambda: Split( + ["map_executor_0", "map_executor_1", "map_executor_2"], + id="split_data_executor", + ), + "reduce_executor_0": lambda: Reduce(id="reduce_executor_0"), + "reduce_executor_1": lambda: Reduce(id="reduce_executor_1"), + "reduce_executor_2": lambda: Reduce(id="reduce_executor_2"), + "reduce_executor_3": lambda: Reduce(id="reduce_executor_3"), + "shuffle_executor": lambda: Shuffle( ["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"], id="shuffle_executor", ), - name="shuffle_executor", - ) - .register_executor(lambda: CompletionExecutor(id="completion_executor"), name="completion_executor") + "completion_executor": lambda: CompletionExecutor(id="completion_executor"), + } ) # Step 2: Build the workflow graph using fan out and fan in edges. diff --git a/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py b/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py index 700dcb1b95..b72abe301b 100644 --- a/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py +++ b/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py @@ -194,11 +194,15 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_spam_detection_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_detection_result, name="to_detection_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: handle_spam, name="handle_spam") + .register_executors( + { + "store_email": lambda: store_email, + "to_detection_result": lambda: to_detection_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": lambda: finalize_and_send, + "handle_spam": lambda: handle_spam, + } + ) .set_start_executor("store_email") .add_edge("store_email", "spam_detection_agent") .add_edge("spam_detection_agent", "to_detection_result") diff --git a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py index 877bb13038..1cb98d6744 100644 --- a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py +++ b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py @@ -129,8 +129,12 @@ async def main() -> None: .register_agent(create_researcher_agent, name="researcher") .register_agent(create_marketer_agent, name="marketer") .register_agent(create_legal_agent, name="legal") - .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") - .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .register_executors( + { + "dispatcher": lambda: DispatchToExperts(id="dispatcher"), + "aggregator": lambda: AggregateInsights(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) .add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") From b89ebe74950c78229af61f8db116ed7a72375bd2 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Thu, 5 Feb 2026 10:58:32 +0100 Subject: [PATCH 04/14] Fix test to use new register_executors API --- .../packages/core/tests/workflow/test_workflow_builder.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index aa829674f0..fb90d6c981 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -538,8 +538,10 @@ def test_with_output_from_with_registered_names(): """Test with_output_from with registered factory names (strings).""" workflow = ( WorkflowBuilder() - .register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorAFactory") - .register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorBFactory") + .register_executors({ + "ExecutorAFactory": lambda: MockExecutor(id="ExecutorA"), + "ExecutorBFactory": lambda: MockExecutor(id="ExecutorB"), + }) .set_start_executor("ExecutorAFactory") .add_edge("ExecutorAFactory", "ExecutorBFactory") .with_output_from(["ExecutorBFactory"]) From f81f1ba955e4f1b8bb6b5193af8396bc2b34881b Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Thu, 5 Feb 2026 16:31:41 +0100 Subject: [PATCH 05/14] Add support for async executor factories Introduces build_async() to support async executor factories while minimizing code duplication. The sync and async build paths differ only in how they instantiate executors (sync raises on awaitables, async awaits them). All other logic (validation, edge resolution, workflow creation) is extracted into shared helpers to ensure maintainability and avoid duplicating the logic. --- .../_workflows/_workflow_builder.py | 368 +++++++++++++----- .../tests/workflow/test_workflow_builder.py | 47 +++ 2 files changed, 328 insertions(+), 87 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index bac49febbd..04e4847ae2 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1,8 +1,9 @@ # Copyright (c) Microsoft. All rights reserved. +import inspect import logging import sys -from collections.abc import Callable, Sequence +from collections.abc import Awaitable, Callable, Sequence from dataclasses import dataclass from typing import Any @@ -186,7 +187,7 @@ def __init__( | _MultiSelectionEdgeGroupRegistration | _FanInEdgeRegistration ] = [] - self._executor_registry: dict[str, Callable[[], Executor]] = {} + self._executor_registry: dict[str, Callable[[], Executor | Awaitable[Executor]]] = {} # Output executors filter; if set, only outputs from these executors are yielded self._output_executors: list[Executor | AgentProtocol | str] = [] @@ -249,7 +250,7 @@ def _maybe_wrap_agent(self, candidate: Executor | AgentProtocol) -> Executor: f"WorkflowBuilder expected an Executor or AgentProtocol instance; got {type(candidate).__name__}." ) - def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: + def register_executors(self, executor_factories: dict[str, Callable[[], Executor | Awaitable[Executor]]]) -> Self: """Register multiple executor factory functions for lazy initialization. This method allows you to register multiple factory functions at once. Each @@ -1092,62 +1093,43 @@ def with_output_from(self, executors: list[Executor | AgentProtocol | str]) -> S self._output_executors = list(executors) return self - def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: - """Resolve deferred edge registrations into executors and edge groups. + def _resolve_edge_registrations(self, factory_name_to_instance: dict[str, Executor]) -> list[EdgeGroup]: + """Process edge registrations into edge groups using resolved executor instances. - Returns: - tuple: A tuple containing: - - The starting Executor instance. - - A dictionary mapping registered factory names to resolved Executor instances. - - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + Args: + factory_name_to_instance: Mapping of registered factory names to executor instances. - Notes: - Non-factory executors (i.e., those added directly) are not included in the returned list, - as they are already part of the workflow builder's internal state. + Returns: + list[EdgeGroup]: A list of EdgeGroup instances for the registered edges. """ - if not self._start_executor: - raise ValueError("Starting executor must be set using set_start_executor before building the workflow.") - - start_executor: Executor | None = None - if isinstance(self._start_executor, Executor): - start_executor = self._start_executor - # Maps registered factory names to created executor instances for edge resolution - factory_name_to_instance: dict[str, Executor] = {} - # Maps executor IDs to created executor instances to prevent duplicates - executor_id_to_instance: dict[str, Executor] = {} - deferred_edge_groups: list[EdgeGroup] = [] - for name, exec_factory in self._executor_registry.items(): - instance = exec_factory() - if instance.id in executor_id_to_instance: - raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") - if instance.id in self._executors: - raise ValueError(f"Executor ID collision: An executor with ID '{instance.id}' already exists.") - executor_id_to_instance[instance.id] = instance + def _get_executor(name: str) -> Executor: + """Helper to get executor by the registered name. - if isinstance(self._start_executor, str) and name == self._start_executor: - start_executor = instance + Args: + name: The registered name of the executor to get. - # All executors will get their own internal edge group for receiving system messages - deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] - factory_name_to_instance[name] = instance + Returns: + Executor: The Executor instance. - def _get_executor(name: str) -> Executor: - """Helper to get executor by the registered name. Raises if not found.""" + Raises: + ValueError: If the executor is not found. + """ if name not in factory_name_to_instance: raise ValueError(f"Factory '{name}' has not been registered.") return factory_name_to_instance[name] + edge_groups: list[EdgeGroup] = [] for registration in self._edge_registry: match registration: case _EdgeRegistration(source, target, condition): source_exec: Executor = _get_executor(source) target_exec: Executor = _get_executor(target) - deferred_edge_groups.append(SingleEdgeGroup(source_exec.id, target_exec.id, condition)) # type: ignore[call-arg] + edge_groups.append(SingleEdgeGroup(source_exec.id, target_exec.id, condition)) # type: ignore[call-arg] case _FanOutEdgeRegistration(source, targets): source_exec = _get_executor(source) target_execs = [_get_executor(t) for t in targets] - deferred_edge_groups.append(FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs])) # type: ignore[call-arg] + edge_groups.append(FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs])) # type: ignore[call-arg] case _SwitchCaseEdgeGroupRegistration(source, cases): source_exec = _get_executor(source) cases_converted: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] @@ -1161,22 +1143,207 @@ def _get_executor(name: str) -> Executor: cases_converted.append( SwitchCaseEdgeGroupCase(condition=case.condition, target_id=target_exec.id) ) - deferred_edge_groups.append(SwitchCaseEdgeGroup(source_exec.id, cases_converted)) # type: ignore[call-arg] + edge_groups.append(SwitchCaseEdgeGroup(source_exec.id, cases_converted)) # type: ignore[call-arg] case _MultiSelectionEdgeGroupRegistration(source, targets, selection_func): source_exec = _get_executor(source) target_execs = [_get_executor(t) for t in targets] - deferred_edge_groups.append( + edge_groups.append( FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs], selection_func) # type: ignore[call-arg] ) case _FanInEdgeRegistration(sources, target): source_execs = [_get_executor(s) for s in sources] target_exec = _get_executor(target) - deferred_edge_groups.append(FanInEdgeGroup([s.id for s in source_execs], target_exec.id)) # type: ignore[call-arg] + edge_groups.append(FanInEdgeGroup([s.id for s in source_execs], target_exec.id)) # type: ignore[call-arg] + return edge_groups + + def _process_instantiated_executors( + self, + factory_name_to_instance: dict[str, Executor], + ) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Process instantiated executors: validate IDs, create edge groups, resolve edges. + + Args: + factory_name_to_instance: Mapping of factory names to already-instantiated executors. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - The factory_name_to_instance mapping. + - A list of EdgeGroup instances representing the workflow edges. + + Raises: + ValueError: If start executor is not set, if duplicate executor IDs are found, + or if start executor cannot be resolved. + """ + if not self._start_executor: + raise ValueError("Starting executor must be set using set_start_executor before building the workflow.") + + start_executor: Executor | None = None + if isinstance(self._start_executor, Executor): + start_executor = self._start_executor + + deferred_edge_groups: list[EdgeGroup] = [] + executor_ids_seen: set[str] = set() + + for name, instance in factory_name_to_instance.items(): + if instance.id in executor_ids_seen: + raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") + if instance.id in self._executors: + raise ValueError(f"Executor ID collision: An executor with ID '{instance.id}' already exists.") + executor_ids_seen.add(instance.id) + + if isinstance(self._start_executor, str) and name == self._start_executor: + start_executor = instance + + # All executors will get their own internal edge group for receiving system messages + deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] + + # Process edge registrations using shared helper + deferred_edge_groups.extend(self._resolve_edge_registrations(factory_name_to_instance)) + if start_executor is None: raise ValueError("Failed to resolve starting executor from registered factories.") return (start_executor, factory_name_to_instance, deferred_edge_groups) + def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Resolve deferred edge registrations into executors and edge groups. + + Factory functions are called synchronously to instantiate executors. If any factory + returns an awaitable, an error is raised directing the caller to use :meth:`build_async` + instead. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - A dictionary mapping registered factory names to resolved Executor instances. + - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + + Raises: + ValueError: If an async executor factory is detected. + + Notes: + Non-factory executors (i.e., those added directly) are not included in the returned list, + as they are already part of the workflow builder's internal state. + """ + factory_name_to_instance: dict[str, Executor] = {} + for name, executor_factory in self._executor_registry.items(): + instance = executor_factory() + if inspect.isawaitable(instance): + # Close un-awaited coroutines to avoid runtime warnings or memory leaks + if hasattr(instance, "close"): + instance.close() # type: ignore[reportGeneralTypeIssues] + raise ValueError("Async executor factories were detected. Use build_async() instead.") + factory_name_to_instance[name] = instance + + return self._process_instantiated_executors(factory_name_to_instance) + + async def _resolve_edge_registry_async(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Resolve deferred edge registrations with support for async executor factories. + + This async variant of :meth:`_resolve_edge_registry` allows executor factories to + return awaitables. Factory functions are called and any awaitable results are awaited + before processing. All other logic (validation, edge resolution) is identical to the + sync version. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - A dictionary mapping registered factory names to resolved Executor instances. + - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + + Notes: + Non-factory executors (i.e., those added directly) are not included in the returned list, + as they are already part of the workflow builder's internal state. + + See Also: + :meth:`_resolve_edge_registry`: Synchronous version that raises an error on async factories. + """ + factory_name_to_instance: dict[str, Executor] = {} + for name, executor_factory in self._executor_registry.items(): + instance = executor_factory() + if inspect.isawaitable(instance): + # Handle maybe awaitable executor instances + instance = await instance + factory_name_to_instance[name] = instance + + return self._process_instantiated_executors(factory_name_to_instance) + + def _create_workflow_from_resolved_registry( + self, + span: Any, + start_executor: Executor, + deferred_executors: dict[str, Executor], + deferred_edge_groups: list[EdgeGroup], + ) -> Workflow: + """Create and validate a Workflow instance from resolved executor registry data. + + This is a shared helper used by both :meth:`build` and :meth:`build_async` to avoid + code duplication in the workflow creation and validation logic. + + Args: + span: The OpenTelemetry span for tracking the build process. + start_executor: The resolved starting executor instance. + deferred_executors: Mapping of factory names to instantiated executors. + deferred_edge_groups: List of edge groups from resolved registrations. + + Returns: + Workflow: A fully constructed and validated Workflow instance. + + Raises: + WorkflowValidationError: If workflow validation fails. + """ + executors = self._executors | {exe.id: exe for exe in deferred_executors.values()} + edge_groups = self._edge_groups + deferred_edge_groups + output_executors = ( + [ + deferred_executors[factory_name].id + for factory_name in self._output_executors + if isinstance(factory_name, str) + ] + + [ex.id for ex in self._output_executors if isinstance(ex, Executor)] + + [resolve_agent_id(agent) for agent in self._output_executors if isinstance(agent, AgentProtocol)] + ) + + # Perform validation before creating the workflow + validate_workflow_graph( + edge_groups, + executors, + start_executor, + output_executors, + ) + + # Add validation completed event + span.add_event(OtelAttr.BUILD_VALIDATION_COMPLETED) + + context = InProcRunnerContext(self._checkpoint_storage) + + # Create workflow instance after validation + workflow = Workflow( + edge_groups, + executors, + start_executor, + context, + self._max_iterations, + name=self._name, + description=self._description, + output_executors=output_executors, + ) + build_attributes: dict[str, Any] = { + OtelAttr.WORKFLOW_ID: workflow.id, + OtelAttr.WORKFLOW_DEFINITION: workflow.to_json(), + } + if workflow.name: + build_attributes[OtelAttr.WORKFLOW_NAME] = workflow.name + if workflow.description: + build_attributes[OtelAttr.WORKFLOW_DESCRIPTION] = workflow.description + span.set_attributes(build_attributes) + + # Add workflow build completed event + span.add_event(OtelAttr.BUILD_COMPLETED) + + return workflow + def build(self) -> Workflow: """Build and return the constructed workflow. @@ -1231,57 +1398,84 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Resolve lazy edge registrations start_executor, deferred_executors, deferred_edge_groups = self._resolve_edge_registry() - executors = self._executors | {exe.id: exe for exe in deferred_executors.values()} - edge_groups = self._edge_groups + deferred_edge_groups - output_executors = ( - [ - deferred_executors[factory_name].id - for factory_name in self._output_executors - if isinstance(factory_name, str) - ] - + [ex.id for ex in self._output_executors if isinstance(ex, Executor)] - + [resolve_agent_id(agent) for agent in self._output_executors if isinstance(agent, AgentProtocol)] - ) - # Perform validation before creating the workflow - validate_workflow_graph( - edge_groups, - executors, - start_executor, - output_executors, + # Create and validate workflow + return self._create_workflow_from_resolved_registry( + span, start_executor, deferred_executors, deferred_edge_groups ) + except Exception as exc: + attributes = { + OtelAttr.BUILD_ERROR_MESSAGE: str(exc), + OtelAttr.BUILD_ERROR_TYPE: type(exc).__name__, + } + span.add_event(OtelAttr.BUILD_ERROR, attributes) # type: ignore[reportArgumentType, arg-type] + capture_exception(span, exc) + raise + + async def build_async(self) -> Workflow: + """Build and return the constructed workflow with async executor factory support. + + This async version of :meth:`build` supports executor factories that return + awaitables (coroutines). Use this method when any of your registered executor + factories are async functions. - # Add validation completed event - span.add_event(OtelAttr.BUILD_VALIDATION_COMPLETED) - - context = InProcRunnerContext(self._checkpoint_storage) - - # Create workflow instance after validation - workflow = Workflow( - edge_groups, - executors, - start_executor, - context, - self._max_iterations, - name=self._name, - description=self._description, - output_executors=output_executors, + This method performs validation before building the workflow to ensure: + - A starting executor has been set + - All edges connect valid executors + - The graph is properly connected + - Type compatibility between connected executors + + Returns: + Workflow: An immutable Workflow instance ready for execution. + + Raises: + ValueError: If starting executor is not set. + WorkflowValidationError: If workflow validation fails (includes EdgeDuplicationError, + TypeCompatibilityError, and GraphConnectivityError subclasses). + + Example: + .. code-block:: python + + from typing_extensions import Never + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + + class MyExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(text.upper()) + + + async def create_executor() -> MyExecutor: + # Async initialization (e.g., fetching config, establishing connections) + return MyExecutor(id="executor") + + + # Build with async executor factories + workflow = await ( + WorkflowBuilder() + .register_executors({"MyExecutor": create_executor}) + .set_start_executor("MyExecutor") + .build_async() ) - build_attributes: dict[str, Any] = { - OtelAttr.WORKFLOW_ID: workflow.id, - OtelAttr.WORKFLOW_DEFINITION: workflow.to_json(), - } - if workflow.name: - build_attributes[OtelAttr.WORKFLOW_NAME] = workflow.name - if workflow.description: - build_attributes[OtelAttr.WORKFLOW_DESCRIPTION] = workflow.description - span.set_attributes(build_attributes) - # Add workflow build completed event - span.add_event(OtelAttr.BUILD_COMPLETED) + # The workflow is now immutable and ready to run + events = await workflow.run("hello") + print(events.get_outputs()) # ['HELLO'] + """ + # Create workflow build span that includes validation and workflow creation + with create_workflow_span(OtelAttr.WORKFLOW_BUILD_SPAN) as span: + try: + # Add workflow build started event + span.add_event(OtelAttr.BUILD_STARTED) - return workflow + # Resolve lazy edge registrations with support for async executor factories + start_executor, deferred_executors, deferred_edge_groups = await self._resolve_edge_registry_async() + # Create and validate workflow + return self._create_workflow_from_resolved_registry( + span, start_executor, deferred_executors, deferred_edge_groups + ) except Exception as exc: attributes = { OtelAttr.BUILD_ERROR_MESSAGE: str(exc), diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index fb90d6c981..3c2d54078f 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -134,6 +134,53 @@ def test_add_agent_duplicate_id_raises_error(): builder.set_start_executor(agent1).add_edge(agent1, agent2).build() +# Tests for build() with async executor factories + + +async def test_build_async_with_async_executor_factories(): + """Test that build_async() works with async executor factories.""" + + async def create_executor_a() -> MockExecutor: + return MockExecutor(id="executor_a") + + async def create_executor_b() -> MockExecutor: + return MockExecutor(id="executor_b") + + # Build workflow with async executor factories + workflow = await ( + WorkflowBuilder() + .register_executors({ + "ExecutorA": create_executor_a, + "ExecutorB": create_executor_b, + }) + .set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .build_async() + ) + + assert workflow.start_executor_id == "executor_a" + assert len(workflow.executors) == 2 + assert "executor_a" in workflow.executors + assert "executor_b" in workflow.executors + + +def test_build_raises_error_with_async_executor_factories(): + """Test that build() raises ValueError when async executor factories are detected.""" + + async def create_async_executor() -> MockExecutor: + return MockExecutor(id="executor_async") + + builder = ( + WorkflowBuilder() + .register_executors({"AsyncExecutor": create_async_executor}) + .set_start_executor("AsyncExecutor") + ) + + # Attempting to build synchronously with async factories should raise an error + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + # Tests for new executor registration patterns From 4082efbbfd4567697a31a958bafcab1b2b1bb5c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Holtvogt?= Date: Fri, 6 Feb 2026 10:01:31 +0100 Subject: [PATCH 06/14] Update error message on empty factory names Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com> --- .../core/agent_framework/_workflows/_workflow_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 04e4847ae2..02f201800e 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -298,7 +298,7 @@ async def process(self, text: str, ctx: WorkflowContext[str]) -> None: for name, factory_function in executor_factories.items(): if not name or not name.strip(): - raise ValueError("Executor factory name cannot be empty.") + raise ValueError("Executor factory name cannot be empty or whitespace-only") if not callable(factory_function): raise TypeError(f"Executor factory for '{name}' must be callable.") if name in self._executor_registry: From 52d9070b78bb288f723969f53ee254d936504363 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 14:58:33 +0100 Subject: [PATCH 07/14] Use trace.Span for type safety --- .../core/agent_framework/_workflows/_workflow_builder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 02f201800e..a809ffb869 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -7,6 +7,8 @@ from dataclasses import dataclass from typing import Any +from opentelemetry import trace + from .._agents import AgentProtocol from .._threads import AgentThread from ..observability import OtelAttr, capture_exception, create_workflow_span @@ -1271,7 +1273,7 @@ async def _resolve_edge_registry_async(self) -> tuple[Executor, dict[str, Execut def _create_workflow_from_resolved_registry( self, - span: Any, + span: trace.Span, start_executor: Executor, deferred_executors: dict[str, Executor], deferred_edge_groups: list[EdgeGroup], From 7c2d7fa5c99ce9d99d5423d08cecbad4934a9913 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 15:02:32 +0100 Subject: [PATCH 08/14] Guarantee ValueError despite close() failures --- .../core/agent_framework/_workflows/_workflow_builder.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index a809ffb869..4ae487b4be 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1233,9 +1233,11 @@ def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[Ed instance = executor_factory() if inspect.isawaitable(instance): # Close un-awaited coroutines to avoid runtime warnings or memory leaks - if hasattr(instance, "close"): - instance.close() # type: ignore[reportGeneralTypeIssues] - raise ValueError("Async executor factories were detected. Use build_async() instead.") + try: + if hasattr(instance, "close"): + instance.close() # type: ignore[reportGeneralTypeIssues] + finally: + raise ValueError("Async executor factories were detected. Use build_async() instead.") factory_name_to_instance[name] = instance return self._process_instantiated_executors(factory_name_to_instance) From 081a435a1ade15b9c9896bae78f33bd752fd4285 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 16:00:28 +0100 Subject: [PATCH 09/14] Enhance error messages when registering executors For duplicate IDs, the new messages specify the conflicting factories, improving debugging. Added tests to verify that conflicts between factory-produced and directly added executors raise appropriate errors. --- .../_workflows/_workflow_builder.py | 15 ++++++++----- .../tests/workflow/test_workflow_builder.py | 21 ++++++++++++++++++- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 4ae487b4be..a5a052de3f 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1185,14 +1185,19 @@ def _process_instantiated_executors( start_executor = self._start_executor deferred_edge_groups: list[EdgeGroup] = [] - executor_ids_seen: set[str] = set() + # Map of executor IDs to their factory names for error message clarity + executor_id_to_factory_name: dict[str, str] = {} for name, instance in factory_name_to_instance.items(): - if instance.id in executor_ids_seen: - raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") + if instance.id in executor_id_to_factory_name: + existing_factory = executor_id_to_factory_name[instance.id] + raise ValueError( + f"Executor with ID '{instance.id}' from factory '{name}' " + f"conflicts with existing factory '{existing_factory}'." + ) if instance.id in self._executors: - raise ValueError(f"Executor ID collision: An executor with ID '{instance.id}' already exists.") - executor_ids_seen.add(instance.id) + raise ValueError(f"Executor with ID '{instance.id}' has already been added to the workflow.") + executor_id_to_factory_name[instance.id] = name if isinstance(self._start_executor, str) and name == self._start_executor: start_executor = instance diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 3c2d54078f..1b8dba7457 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -285,7 +285,26 @@ def test_register_duplicate_id_raises_error(): builder.set_start_executor("MyExecutor1") # Registering second executor with same ID should raise ValueError - with pytest.raises(ValueError, match="Executor with ID 'executor' has already been registered."): + with pytest.raises( + ValueError, + match=r"Executor with ID 'executor' from factory 'MyExecutor2' conflicts with existing factory 'MyExecutor1'\.", + ): + builder.build() + + +def test_register_factory_conflicts_with_direct_executor(): + """Test that a factory-produced executor conflicting with a directly-added executor raises an error.""" + # Add an executor directly via add_edge + direct_executor = MockExecutor(id="executor") + builder = WorkflowBuilder() + builder.add_edge(direct_executor, direct_executor) # Add it to self._executors + + # Now register a factory that produces the same ID + builder.register_executors({"MyFactory": lambda: MockExecutor(id="executor")}) + builder.set_start_executor("MyFactory") + + # Building should raise ValueError + with pytest.raises(ValueError, match=r"Executor with ID 'executor' has already been added to the workflow\."): builder.build() From d086aa2a98ad8ea8705623245691e0f7b36ea75f Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 16:14:12 +0100 Subject: [PATCH 10/14] Add type checks for executor factories --- .../_workflows/_workflow_builder.py | 8 +++++ .../tests/workflow/test_workflow_builder.py | 34 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index a5a052de3f..d2132c8d7f 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1243,6 +1243,10 @@ def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[Ed instance.close() # type: ignore[reportGeneralTypeIssues] finally: raise ValueError("Async executor factories were detected. Use build_async() instead.") + + if not isinstance(instance, Executor): + raise TypeError(f"Factory '{name}' returned {type(instance).__name__} instead of an Executor.") + factory_name_to_instance[name] = instance return self._process_instantiated_executors(factory_name_to_instance) @@ -1274,6 +1278,10 @@ async def _resolve_edge_registry_async(self) -> tuple[Executor, dict[str, Execut if inspect.isawaitable(instance): # Handle maybe awaitable executor instances instance = await instance + + if not isinstance(instance, Executor): + raise TypeError(f"Factory '{name}' returned {type(instance).__name__} instead of an Executor.") + factory_name_to_instance[name] = instance return self._process_instantiated_executors(factory_name_to_instance) diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 1b8dba7457..610018fd4e 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -181,6 +181,40 @@ async def create_async_executor() -> MockExecutor: builder.build() +def test_build_raises_error_when_factory_returns_non_executor(): + """Test that build() raises TypeError when a factory returns a non-Executor type.""" + + def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return "not an executor" # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"InvalidExecutor": create_invalid_executor}) + .set_start_executor("InvalidExecutor") + ) + + # Attempting to build with a factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned str instead of an Executor\."): + builder.build() + + +async def test_build_async_raises_error_when_factory_returns_non_executor(): + """Test that build_async() raises TypeError when an async factory returns a non-Executor type.""" + + async def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return None # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"InvalidExecutor": create_invalid_executor}) + .set_start_executor("InvalidExecutor") + ) + + # Attempting to build with an async factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned NoneType instead of an Executor\."): + await builder.build_async() + + # Tests for new executor registration patterns From 30470edcddbf72cb7d081b1f548b93b77f447242 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 16:32:45 +0100 Subject: [PATCH 11/14] Add multiple tests for WorkflowBuilder --- .../tests/workflow/test_workflow_builder.py | 234 ++++++++++++++++-- 1 file changed, 210 insertions(+), 24 deletions(-) diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 610018fd4e..545b39be2f 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -134,7 +134,153 @@ def test_add_agent_duplicate_id_raises_error(): builder.set_start_executor(agent1).add_edge(agent1, agent2).build() -# Tests for build() with async executor factories +# Tests for build() and build_async() with executor factories + + +def test_build_raises_error_with_async_executor_factories(): + """Test that build() raises ValueError when async executor factories are detected.""" + + async def create_async_executor() -> MockExecutor: + return MockExecutor(id="executor_async") + + builder = ( + WorkflowBuilder() + .register_executors({"AsyncExecutor": create_async_executor}) + .set_start_executor("AsyncExecutor") + ) + + # Attempting to build synchronously with async factories should raise an error + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + +def test_build_raises_error_when_factory_returns_non_executor(): + """Test that build() raises TypeError when a factory returns a non-Executor type.""" + + def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return "not an executor" # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"InvalidExecutor": create_invalid_executor}) + .set_start_executor("InvalidExecutor") + ) + + # Attempting to build with a factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned str instead of an Executor\."): + builder.build() + + +def test_build_raises_error_when_factory_returns_none(): + """Test that build() raises TypeError when a factory returns None.""" + + def create_none_executor() -> MockExecutor: # type: ignore[reportReturnType] + return None # type: ignore[return-value] + + builder = ( + WorkflowBuilder().register_executors({"NoneExecutor": create_none_executor}).set_start_executor("NoneExecutor") + ) + + # Attempting to build with a factory that returns None should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'NoneExecutor' returned NoneType instead of an Executor\."): + builder.build() + + +def test_build_raises_error_when_sync_factory_raises_exception(): + """Test that build() propagates exceptions raised by sync factories.""" + + def create_failing_executor() -> MockExecutor: + raise RuntimeError("Sync factory failed!") + + builder = ( + WorkflowBuilder() + .register_executors({"FailingExecutor": create_failing_executor}) + .set_start_executor("FailingExecutor") + ) + + # Exception from sync factory should propagate + with pytest.raises(RuntimeError, match="Sync factory failed!"): + builder.build() + + +def test_build_raises_error_when_close_throws_exception(): + """Test that build() raises ValueError even when instance.close() throws an exception.""" + + class FailingCoroutine: + """A mock coroutine that raises an exception when close() is called.""" + + def close(self): + raise RuntimeError("Close failed!") + + def __await__(self): + # This makes it awaitable but we won't get here + return self # type: ignore[return-value] + + def create_failing_async_executor() -> MockExecutor: # type: ignore[reportReturnType] + return FailingCoroutine() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"FailingExecutor": create_failing_async_executor}) + .set_start_executor("FailingExecutor") + ) + + # The ValueError about async factories should still be raised despite close() failing + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + +def test_build_calls_close_on_async_factory(): + """Test that build() actually calls close() on awaitable instances.""" + close_called = {"value": False} + + class TrackingCoroutine: + """A mock coroutine that tracks if close() is called.""" + + def close(self): + close_called["value"] = True + + def __await__(self): + return self # type: ignore[return-value] + + def create_tracking_async_executor() -> MockExecutor: # type: ignore[reportReturnType] + return TrackingCoroutine() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"TrackingExecutor": create_tracking_async_executor}) + .set_start_executor("TrackingExecutor") + ) + + # Build should detect async factory and call close() + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + # Verify close() was actually called + assert close_called["value"], "close() should have been called on the awaitable instance" + + +def test_build_handles_awaitable_without_close_method(): + """Test that build() handles awaitable instances that don't have a close() method.""" + + class AwaitableWithoutClose: + """A mock awaitable without a close() method.""" + + def __await__(self): + return self # type: ignore[return-value] + + def create_awaitable_executor() -> MockExecutor: # type: ignore[reportReturnType] + return AwaitableWithoutClose() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"AwaitableExecutor": create_awaitable_executor}) + .set_start_executor("AwaitableExecutor") + ) + + # Should still raise ValueError about async factories, even without close() method + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() async def test_build_async_with_async_executor_factories(): @@ -164,28 +310,38 @@ async def create_executor_b() -> MockExecutor: assert "executor_b" in workflow.executors -def test_build_raises_error_with_async_executor_factories(): - """Test that build() raises ValueError when async executor factories are detected.""" +async def test_build_async_with_mixed_sync_and_async_factories(): + """Test that build_async() works with a mix of sync and async executor factories.""" + + def create_sync_executor() -> MockExecutor: + return MockExecutor(id="executor_sync") async def create_async_executor() -> MockExecutor: return MockExecutor(id="executor_async") - builder = ( + # Build workflow with mixed sync and async executor factories + workflow = await ( WorkflowBuilder() - .register_executors({"AsyncExecutor": create_async_executor}) - .set_start_executor("AsyncExecutor") + .register_executors({ + "SyncExecutor": create_sync_executor, + "AsyncExecutor": create_async_executor, + }) + .set_start_executor("SyncExecutor") + .add_edge("SyncExecutor", "AsyncExecutor") + .build_async() ) - # Attempting to build synchronously with async factories should raise an error - with pytest.raises(ValueError, match="Async executor factories were detected."): - builder.build() + assert workflow.start_executor_id == "executor_sync" + assert len(workflow.executors) == 2 + assert "executor_sync" in workflow.executors + assert "executor_async" in workflow.executors -def test_build_raises_error_when_factory_returns_non_executor(): - """Test that build() raises TypeError when a factory returns a non-Executor type.""" +async def test_build_async_raises_error_when_factory_returns_non_executor(): + """Test that build_async() raises TypeError when an async factory returns a non-Executor type.""" - def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] - return "not an executor" # type: ignore[return-value] + async def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return None # type: ignore[return-value] builder = ( WorkflowBuilder() @@ -193,25 +349,25 @@ def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] .set_start_executor("InvalidExecutor") ) - # Attempting to build with a factory that returns non-Executor should raise TypeError - with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned str instead of an Executor\."): - builder.build() + # Attempting to build with an async factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned NoneType instead of an Executor\."): + await builder.build_async() -async def test_build_async_raises_error_when_factory_returns_non_executor(): - """Test that build_async() raises TypeError when an async factory returns a non-Executor type.""" +async def test_build_async_raises_error_when_factory_raises_exception(): + """Test that build_async() propagates exceptions raised by async factories.""" - async def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] - return None # type: ignore[return-value] + async def create_failing_executor() -> MockExecutor: + raise RuntimeError("Factory failed!") builder = ( WorkflowBuilder() - .register_executors({"InvalidExecutor": create_invalid_executor}) - .set_start_executor("InvalidExecutor") + .register_executors({"FailingExecutor": create_failing_executor}) + .set_start_executor("FailingExecutor") ) - # Attempting to build with an async factory that returns non-Executor should raise TypeError - with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned NoneType instead of an Executor\."): + # Exception from async factory should propagate + with pytest.raises(RuntimeError, match="Factory failed!"): await builder.build_async() @@ -261,6 +417,33 @@ def test_register_multiple_executors(): assert workflow.start_executor_id == "ExecutorA" +def test_register_executors_multiple_calls_with_non_overlapping_names(): + """Test that register_executors can be called multiple times with non-overlapping names.""" + builder = WorkflowBuilder() + + # First registration + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + + # Second registration with different names + builder.register_executors({ + "ExecutorB": lambda: MockExecutor(id="ExecutorB"), + "ExecutorC": lambda: MockExecutor(id="ExecutorC"), + }) + + # Build workflow and verify all executors are present + workflow = ( + builder + .set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .add_edge("ExecutorB", "ExecutorC") + .build() + ) + + assert "ExecutorA" in workflow.executors + assert "ExecutorB" in workflow.executors + assert "ExecutorC" in workflow.executors + + def test_register_executors_rejects_empty_inputs(): """Test that empty executor mappings and entries are rejected.""" builder = WorkflowBuilder() @@ -271,6 +454,9 @@ def test_register_executors_rejects_empty_inputs(): with pytest.raises(ValueError, match="name cannot be empty"): builder.register_executors({"": lambda: MockExecutor(id="ExecutorA")}) + with pytest.raises(ValueError, match="cannot be empty or whitespace-only"): + builder.register_executors({" ": lambda: MockExecutor(id="ExecutorA")}) + with pytest.raises(TypeError, match="must be callable"): builder.register_executors({"ExecutorA": None}) # type: ignore[arg-type] From 7bd19ae737bc6b249fe913624c109108d88fac06 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 16:49:38 +0100 Subject: [PATCH 12/14] Add tests for unregistered factory and executor --- .../tests/workflow/test_workflow_builder.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 6f01b69e49..94df4adbfb 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -533,6 +533,35 @@ def test_register_factory_conflicts_with_direct_executor(): builder.build() +def test_unregistered_factory_name_in_edge_raises_error(): + """Test that referencing an unregistered factory name in an edge raises ValueError.""" + builder = WorkflowBuilder() + + # Register only ExecutorA + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + builder.set_start_executor("ExecutorA") + + # Add edge to unregistered ExecutorB + builder.add_edge("ExecutorA", "ExecutorB") + + # Building should raise ValueError because ExecutorB was never registered + with pytest.raises(ValueError, match="Factory 'ExecutorB' has not been registered"): + builder.build() + + +def test_unregistered_start_executor_factory_name_raises_error(): + """Test that setting an unregistered factory name as start executor raises ValueError.""" + builder = WorkflowBuilder() + + # Register ExecutorA but set start to unregistered ExecutorB + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + builder.set_start_executor("ExecutorB") + + # Building should raise ValueError because ExecutorB was never registered + with pytest.raises(ValueError, match="Failed to resolve starting executor from registered factories"): + builder.build() + + def test_register_agent_basic(): """Test basic agent registration with lazy initialization.""" builder = WorkflowBuilder() From 095c74a0850521cbdd3314129f9bb9e893e9f423 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 16:59:00 +0100 Subject: [PATCH 13/14] Use new register_executors API --- .../agents/azure_chat_agents_tool_calls_with_feedback.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 457defcf51..5c87c4e824 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -243,14 +243,13 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_writer_agent, name="writer_agent") .register_agent(create_final_editor_agent, name="final_editor_agent") - .register_executor( - lambda: Coordinator( + .register_executors({ + "coordinator": lambda: Coordinator( id="coordinator", writer_id="writer_agent", final_editor_id="final_editor_agent", ), - name="coordinator", - ) + }) .set_start_executor("writer_agent") .add_edge("writer_agent", "coordinator") .add_edge("coordinator", "writer_agent") From 66484539278f7779344144ad9e815d3c974a7177 Mon Sep 17 00:00:00 2001 From: Bjoern Holtvogt Date: Fri, 6 Feb 2026 20:46:02 +0100 Subject: [PATCH 14/14] Apply format changes and fix typos --- .../_start-here/step4_using_factories.py | 22 +++++++++++-------- .../agents/custom_agent_executors.py | 12 +++++++--- .../agents_with_approval_requests.py | 13 ++++++++--- .../guessing_game_with_human_input.py | 16 ++++++++++---- 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py index 3e482e0238..93283783d6 100644 --- a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py +++ b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py @@ -2,9 +2,15 @@ import asyncio -from agent_framework import (AgentResponseUpdate, ChatAgent, Executor, - WorkflowBuilder, WorkflowContext, executor, - handler) +from agent_framework import ( + AgentResponseUpdate, + ChatAgent, + Executor, + WorkflowBuilder, + WorkflowContext, + executor, + handler, +) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential @@ -68,12 +74,10 @@ async def main(): # 5) build() finalizes and returns an immutable Workflow object workflow = ( WorkflowBuilder() - .register_executors( - { - "UpperCase": lambda: UpperCase(id="upper_case_executor"), - "ReverseText": lambda: reverse_text, - } - ) + .register_executors({ + "UpperCase": lambda: UpperCase(id="upper_case_executor"), + "ReverseText": lambda: reverse_text, + }) .register_agent(create_agent, name="DecoderAgent") .add_chain(["UpperCase", "ReverseText", "DecoderAgent"]) .set_start_executor("UpperCase") diff --git a/python/samples/getting_started/workflows/agents/custom_agent_executors.py b/python/samples/getting_started/workflows/agents/custom_agent_executors.py index 97641ebe7f..906e9f7e09 100644 --- a/python/samples/getting_started/workflows/agents/custom_agent_executors.py +++ b/python/samples/getting_started/workflows/agents/custom_agent_executors.py @@ -2,8 +2,14 @@ import asyncio -from agent_framework import (ChatAgent, ChatMessage, Executor, WorkflowBuilder, - WorkflowContext, handler) +from agent_framework import ( + ChatAgent, + ChatMessage, + Executor, + WorkflowBuilder, + WorkflowContext, + handler, +) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential @@ -18,7 +24,7 @@ pattern with typed inputs and typed WorkflowContext[T] outputs, connect executors with the fluent WorkflowBuilder, and finish by yielding outputs from the terminal node. -Note: When an agent is passed to a workflow, the workflow essenatially wrap the agent in a more sophisticated executor. +Note: When an agent is passed to a workflow, the workflow essentially wraps the agent in a more sophisticated executor. Prerequisites: - Azure OpenAI configured for AzureOpenAIChatClient with required environment variables. diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index 1d624a66d6..fff5185a76 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -5,9 +5,16 @@ from dataclasses import dataclass from typing import Annotated -from agent_framework import (AgentExecutorResponse, Content, Executor, - WorkflowBuilder, WorkflowContext, executor, - handler, tool) +from agent_framework import ( + AgentExecutorResponse, + Content, + Executor, + WorkflowBuilder, + WorkflowContext, + executor, + handler, + tool, +) from agent_framework.openai import OpenAIChatClient from typing_extensions import Never diff --git a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py index ebb6c7ae65..bee4aeb61d 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py @@ -4,10 +4,18 @@ from collections.abc import AsyncIterable from dataclasses import dataclass -from agent_framework import (AgentExecutorRequest, AgentExecutorResponse, - AgentResponseUpdate, ChatMessage, Executor, - WorkflowBuilder, WorkflowContext, WorkflowEvent, - handler, response_handler) +from agent_framework import ( + AgentExecutorRequest, + AgentExecutorResponse, + AgentResponseUpdate, + ChatMessage, + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + handler, + response_handler, +) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential from pydantic import BaseModel