diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index e47279b1a2..04293115b3 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1,11 +1,14 @@ # Copyright (c) Microsoft. All rights reserved. +import inspect import logging import sys -from collections.abc import Callable, Sequence +from collections.abc import Awaitable, Callable, Sequence from dataclasses import dataclass from typing import Any +from opentelemetry import trace + from .._agents import SupportsAgentRun from .._threads import AgentThread from ..observability import OtelAttr, capture_exception, create_workflow_span @@ -139,8 +142,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") - .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") + .register_executors({ + "UpperCase": lambda: UpperCaseExecutor(id="upper"), + "Reverse": lambda: ReverseExecutor(id="reverse"), + }) .add_edge("UpperCase", "Reverse") .set_start_executor("UpperCase") .build() @@ -184,7 +189,7 @@ def __init__( | _MultiSelectionEdgeGroupRegistration | _FanInEdgeRegistration ] = [] - self._executor_registry: dict[str, Callable[[], Executor]] = {} + self._executor_registry: dict[str, Callable[[], Executor | Awaitable[Executor]]] = {} # Output executors filter; if set, only outputs from these executors are yielded self._output_executors: list[Executor | SupportsAgentRun | str] = [] @@ -247,21 +252,23 @@ def _maybe_wrap_agent(self, candidate: Executor | SupportsAgentRun) -> Executor: f"WorkflowBuilder expected an Executor or SupportsAgentRun instance; got {type(candidate).__name__}." 
) - def register_executor(self, factory_func: Callable[[], Executor], name: str | list[str]) -> Self: - """Register an executor factory function for lazy initialization. + def register_executors(self, executor_factories: dict[str, Callable[[], Executor | Awaitable[Executor]]]) -> Self: + """Register multiple executor factory functions for lazy initialization. - This method allows you to register a factory function that creates an executor. - The executor will be instantiated only when the workflow is built, enabling - deferred initialization and potentially reducing startup time. + This method allows you to register multiple factory functions at once. Each + executor is instantiated only when the workflow is built, enabling deferred + initialization and reducing builder boilerplate. Args: - factory_func: A callable that returns an Executor instance when called. - name: The name(s) of the registered executor factory. This doesn't have to match - the executor's ID, but it must be unique within the workflow. + executor_factories: A mapping of executor names to factory callables that + return Executor instances when called. + + Raises: + ValueError: If executor_factories is empty or contains empty names, or if a name is already registered. + TypeError: If an executor factory is not callable. Example: .. 
code-block:: python - from typing_extensions import Never from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler @@ -273,49 +280,34 @@ async def process(self, text: str, ctx: WorkflowContext[str]) -> None: class ReverseExecutor(Executor): @handler - async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: + async def process(self, text: str, ctx: WorkflowContext[str]) -> None: await ctx.yield_output(text[::-1]) - # Build a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") - .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") + .register_executors({ + "UpperCase": lambda: UpperCaseExecutor(id="upper"), + "Reverse": lambda: ReverseExecutor(id="reverse"), + }) .set_start_executor("UpperCase") .add_edge("UpperCase", "Reverse") .build() ) - - If multiple names are provided, the same factory function will be registered under each name. - - .. code-block:: python - - from agent_framework import WorkflowBuilder, Executor, WorkflowContext, handler - - - class LoggerExecutor(Executor): - @handler - async def log(self, message: str, ctx: WorkflowContext) -> None: - print(f"Log: {message}") - - - # Register the same executor factory under multiple names - workflow = ( - WorkflowBuilder() - .register_executor(lambda: LoggerExecutor(id="logger"), name=["ExecutorA", "ExecutorB"]) - .set_start_executor("ExecutorA") - .add_edge("ExecutorA", "ExecutorB") - .build() """ - names = [name] if isinstance(name, str) else name + if not executor_factories: + raise ValueError("Executor factories cannot be empty.") - for n in names: - if n in self._executor_registry: - raise ValueError(f"An executor factory with the name '{n}' is already registered.") + for name, factory_function in executor_factories.items(): + if not name or not name.strip(): + raise ValueError("Executor factory name cannot be empty or whitespace-only") + if not callable(factory_function): 
+ raise TypeError(f"Executor factory for '{name}' must be callable.") + if name in self._executor_registry: + raise ValueError(f"An executor factory with the name '{name}' is already registered.") - for n in names: - self._executor_registry[n] = factory_func + for name, factory_function in executor_factories.items(): + self._executor_registry[name] = factory_function return self @@ -348,7 +340,7 @@ def register_agent( # Build a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: ..., name="SomeOtherExecutor") + .register_executors({"SomeOtherExecutor": lambda: ...}) .register_agent( lambda: AnthropicAgent(name="writer", model="claude-3-5-sonnet-20241022"), name="WriterAgent", @@ -393,7 +385,7 @@ def add_edge( Note: If instances are provided for both source and target, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. 
Returns: @@ -421,8 +413,10 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: # Connect executors with an edge workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="a"), + "ProcessorB": lambda: ProcessorB(id="b"), + }) .add_edge("ProcessorA", "ProcessorB") .set_start_executor("ProcessorA") .build() @@ -430,8 +424,10 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="a"), + "ProcessorB": lambda: ProcessorB(id="b"), + }) .add_edge("ProcessorA", "ProcessorB", condition=only_large_numbers) .set_start_executor("ProcessorA") .build() @@ -477,7 +473,7 @@ def add_fan_out_edges( Note: If instances are provided for source and targets, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. 
Example: @@ -508,9 +504,11 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None: # Broadcast to multiple validators workflow = ( WorkflowBuilder() - .register_executor(lambda: DataSource(id="source"), name="DataSource") - .register_executor(lambda: ValidatorA(id="val_a"), name="ValidatorA") - .register_executor(lambda: ValidatorB(id="val_b"), name="ValidatorB") + .register_executors({ + "DataSource": lambda: DataSource(id="source"), + "ValidatorA": lambda: ValidatorA(id="val_a"), + "ValidatorB": lambda: ValidatorB(id="val_b"), + }) .add_fan_out_edges("DataSource", ["ValidatorA", "ValidatorB"]) .set_start_executor("DataSource") .build() @@ -565,7 +563,7 @@ def add_switch_case_edge_group( Note: If instances are provided for source and case targets, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -601,9 +599,11 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: # Route based on score value workflow = ( WorkflowBuilder() - .register_executor(lambda: Evaluator(id="eval"), name="Evaluator") - .register_executor(lambda: HighScoreHandler(id="high"), name="HighScoreHandler") - .register_executor(lambda: LowScoreHandler(id="low"), name="LowScoreHandler") + .register_executors({ + "Evaluator": lambda: Evaluator(id="eval"), + "HighScoreHandler": lambda: HighScoreHandler(id="high"), + "LowScoreHandler": lambda: LowScoreHandler(id="low"), + }) .add_switch_case_edge_group( "Evaluator", [ @@ -671,7 +671,7 @@ def add_multi_selection_edge_group( Note: If instances are provided for source and targets, they will be shared across all workflow instances created from the built Workflow. 
To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. Example: @@ -715,9 +715,11 @@ def select_workers(task: Task, available: list[str]) -> list[str]: workflow = ( WorkflowBuilder() - .register_executor(lambda: TaskDispatcher(id="dispatcher"), name="TaskDispatcher") - .register_executor(lambda: WorkerA(id="worker_a"), name="WorkerA") - .register_executor(lambda: WorkerB(id="worker_b"), name="WorkerB") + .register_executors({ + "TaskDispatcher": lambda: TaskDispatcher(id="dispatcher"), + "WorkerA": lambda: WorkerA(id="worker_a"), + "WorkerB": lambda: WorkerB(id="worker_b"), + }) .add_multi_selection_edge_group( "TaskDispatcher", ["WorkerA", "WorkerB"], @@ -778,7 +780,7 @@ def add_fan_in_edges( Note: If instances are provided for sources and target, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider - registering the executors and agents using `register_executor` and `register_agent` + registering the executors and agents using `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. 
Example: @@ -804,9 +806,11 @@ async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str]) # Collect results from multiple producers workflow = ( WorkflowBuilder() - .register_executor(lambda: Producer(id="prod_1"), name="Producer1") - .register_executor(lambda: Producer(id="prod_2"), name="Producer2") - .register_executor(lambda: Aggregator(id="agg"), name="Aggregator") + .register_executors({ + "Producer1": lambda: Producer(id="prod_1"), + "Producer2": lambda: Producer(id="prod_2"), + "Aggregator": lambda: Aggregator(id="agg"), + }) .add_fan_in_edges(["Producer1", "Producer2"], "Aggregator") .set_start_executor("Producer1") .build() @@ -850,7 +854,7 @@ def add_chain(self, executors: Sequence[Executor | SupportsAgentRun | str]) -> S Note: If executor instances are provided, they will be shared across all workflow instances created from the built Workflow. To avoid this, consider registering the executors and agents using - `register_executor` and `register_agent` and referencing them by factory name for lazy + `register_executors` and `register_agent` and referencing them by factory name for lazy initialization instead. 
Example: @@ -881,9 +885,11 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Chain executors in sequence workflow = ( WorkflowBuilder() - .register_executor(lambda: Step1(id="step1"), name="step1") - .register_executor(lambda: Step2(id="step2"), name="step2") - .register_executor(lambda: Step3(id="step3"), name="step3") + .register_executors({ + "step1": lambda: Step1(id="step1"), + "step2": lambda: Step2(id="step2"), + "step3": lambda: Step3(id="step3"), + }) .add_chain(["step1", "step2", "step3"]) .set_start_executor("step1") .build() @@ -945,8 +951,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: workflow = ( WorkflowBuilder() - .register_executor(lambda: EntryPoint(id="entry"), name="EntryPoint") - .register_executor(lambda: Processor(id="proc"), name="Processor") + .register_executors({ + "EntryPoint": lambda: EntryPoint(id="entry"), + "Processor": lambda: Processor(id="proc"), + }) .add_edge("EntryPoint", "Processor") .set_start_executor("EntryPoint") .build() @@ -1003,8 +1011,10 @@ async def process(self, count: int, ctx: WorkflowContext[int]) -> None: workflow = ( WorkflowBuilder() .set_max_iterations(500) - .register_executor(lambda: StepA(id="step_a"), name="StepA") - .register_executor(lambda: StepB(id="step_b"), name="StepB") + .register_executors({ + "StepA": lambda: StepA(id="step_a"), + "StepB": lambda: StepB(id="step_b"), + }) .add_edge("StepA", "StepB") .add_edge("StepB", "StepA") # Cycle .set_start_executor("StepA") @@ -1053,8 +1063,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: storage = FileCheckpointStorage("./checkpoints") workflow = ( WorkflowBuilder() - .register_executor(lambda: ProcessorA(id="proc_a"), name="ProcessorA") - .register_executor(lambda: ProcessorB(id="proc_b"), name="ProcessorB") + .register_executors({ + "ProcessorA": lambda: ProcessorA(id="proc_a"), + "ProcessorB": lambda: ProcessorB(id="proc_b"), + }) 
.add_edge("ProcessorA", "ProcessorB") .set_start_executor("ProcessorA") .with_checkpointing(storage) @@ -1083,62 +1095,43 @@ def with_output_from(self, executors: list[Executor | SupportsAgentRun | str]) - self._output_executors = list(executors) return self - def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: - """Resolve deferred edge registrations into executors and edge groups. + def _resolve_edge_registrations(self, factory_name_to_instance: dict[str, Executor]) -> list[EdgeGroup]: + """Process edge registrations into edge groups using resolved executor instances. - Returns: - tuple: A tuple containing: - - The starting Executor instance. - - A dictionary mapping registered factory names to resolved Executor instances. - - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + Args: + factory_name_to_instance: Mapping of registered factory names to executor instances. - Notes: - Non-factory executors (i.e., those added directly) are not included in the returned list, - as they are already part of the workflow builder's internal state. + Returns: + list[EdgeGroup]: A list of EdgeGroup instances for the registered edges. """ - if not self._start_executor: - raise ValueError("Starting executor must be set using set_start_executor before building the workflow.") - start_executor: Executor | None = None - if isinstance(self._start_executor, Executor): - start_executor = self._start_executor + def _get_executor(name: str) -> Executor: + """Helper to get executor by the registered name. 
- # Maps registered factory names to created executor instances for edge resolution - factory_name_to_instance: dict[str, Executor] = {} - # Maps executor IDs to created executor instances to prevent duplicates - executor_id_to_instance: dict[str, Executor] = {} - deferred_edge_groups: list[EdgeGroup] = [] - for name, exec_factory in self._executor_registry.items(): - instance = exec_factory() - if instance.id in executor_id_to_instance: - raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") - if instance.id in self._executors: - raise ValueError(f"Executor ID collision: An executor with ID '{instance.id}' already exists.") - executor_id_to_instance[instance.id] = instance + Args: + name: The registered name of the executor to get. - if isinstance(self._start_executor, str) and name == self._start_executor: - start_executor = instance + Returns: + Executor: The Executor instance. - # All executors will get their own internal edge group for receiving system messages - deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] - factory_name_to_instance[name] = instance - - def _get_executor(name: str) -> Executor: - """Helper to get executor by the registered name. Raises if not found.""" + Raises: + ValueError: If the executor is not found. 
+ """ if name not in factory_name_to_instance: raise ValueError(f"Factory '{name}' has not been registered.") return factory_name_to_instance[name] + edge_groups: list[EdgeGroup] = [] for registration in self._edge_registry: match registration: case _EdgeRegistration(source, target, condition): source_exec: Executor = _get_executor(source) target_exec: Executor = _get_executor(target) - deferred_edge_groups.append(SingleEdgeGroup(source_exec.id, target_exec.id, condition)) # type: ignore[call-arg] + edge_groups.append(SingleEdgeGroup(source_exec.id, target_exec.id, condition)) # type: ignore[call-arg] case _FanOutEdgeRegistration(source, targets): source_exec = _get_executor(source) target_execs = [_get_executor(t) for t in targets] - deferred_edge_groups.append(FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs])) # type: ignore[call-arg] + edge_groups.append(FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs])) # type: ignore[call-arg] case _SwitchCaseEdgeGroupRegistration(source, cases): source_exec = _get_executor(source) cases_converted: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] @@ -1152,22 +1145,222 @@ def _get_executor(name: str) -> Executor: cases_converted.append( SwitchCaseEdgeGroupCase(condition=case.condition, target_id=target_exec.id) ) - deferred_edge_groups.append(SwitchCaseEdgeGroup(source_exec.id, cases_converted)) # type: ignore[call-arg] + edge_groups.append(SwitchCaseEdgeGroup(source_exec.id, cases_converted)) # type: ignore[call-arg] case _MultiSelectionEdgeGroupRegistration(source, targets, selection_func): source_exec = _get_executor(source) target_execs = [_get_executor(t) for t in targets] - deferred_edge_groups.append( + edge_groups.append( FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs], selection_func) # type: ignore[call-arg] ) case _FanInEdgeRegistration(sources, target): source_execs = [_get_executor(s) for s in sources] target_exec = _get_executor(target) - 
deferred_edge_groups.append(FanInEdgeGroup([s.id for s in source_execs], target_exec.id)) # type: ignore[call-arg] + edge_groups.append(FanInEdgeGroup([s.id for s in source_execs], target_exec.id)) # type: ignore[call-arg] + return edge_groups + + def _process_instantiated_executors( + self, + factory_name_to_instance: dict[str, Executor], + ) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Process instantiated executors: validate IDs, create edge groups, resolve edges. + + Args: + factory_name_to_instance: Mapping of factory names to already-instantiated executors. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - The factory_name_to_instance mapping. + - A list of EdgeGroup instances representing the workflow edges. + + Raises: + ValueError: If start executor is not set, if duplicate executor IDs are found, + or if start executor cannot be resolved. + """ + if not self._start_executor: + raise ValueError("Starting executor must be set using set_start_executor before building the workflow.") + + start_executor: Executor | None = None + if isinstance(self._start_executor, Executor): + start_executor = self._start_executor + + deferred_edge_groups: list[EdgeGroup] = [] + # Map of executor IDs to their factory names for error message clarity + executor_id_to_factory_name: dict[str, str] = {} + + for name, instance in factory_name_to_instance.items(): + if instance.id in executor_id_to_factory_name: + existing_factory = executor_id_to_factory_name[instance.id] + raise ValueError( + f"Executor with ID '{instance.id}' from factory '{name}' " + f"conflicts with existing factory '{existing_factory}'." 
+ ) + if instance.id in self._executors: + raise ValueError(f"Executor with ID '{instance.id}' has already been added to the workflow.") + executor_id_to_factory_name[instance.id] = name + + if isinstance(self._start_executor, str) and name == self._start_executor: + start_executor = instance + + # All executors will get their own internal edge group for receiving system messages + deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] + + # Process edge registrations using shared helper + deferred_edge_groups.extend(self._resolve_edge_registrations(factory_name_to_instance)) + if start_executor is None: raise ValueError("Failed to resolve starting executor from registered factories.") return (start_executor, factory_name_to_instance, deferred_edge_groups) + def _resolve_edge_registry(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Resolve deferred edge registrations into executors and edge groups. + + Factory functions are called synchronously to instantiate executors. If any factory + returns an awaitable, an error is raised directing the caller to use :meth:`build_async` + instead. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - A dictionary mapping registered factory names to resolved Executor instances. + - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + + Raises: + ValueError: If an async executor factory is detected. + + Notes: + Non-factory executors (i.e., those added directly) are not included in the returned list, + as they are already part of the workflow builder's internal state. 
+ """ + factory_name_to_instance: dict[str, Executor] = {} + for name, executor_factory in self._executor_registry.items(): + instance = executor_factory() + if inspect.isawaitable(instance): + # Close un-awaited coroutines to avoid runtime warnings or memory leaks + try: + if hasattr(instance, "close"): + instance.close() # type: ignore[reportGeneralTypeIssues] + finally: + raise ValueError("Async executor factories were detected. Use build_async() instead.") + + if not isinstance(instance, Executor): + raise TypeError(f"Factory '{name}' returned {type(instance).__name__} instead of an Executor.") + + factory_name_to_instance[name] = instance + + return self._process_instantiated_executors(factory_name_to_instance) + + async def _resolve_edge_registry_async(self) -> tuple[Executor, dict[str, Executor], list[EdgeGroup]]: + """Resolve deferred edge registrations with support for async executor factories. + + This async variant of :meth:`_resolve_edge_registry` allows executor factories to + return awaitables. Factory functions are called and any awaitable results are awaited + before processing. All other logic (validation, edge resolution) is identical to the + sync version. + + Returns: + tuple: A tuple containing: + - The starting Executor instance. + - A dictionary mapping registered factory names to resolved Executor instances. + - A list of EdgeGroup instances representing the workflow edges composed of resolved executors. + + Notes: + Non-factory executors (i.e., those added directly) are not included in the returned list, + as they are already part of the workflow builder's internal state. + + See Also: + :meth:`_resolve_edge_registry`: Synchronous version that raises an error on async factories. 
+ """ + factory_name_to_instance: dict[str, Executor] = {} + for name, executor_factory in self._executor_registry.items(): + instance = executor_factory() + if inspect.isawaitable(instance): + # Handle maybe awaitable executor instances + instance = await instance + + if not isinstance(instance, Executor): + raise TypeError(f"Factory '{name}' returned {type(instance).__name__} instead of an Executor.") + + factory_name_to_instance[name] = instance + + return self._process_instantiated_executors(factory_name_to_instance) + + def _create_workflow_from_resolved_registry( + self, + span: trace.Span, + start_executor: Executor, + deferred_executors: dict[str, Executor], + deferred_edge_groups: list[EdgeGroup], + ) -> Workflow: + """Create and validate a Workflow instance from resolved executor registry data. + + This is a shared helper used by both :meth:`build` and :meth:`build_async` to avoid + code duplication in the workflow creation and validation logic. + + Args: + span: The OpenTelemetry span for tracking the build process. + start_executor: The resolved starting executor instance. + deferred_executors: Mapping of factory names to instantiated executors. + deferred_edge_groups: List of edge groups from resolved registrations. + + Returns: + Workflow: A fully constructed and validated Workflow instance. + + Raises: + WorkflowValidationError: If workflow validation fails. 
+ """ + executors = self._executors | {exe.id: exe for exe in deferred_executors.values()} + edge_groups = self._edge_groups + deferred_edge_groups + output_executors = ( + [ + deferred_executors[factory_name].id + for factory_name in self._output_executors + if isinstance(factory_name, str) + ] + + [ex.id for ex in self._output_executors if isinstance(ex, Executor)] + + [resolve_agent_id(agent) for agent in self._output_executors if isinstance(agent, SupportsAgentRun)] + ) + + # Perform validation before creating the workflow + validate_workflow_graph( + edge_groups, + executors, + start_executor, + output_executors, + ) + + # Add validation completed event + span.add_event(OtelAttr.BUILD_VALIDATION_COMPLETED) + + context = InProcRunnerContext(self._checkpoint_storage) + + # Create workflow instance after validation + workflow = Workflow( + edge_groups, + executors, + start_executor, + context, + self._max_iterations, + name=self._name, + description=self._description, + output_executors=output_executors, + ) + build_attributes: dict[str, Any] = { + OtelAttr.WORKFLOW_ID: workflow.id, + OtelAttr.WORKFLOW_DEFINITION: workflow.to_json(), + } + if workflow.name: + build_attributes[OtelAttr.WORKFLOW_NAME] = workflow.name + if workflow.description: + build_attributes[OtelAttr.WORKFLOW_DESCRIPTION] = workflow.description + span.set_attributes(build_attributes) + + # Add workflow build completed event + span.add_event(OtelAttr.BUILD_COMPLETED) + + return workflow + def build(self) -> Workflow: """Build and return the constructed workflow. 
@@ -1201,7 +1394,7 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build and execute a workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: MyExecutor(id="executor"), name="MyExecutor") + .register_executors({"MyExecutor": lambda: MyExecutor(id="executor")}) .set_start_executor("MyExecutor") .build() ) @@ -1222,61 +1415,84 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Resolve lazy edge registrations start_executor, deferred_executors, deferred_edge_groups = self._resolve_edge_registry() - executors = self._executors | {exe.id: exe for exe in deferred_executors.values()} - edge_groups = self._edge_groups + deferred_edge_groups - output_executors = ( - [ - deferred_executors[factory_name].id - for factory_name in self._output_executors - if isinstance(factory_name, str) - ] - + [ex.id for ex in self._output_executors if isinstance(ex, Executor)] - + [ - resolve_agent_id(agent) - for agent in self._output_executors - if isinstance(agent, SupportsAgentRun) - ] - ) - # Perform validation before creating the workflow - validate_workflow_graph( - edge_groups, - executors, - start_executor, - output_executors, + # Create and validate workflow + return self._create_workflow_from_resolved_registry( + span, start_executor, deferred_executors, deferred_edge_groups ) + except Exception as exc: + attributes = { + OtelAttr.BUILD_ERROR_MESSAGE: str(exc), + OtelAttr.BUILD_ERROR_TYPE: type(exc).__name__, + } + span.add_event(OtelAttr.BUILD_ERROR, attributes) # type: ignore[reportArgumentType, arg-type] + capture_exception(span, exc) + raise + + async def build_async(self) -> Workflow: + """Build and return the constructed workflow with async executor factory support. + + This async version of :meth:`build` supports executor factories that return + awaitables (coroutines). Use this method when any of your registered executor + factories are async functions. 
+ + This method performs validation before building the workflow to ensure: + - A starting executor has been set + - All edges connect valid executors + - The graph is properly connected + - Type compatibility between connected executors + + Returns: + Workflow: An immutable Workflow instance ready for execution. + + Raises: + ValueError: If starting executor is not set. + WorkflowValidationError: If workflow validation fails (includes EdgeDuplicationError, + TypeCompatibilityError, and GraphConnectivityError subclasses). + + Example: + .. code-block:: python - # Add validation completed event - span.add_event(OtelAttr.BUILD_VALIDATION_COMPLETED) - - context = InProcRunnerContext(self._checkpoint_storage) - - # Create workflow instance after validation - workflow = Workflow( - edge_groups, - executors, - start_executor, - context, - self._max_iterations, - name=self._name, - description=self._description, - output_executors=output_executors, + from typing_extensions import Never + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + + class MyExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(text.upper()) + + + async def create_executor() -> MyExecutor: + # Async initialization (e.g., fetching config, establishing connections) + return MyExecutor(id="executor") + + + # Build with async executor factories + workflow = await ( + WorkflowBuilder() + .register_executors({"MyExecutor": create_executor}) + .set_start_executor("MyExecutor") + .build_async() ) - build_attributes: dict[str, Any] = { - OtelAttr.WORKFLOW_ID: workflow.id, - OtelAttr.WORKFLOW_DEFINITION: workflow.to_json(), - } - if workflow.name: - build_attributes[OtelAttr.WORKFLOW_NAME] = workflow.name - if workflow.description: - build_attributes[OtelAttr.WORKFLOW_DESCRIPTION] = workflow.description - span.set_attributes(build_attributes) - # Add workflow build completed event - 
span.add_event(OtelAttr.BUILD_COMPLETED) + # The workflow is now immutable and ready to run + events = await workflow.run("hello") + print(events.get_outputs()) # ['HELLO'] + """ + # Create workflow build span that includes validation and workflow creation + with create_workflow_span(OtelAttr.WORKFLOW_BUILD_SPAN) as span: + try: + # Add workflow build started event + span.add_event(OtelAttr.BUILD_STARTED) - return workflow + # Resolve lazy edge registrations with support for async executor factories + start_executor, deferred_executors, deferred_edge_groups = await self._resolve_edge_registry_async() + # Create and validate workflow + return self._create_workflow_from_resolved_registry( + span, start_executor, deferred_executors, deferred_edge_groups + ) except Exception as exc: attributes = { OtelAttr.BUILD_ERROR_MESSAGE: str(exc), diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index f0f0ff7660..0f11818e84 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -677,7 +677,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext[Agent # Build workflow: start -> agent1 (no output) -> agent2 (output_response=True) workflow = ( WorkflowBuilder() - .register_executor(lambda: start_executor, "start") + .register_executors({"start": lambda: start_executor}) .register_agent(lambda: MockAgent("agent1", "Agent1 output - should NOT appear"), "agent1") .register_agent(lambda: MockAgent("agent2", "Agent2 output - SHOULD appear"), "agent2") .set_start_executor("start") @@ -766,7 +766,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext[Agent # Build workflow with single agent workflow = ( WorkflowBuilder() - .register_executor(lambda: start_executor, "start") + .register_executors({"start": lambda: start_executor}) .register_agent(lambda: MockAgent("agent", 
"Unique response text"), "agent") .set_start_executor("start") .add_edge("start", "agent") diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 9b504fbaa5..bf006bf158 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -139,15 +139,252 @@ def test_add_agent_duplicate_id_raises_error(): builder.set_start_executor(agent1).add_edge(agent1, agent2).build() +# Tests for build() and build_async() with executor factories + + +def test_build_raises_error_with_async_executor_factories(): + """Test that build() raises ValueError when async executor factories are detected.""" + + async def create_async_executor() -> MockExecutor: + return MockExecutor(id="executor_async") + + builder = ( + WorkflowBuilder() + .register_executors({"AsyncExecutor": create_async_executor}) + .set_start_executor("AsyncExecutor") + ) + + # Attempting to build synchronously with async factories should raise an error + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + +def test_build_raises_error_when_factory_returns_non_executor(): + """Test that build() raises TypeError when a factory returns a non-Executor type.""" + + def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return "not an executor" # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"InvalidExecutor": create_invalid_executor}) + .set_start_executor("InvalidExecutor") + ) + + # Attempting to build with a factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned str instead of an Executor\."): + builder.build() + + +def test_build_raises_error_when_factory_returns_none(): + """Test that build() raises TypeError when a factory returns None.""" + + def 
create_none_executor() -> MockExecutor: # type: ignore[reportReturnType] + return None # type: ignore[return-value] + + builder = ( + WorkflowBuilder().register_executors({"NoneExecutor": create_none_executor}).set_start_executor("NoneExecutor") + ) + + # Attempting to build with a factory that returns None should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'NoneExecutor' returned NoneType instead of an Executor\."): + builder.build() + + +def test_build_raises_error_when_sync_factory_raises_exception(): + """Test that build() propagates exceptions raised by sync factories.""" + + def create_failing_executor() -> MockExecutor: + raise RuntimeError("Sync factory failed!") + + builder = ( + WorkflowBuilder() + .register_executors({"FailingExecutor": create_failing_executor}) + .set_start_executor("FailingExecutor") + ) + + # Exception from sync factory should propagate + with pytest.raises(RuntimeError, match="Sync factory failed!"): + builder.build() + + +def test_build_raises_error_when_close_throws_exception(): + """Test that build() raises ValueError even when instance.close() throws an exception.""" + + class FailingCoroutine: + """A mock coroutine that raises an exception when close() is called.""" + + def close(self): + raise RuntimeError("Close failed!") + + def __await__(self): + # This makes it awaitable but we won't get here + return self # type: ignore[return-value] + + def create_failing_async_executor() -> MockExecutor: # type: ignore[reportReturnType] + return FailingCoroutine() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"FailingExecutor": create_failing_async_executor}) + .set_start_executor("FailingExecutor") + ) + + # The ValueError about async factories should still be raised despite close() failing + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + +def test_build_calls_close_on_async_factory(): + """Test that build() actually 
calls close() on awaitable instances.""" + close_called = {"value": False} + + class TrackingCoroutine: + """A mock coroutine that tracks if close() is called.""" + + def close(self): + close_called["value"] = True + + def __await__(self): + return self # type: ignore[return-value] + + def create_tracking_async_executor() -> MockExecutor: # type: ignore[reportReturnType] + return TrackingCoroutine() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"TrackingExecutor": create_tracking_async_executor}) + .set_start_executor("TrackingExecutor") + ) + + # Build should detect async factory and call close() + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + # Verify close() was actually called + assert close_called["value"], "close() should have been called on the awaitable instance" + + +def test_build_handles_awaitable_without_close_method(): + """Test that build() handles awaitable instances that don't have a close() method.""" + + class AwaitableWithoutClose: + """A mock awaitable without a close() method.""" + + def __await__(self): + return self # type: ignore[return-value] + + def create_awaitable_executor() -> MockExecutor: # type: ignore[reportReturnType] + return AwaitableWithoutClose() # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"AwaitableExecutor": create_awaitable_executor}) + .set_start_executor("AwaitableExecutor") + ) + + # Should still raise ValueError about async factories, even without close() method + with pytest.raises(ValueError, match="Async executor factories were detected."): + builder.build() + + +async def test_build_async_with_async_executor_factories(): + """Test that build_async() works with async executor factories.""" + + async def create_executor_a() -> MockExecutor: + return MockExecutor(id="executor_a") + + async def create_executor_b() -> MockExecutor: + return MockExecutor(id="executor_b") + + # Build 
workflow with async executor factories + workflow = await ( + WorkflowBuilder() + .register_executors({ + "ExecutorA": create_executor_a, + "ExecutorB": create_executor_b, + }) + .set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .build_async() + ) + + assert workflow.start_executor_id == "executor_a" + assert len(workflow.executors) == 2 + assert "executor_a" in workflow.executors + assert "executor_b" in workflow.executors + + +async def test_build_async_with_mixed_sync_and_async_factories(): + """Test that build_async() works with a mix of sync and async executor factories.""" + + def create_sync_executor() -> MockExecutor: + return MockExecutor(id="executor_sync") + + async def create_async_executor() -> MockExecutor: + return MockExecutor(id="executor_async") + + # Build workflow with mixed sync and async executor factories + workflow = await ( + WorkflowBuilder() + .register_executors({ + "SyncExecutor": create_sync_executor, + "AsyncExecutor": create_async_executor, + }) + .set_start_executor("SyncExecutor") + .add_edge("SyncExecutor", "AsyncExecutor") + .build_async() + ) + + assert workflow.start_executor_id == "executor_sync" + assert len(workflow.executors) == 2 + assert "executor_sync" in workflow.executors + assert "executor_async" in workflow.executors + + +async def test_build_async_raises_error_when_factory_returns_non_executor(): + """Test that build_async() raises TypeError when an async factory returns a non-Executor type.""" + + async def create_invalid_executor() -> MockExecutor: # type: ignore[reportReturnType] + return None # type: ignore[return-value] + + builder = ( + WorkflowBuilder() + .register_executors({"InvalidExecutor": create_invalid_executor}) + .set_start_executor("InvalidExecutor") + ) + + # Attempting to build with an async factory that returns non-Executor should raise TypeError + with pytest.raises(TypeError, match=r"Factory 'InvalidExecutor' returned NoneType instead of an Executor\."): + await 
builder.build_async() + + +async def test_build_async_raises_error_when_factory_raises_exception(): + """Test that build_async() propagates exceptions raised by async factories.""" + + async def create_failing_executor() -> MockExecutor: + raise RuntimeError("Factory failed!") + + builder = ( + WorkflowBuilder() + .register_executors({"FailingExecutor": create_failing_executor}) + .set_start_executor("FailingExecutor") + ) + + # Exception from async factory should propagate + with pytest.raises(RuntimeError, match="Factory failed!"): + await builder.build_async() + + # Tests for new executor registration patterns -def test_register_executor_basic(): +def test_register_executors_basic(): """Test basic executor registration with lazy initialization.""" builder = WorkflowBuilder() # Register an executor factory - ID must match the registered name - result = builder.register_executor(lambda: MockExecutor(id="TestExecutor"), name="TestExecutor") + result = builder.register_executors({"TestExecutor": lambda: MockExecutor(id="TestExecutor")}) # Verify that register returns the builder for chaining assert result is builder @@ -159,15 +396,18 @@ def test_register_executor_basic(): def test_register_multiple_executors(): - """Test registering multiple executors and connecting them with edges.""" + """Test registering multiple executors with a mapping of factories and connecting them with edges.""" builder = WorkflowBuilder() - # Register multiple executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorA") - builder.register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorB") - builder.register_executor(lambda: MockExecutor(id="ExecutorC"), name="ExecutorC") + # IDs must match registered names + result = builder.register_executors({ + "ExecutorA": lambda: MockExecutor(id="ExecutorA"), + "ExecutorB": lambda: MockExecutor(id="ExecutorB"), + "ExecutorC": lambda: MockExecutor(id="ExecutorC"), + }) + 
+ assert result is builder - # Build workflow with edges using registered names workflow = ( builder .set_start_executor("ExecutorA") @@ -176,13 +416,56 @@ def test_register_multiple_executors(): .build() ) - # Verify all executors are present assert "ExecutorA" in workflow.executors assert "ExecutorB" in workflow.executors assert "ExecutorC" in workflow.executors assert workflow.start_executor_id == "ExecutorA" +def test_register_executors_multiple_calls_with_non_overlapping_names(): + """Test that register_executors can be called multiple times with non-overlapping names.""" + builder = WorkflowBuilder() + + # First registration + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + + # Second registration with different names + builder.register_executors({ + "ExecutorB": lambda: MockExecutor(id="ExecutorB"), + "ExecutorC": lambda: MockExecutor(id="ExecutorC"), + }) + + # Build workflow and verify all executors are present + workflow = ( + builder + .set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .add_edge("ExecutorB", "ExecutorC") + .build() + ) + + assert "ExecutorA" in workflow.executors + assert "ExecutorB" in workflow.executors + assert "ExecutorC" in workflow.executors + + +def test_register_executors_rejects_empty_inputs(): + """Test that empty executor mappings and entries are rejected.""" + builder = WorkflowBuilder() + + with pytest.raises(ValueError, match="cannot be empty"): + builder.register_executors({}) + + with pytest.raises(ValueError, match="name cannot be empty"): + builder.register_executors({"": lambda: MockExecutor(id="ExecutorA")}) + + with pytest.raises(ValueError, match="cannot be empty or whitespace-only"): + builder.register_executors({" ": lambda: MockExecutor(id="ExecutorA")}) + + with pytest.raises(TypeError, match="must be callable"): + builder.register_executors({"ExecutorA": None}) # type: ignore[arg-type] + + def test_register_with_multiple_names(): """Test registering the 
same factory function under multiple names.""" builder = WorkflowBuilder() @@ -195,7 +478,7 @@ def make_executor(): counter["val"] += 1 return MockExecutor(id="ExecutorA" if counter["val"] == 1 else "ExecutorB") - builder.register_executor(make_executor, name=["ExecutorA", "ExecutorB"]) + builder.register_executors({"ExecutorA": make_executor, "ExecutorB": make_executor}) # Set up workflow workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() @@ -211,24 +494,71 @@ def test_register_duplicate_name_raises_error(): builder = WorkflowBuilder() # Register first executor - builder.register_executor(lambda: MockExecutor(id="executor_1"), name="MyExecutor") + builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_1")}) # Registering second executor with same name should raise ValueError with pytest.raises(ValueError, match="already registered"): - builder.register_executor(lambda: MockExecutor(id="executor_2"), name="MyExecutor") + builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_2")}) def test_register_duplicate_id_raises_error(): """Test that registering duplicate id raises an error.""" - builder = WorkflowBuilder() - - # Register first executor - builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor1") - builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor2") + builder = WorkflowBuilder().register_executors({ + "MyExecutor1": lambda: MockExecutor(id="executor"), + "MyExecutor2": lambda: MockExecutor(id="executor"), + }) builder.set_start_executor("MyExecutor1") # Registering second executor with same ID should raise ValueError - with pytest.raises(ValueError, match="Executor with ID 'executor' has already been registered."): + with pytest.raises( + ValueError, + match=r"Executor with ID 'executor' from factory 'MyExecutor2' conflicts with existing factory 'MyExecutor1'\.", + ): + builder.build() + + +def 
test_register_factory_conflicts_with_direct_executor(): + """Test that a factory-produced executor conflicting with a directly-added executor raises an error.""" + # Add an executor directly via add_edge + direct_executor = MockExecutor(id="executor") + builder = WorkflowBuilder() + builder.add_edge(direct_executor, direct_executor) # Add it to self._executors + + # Now register a factory that produces the same ID + builder.register_executors({"MyFactory": lambda: MockExecutor(id="executor")}) + builder.set_start_executor("MyFactory") + + # Building should raise ValueError + with pytest.raises(ValueError, match=r"Executor with ID 'executor' has already been added to the workflow\."): + builder.build() + + +def test_unregistered_factory_name_in_edge_raises_error(): + """Test that referencing an unregistered factory name in an edge raises ValueError.""" + builder = WorkflowBuilder() + + # Register only ExecutorA + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + builder.set_start_executor("ExecutorA") + + # Add edge to unregistered ExecutorB + builder.add_edge("ExecutorA", "ExecutorB") + + # Building should raise ValueError because ExecutorB was never registered + with pytest.raises(ValueError, match="Factory 'ExecutorB' has not been registered"): + builder.build() + + +def test_unregistered_start_executor_factory_name_raises_error(): + """Test that setting an unregistered factory name as start executor raises ValueError.""" + builder = WorkflowBuilder() + + # Register ExecutorA but set start to unregistered ExecutorB + builder.register_executors({"ExecutorA": lambda: MockExecutor(id="ExecutorA")}) + builder.set_start_executor("ExecutorB") + + # Building should raise ValueError because ExecutorB was never registered + with pytest.raises(ValueError, match="Failed to resolve starting executor from registered factories"): builder.build() @@ -286,8 +616,10 @@ def test_register_and_add_edge_with_strings(): builder = WorkflowBuilder() # 
Register executors - builder.register_executor(lambda: MockExecutor(id="source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="target"), name="Target") + builder.register_executors({ + "Source": lambda: MockExecutor(id="source"), + "Target": lambda: MockExecutor(id="target"), + }) # Add edge using string names workflow = builder.set_start_executor("Source").add_edge("Source", "Target").build() @@ -321,9 +653,11 @@ def test_register_with_fan_out_edges(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="Target1"), name="Target1") - builder.register_executor(lambda: MockExecutor(id="Target2"), name="Target2") + builder.register_executors({ + "Source": lambda: MockExecutor(id="Source"), + "Target1": lambda: MockExecutor(id="Target1"), + "Target2": lambda: MockExecutor(id="Target2"), + }) # Add fan-out edges using registered names workflow = builder.set_start_executor("Source").add_fan_out_edges("Source", ["Target1", "Target2"]).build() @@ -339,9 +673,11 @@ def test_register_with_fan_in_edges(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source1"), name="Source1") - builder.register_executor(lambda: MockExecutor(id="Source2"), name="Source2") - builder.register_executor(lambda: MockAggregator(id="Aggregator"), name="Aggregator") + builder.register_executors({ + "Source1": lambda: MockExecutor(id="Source1"), + "Source2": lambda: MockExecutor(id="Source2"), + "Aggregator": lambda: MockAggregator(id="Aggregator"), + }) # Add fan-in edges using registered names # Both Source1 and Source2 need to be reachable, so connect Source1 to Source2 @@ -364,9 +700,11 @@ def test_register_with_chain(): builder = WorkflowBuilder() # Register executors - IDs must match registered names - 
builder.register_executor(lambda: MockExecutor(id="Step1"), name="Step1") - builder.register_executor(lambda: MockExecutor(id="Step2"), name="Step2") - builder.register_executor(lambda: MockExecutor(id="Step3"), name="Step3") + builder.register_executors({ + "Step1": lambda: MockExecutor(id="Step1"), + "Step2": lambda: MockExecutor(id="Step2"), + "Step3": lambda: MockExecutor(id="Step3"), + }) # Add chain using registered names workflow = builder.add_chain(["Step1", "Step2", "Step3"]).set_start_executor("Step1").build() @@ -388,7 +726,7 @@ def factory(): return MockExecutor(id="Test") builder = WorkflowBuilder() - builder.register_executor(factory, name="Test") + builder.register_executors({"Test": factory}) # Factory should not be called yet assert call_count == 0 @@ -415,7 +753,7 @@ def test_mixing_eager_and_lazy_initialization_error(): eager_executor = MockExecutor(id="eager") # Register a lazy executor - builder.register_executor(lambda: MockExecutor(id="Lazy"), name="Lazy") + builder.register_executors({"Lazy": lambda: MockExecutor(id="Lazy")}) # Mixing eager and lazy should raise an error during add_edge with pytest.raises( @@ -436,8 +774,10 @@ def condition_func(msg: MockMessage) -> bool: return msg.data > 0 # Register executors - IDs must match registered names - builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") - builder.register_executor(lambda: MockExecutor(id="Target"), name="Target") + builder.register_executors({ + "Source": lambda: MockExecutor(id="Source"), + "Target": lambda: MockExecutor(id="Target"), + }) # Add edge with condition workflow = builder.set_start_executor("Source").add_edge("Source", "Target", condition=condition_func).build() @@ -518,8 +858,10 @@ def test_with_output_from_with_registered_names(): """Test with_output_from with registered factory names (strings).""" workflow = ( WorkflowBuilder() - .register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorAFactory") - .register_executor(lambda: 
MockExecutor(id="ExecutorB"), name="ExecutorBFactory") + .register_executors({ + "ExecutorAFactory": lambda: MockExecutor(id="ExecutorA"), + "ExecutorBFactory": lambda: MockExecutor(id="ExecutorB"), + }) .set_start_executor("ExecutorAFactory") .add_edge("ExecutorAFactory", "ExecutorBFactory") .with_output_from(["ExecutorBFactory"]) diff --git a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py index 166514f7ac..93283783d6 100644 --- a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py +++ b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py @@ -67,15 +67,17 @@ def create_agent() -> ChatAgent: async def main(): """Build and run a simple 2-step workflow using the fluent builder API.""" # Build the workflow using a fluent pattern: - # 1) register_executor(factory, name) registers an executor factory + # 1) register_executors({name: factory}) registers executor factories # 2) register_agent(factory, name) registers an agent factory # 3) add_chain([node_names]) adds a sequence of nodes to the workflow # 4) set_start_executor(node) declares the entry point # 5) build() finalizes and returns an immutable Workflow object workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase") - .register_executor(lambda: reverse_text, name="ReverseText") + .register_executors({ + "UpperCase": lambda: UpperCase(id="upper_case_executor"), + "ReverseText": lambda: reverse_text, + }) .register_agent(create_agent, name="DecoderAgent") .add_chain(["UpperCase", "ReverseText", "DecoderAgent"]) .set_start_executor("UpperCase") diff --git a/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py b/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py index 874cb2956a..b695ebd93d 100644 --- 
a/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py +++ b/python/samples/getting_started/workflows/agents/azure_ai_agents_with_shared_thread.py @@ -74,10 +74,7 @@ async def main() -> None: WorkflowBuilder() .register_agent(factory_func=lambda: writer, name="writer", agent_thread=shared_thread) .register_agent(factory_func=lambda: reviewer, name="reviewer", agent_thread=shared_thread) - .register_executor( - factory_func=lambda: intercept_agent_response, - name="intercept_agent_response", - ) + .register_executors({"intercept_agent_response": lambda: intercept_agent_response}) .add_chain(["writer", "intercept_agent_response", "reviewer"]) .set_start_executor("writer") .build() diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 457defcf51..5c87c4e824 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -243,14 +243,13 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_writer_agent, name="writer_agent") .register_agent(create_final_editor_agent, name="final_editor_agent") - .register_executor( - lambda: Coordinator( + .register_executors({ + "coordinator": lambda: Coordinator( id="coordinator", writer_id="writer_agent", final_editor_id="final_editor_agent", ), - name="coordinator", - ) + }) .set_start_executor("writer_agent") .add_edge("writer_agent", "coordinator") .add_edge("coordinator", "writer_agent") diff --git a/python/samples/getting_started/workflows/agents/custom_agent_executors.py b/python/samples/getting_started/workflows/agents/custom_agent_executors.py index cab73bc761..906e9f7e09 100644 --- a/python/samples/getting_started/workflows/agents/custom_agent_executors.py +++ 
b/python/samples/getting_started/workflows/agents/custom_agent_executors.py @@ -24,7 +24,7 @@ pattern with typed inputs and typed WorkflowContext[T] outputs, connect executors with the fluent WorkflowBuilder, and finish by yielding outputs from the terminal node. -Note: When an agent is passed to a workflow, the workflow essenatially wrap the agent in a more sophisticated executor. +Note: When an agent is passed to a workflow, the workflow essentially wraps the agent in a more sophisticated executor. Prerequisites: - Azure OpenAI configured for AzureOpenAIChatClient with required environment variables. diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py index d1bdcb71ba..4bcbf176a3 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py @@ -100,16 +100,14 @@ async def main() -> None: # and escalation paths for human review. 
agent = ( WorkflowBuilder() - .register_executor( - lambda: Worker( - id="sub-worker", - chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), - ), - name="worker", - ) - .register_executor( - lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), - name="reviewer", + .register_executors( + { + "worker": lambda: Worker( + id="sub-worker", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ), + "reviewer": lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), + } ) .add_edge("worker", "reviewer") # Worker sends requests to Reviewer .add_edge("reviewer", "worker") # Reviewer sends feedback to Worker diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py index 2db380ea77..77c9168113 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py @@ -188,13 +188,11 @@ async def main() -> None: print("Building workflow with Worker ↔ Reviewer cycle...") agent = ( WorkflowBuilder() - .register_executor( - lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), - name="worker", - ) - .register_executor( - lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), - name="reviewer", + .register_executors( + { + "worker": lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), + "reviewer": lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), + } ) .add_edge("worker", "reviewer") # Worker sends responses to Reviewer .add_edge("reviewer", "worker") # Reviewer provides feedback to Worker diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py 
b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py index df7c5b1445..cf028ea304 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py @@ -188,8 +188,12 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow: ), name="writer", ) - .register_executor(lambda: ReviewGateway(id="review_gateway", writer_id="writer"), name="review_gateway") - .register_executor(lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), name="prepare_brief") + .register_executors( + { + "review_gateway": lambda: ReviewGateway(id="review_gateway", writer_id="writer"), + "prepare_brief": lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), + } + ) .set_start_executor("prepare_brief") .add_edge("prepare_brief", "writer") .add_edge("writer", "review_gateway") diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py index ff23b1af5b..4b91039929 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py @@ -106,8 +106,12 @@ async def main(): # Build workflow with checkpointing enabled workflow_builder = ( WorkflowBuilder() - .register_executor(lambda: StartExecutor(id="start"), name="start") - .register_executor(lambda: WorkerExecutor(id="worker"), name="worker") + .register_executors( + { + "start": lambda: StartExecutor(id="start"), + "worker": lambda: WorkerExecutor(id="worker"), + } + ) .set_start_executor("start") .add_edge("start", "worker") .add_edge("worker", "worker") # Self-loop for iterative processing diff --git a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py 
b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py index 267cfdfb60..1b2a77cad4 100644 --- a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py +++ b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py @@ -299,9 +299,7 @@ def build_sub_workflow() -> WorkflowExecutor: """Assemble the sub-workflow used by the parent workflow executor.""" sub_workflow = ( WorkflowBuilder() - .register_executor(DraftWriter, name="writer") - .register_executor(DraftReviewRouter, name="router") - .register_executor(DraftFinaliser, name="finaliser") + .register_executors({"writer": DraftWriter, "router": DraftReviewRouter, "finaliser": DraftFinaliser}) .set_start_executor("writer") .add_edge("writer", "router") .add_edge("router", "finaliser") @@ -316,8 +314,7 @@ def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow: """Assemble the parent workflow that embeds the sub-workflow.""" return ( WorkflowBuilder() - .register_executor(LaunchCoordinator, name="coordinator") - .register_executor(build_sub_workflow, name="sub_executor") + .register_executors({"coordinator": LaunchCoordinator, "sub_executor": build_sub_workflow}) .set_start_executor("coordinator") .add_edge("coordinator", "sub_executor") .add_edge("sub_executor", "coordinator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py index 826425a0ae..14d07d74e0 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py @@ -142,7 +142,7 @@ def create_sub_workflow() -> WorkflowExecutor: processing_workflow = ( WorkflowBuilder() - .register_executor(TextProcessor, name="text_processor") + .register_executors({"text_processor": TextProcessor}) .set_start_executor("text_processor") .build() ) @@ -156,8 +156,12 @@ async 
def main(): # Step 1: Create the parent workflow main_workflow = ( WorkflowBuilder() - .register_executor(TextProcessingOrchestrator, name="text_orchestrator") - .register_executor(create_sub_workflow, name="text_processor_workflow") + .register_executors( + { + "text_orchestrator": TextProcessingOrchestrator, + "text_processor_workflow": create_sub_workflow, + } + ) .set_start_executor("text_orchestrator") .add_edge("text_orchestrator", "text_processor_workflow") .add_edge("text_processor_workflow", "text_orchestrator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py index 58ee575684..d2f504e244 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py @@ -171,10 +171,14 @@ async def collect(self, response: ResourceResponse | PolicyResponse, ctx: Workfl return ( WorkflowBuilder() - .register_executor(lambda: RequestDistribution("orchestrator"), name="orchestrator") - .register_executor(lambda: ResourceRequester("resource_requester"), name="resource_requester") - .register_executor(lambda: PolicyChecker("policy_checker"), name="policy_checker") - .register_executor(lambda: ResultCollector("result_collector"), name="result_collector") + .register_executors( + { + "orchestrator": lambda: RequestDistribution("orchestrator"), + "resource_requester": lambda: ResourceRequester("resource_requester"), + "policy_checker": lambda: PolicyChecker("policy_checker"), + "result_collector": lambda: ResultCollector("result_collector"), + } + ) .set_start_executor("orchestrator") .add_edge("orchestrator", "resource_requester") .add_edge("orchestrator", "policy_checker") @@ -290,18 +294,19 @@ async def main() -> None: # Build the main workflow main_workflow = ( WorkflowBuilder() - .register_executor(lambda: 
ResourceAllocator("resource_allocator"), name="resource_allocator") - .register_executor(lambda: PolicyEngine("policy_engine"), name="policy_engine") - .register_executor( - lambda: WorkflowExecutor( - build_resource_request_distribution_workflow(), - "sub_workflow_executor", - # Setting allow_direct_output=True to let the sub-workflow output directly. - # This is because the sub-workflow is the both the entry - # point and the exit - # point of the main workflow. - allow_direct_output=True, - ), - name="sub_workflow_executor", + .register_executors( + { + "resource_allocator": lambda: ResourceAllocator("resource_allocator"), + "policy_engine": lambda: PolicyEngine("policy_engine"), + "sub_workflow_executor": lambda: WorkflowExecutor( + build_resource_request_distribution_workflow(), + "sub_workflow_executor", + # Setting allow_direct_output=True to let the sub-workflow output directly. + # This is because the sub-workflow is both the entry - # point and the exit - # point of the main workflow.
+ allow_direct_output=True, + ), + } ) .set_start_executor("sub_workflow_executor") .add_edge("sub_workflow_executor", "resource_allocator") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py index 9b0637652b..522346191c 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py @@ -155,9 +155,13 @@ async def handle_domain_validation_response( # Build the workflow return ( WorkflowBuilder() - .register_executor(lambda: EmailSanitizer(id="email_sanitizer"), name="email_sanitizer") - .register_executor(lambda: EmailFormatValidator(id="email_format_validator"), name="email_format_validator") - .register_executor(lambda: DomainValidator(id="domain_validator"), name="domain_validator") + .register_executors( + { + "email_sanitizer": lambda: EmailSanitizer(id="email_sanitizer"), + "email_format_validator": lambda: EmailFormatValidator(id="email_format_validator"), + "domain_validator": lambda: DomainValidator(id="domain_validator"), + } + ) .set_start_executor("email_sanitizer") .add_edge("email_sanitizer", "email_format_validator") .add_edge("email_format_validator", "domain_validator") @@ -271,14 +275,17 @@ async def main() -> None: # Build the main workflow workflow = ( WorkflowBuilder() - .register_executor( - lambda: SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains), - name="smart_email_orchestrator", - ) - .register_executor(lambda: EmailDelivery(id="email_delivery"), name="email_delivery") - .register_executor( - lambda: WorkflowExecutor(build_email_address_validation_workflow(), id="email_validation_workflow"), - name="email_validation_workflow", + .register_executors( + { + "smart_email_orchestrator": lambda: SmartEmailOrchestrator( + 
id="smart_email_orchestrator", approved_domains=approved_domains + ), + "email_delivery": lambda: EmailDelivery(id="email_delivery"), + "email_validation_workflow": lambda: WorkflowExecutor( + build_email_address_validation_workflow(), + id="email_validation_workflow", + ), + } ) .set_start_executor("smart_email_orchestrator") .add_edge("smart_email_orchestrator", "email_validation_workflow") diff --git a/python/samples/getting_started/workflows/control-flow/edge_condition.py b/python/samples/getting_started/workflows/control-flow/edge_condition.py index 8c7dc4b760..6842a26924 100644 --- a/python/samples/getting_started/workflows/control-flow/edge_condition.py +++ b/python/samples/getting_started/workflows/control-flow/edge_condition.py @@ -165,9 +165,13 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_spam_detector_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: to_email_assistant_request, name="to_email_assistant_request") - .register_executor(lambda: handle_email_response, name="send_email") - .register_executor(lambda: handle_spam_classifier_response, name="handle_spam") + .register_executors( + { + "to_email_assistant_request": lambda: to_email_assistant_request, + "send_email": lambda: handle_email_response, + "handle_spam": lambda: handle_spam_classifier_response, + } + ) .set_start_executor("spam_detection_agent") # Not spam path: transform response -> request for assistant -> assistant -> send email .add_edge("spam_detection_agent", "to_email_assistant_request", condition=get_condition(False)) diff --git a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py index 67058435c9..f9e720abfb 100644 --- a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py +++ 
b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py @@ -229,15 +229,19 @@ def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str] .register_agent(create_email_analysis_agent, name="email_analysis_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") .register_agent(create_email_summary_agent, name="email_summary_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_analysis_result, name="to_analysis_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: summarize_email, name="summarize_email") - .register_executor(lambda: merge_summary, name="merge_summary") - .register_executor(lambda: handle_spam, name="handle_spam") - .register_executor(lambda: handle_uncertain, name="handle_uncertain") - .register_executor(lambda: database_access, name="database_access") + .register_executors( + { + "store_email": lambda: store_email, + "to_analysis_result": lambda: to_analysis_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": lambda: finalize_and_send, + "summarize_email": lambda: summarize_email, + "merge_summary": lambda: merge_summary, + "handle_spam": lambda: handle_spam, + "handle_uncertain": lambda: handle_uncertain, + "database_access": lambda: database_access, + } + ) ) workflow = ( diff --git a/python/samples/getting_started/workflows/control-flow/sequential_executors.py b/python/samples/getting_started/workflows/control-flow/sequential_executors.py index d69aafcfe9..b214479ed8 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_executors.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_executors.py @@ -64,8 +64,12 @@ async def main() -> None: # Order matters. 
We connect upper_case_executor -> reverse_text_executor and set the start. workflow = ( WorkflowBuilder() - .register_executor(lambda: UpperCaseExecutor(id="upper_case_executor"), name="upper_case_executor") - .register_executor(lambda: ReverseTextExecutor(id="reverse_text_executor"), name="reverse_text_executor") + .register_executors( + { + "upper_case_executor": lambda: UpperCaseExecutor(id="upper_case_executor"), + "reverse_text_executor": lambda: ReverseTextExecutor(id="reverse_text_executor"), + } + ) .add_edge("upper_case_executor", "reverse_text_executor") .set_start_executor("upper_case_executor") .build() diff --git a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py index cb06157d1a..01a6161770 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py @@ -57,8 +57,12 @@ async def main(): # Order matters. upper_case_executor runs first, then reverse_text_executor. workflow = ( WorkflowBuilder() - .register_executor(lambda: to_upper_case, name="upper_case_executor") - .register_executor(lambda: reverse_text, name="reverse_text_executor") + .register_executors( + { + "upper_case_executor": lambda: to_upper_case, + "reverse_text_executor": lambda: reverse_text, + } + ) .add_edge("upper_case_executor", "reverse_text_executor") .set_start_executor("upper_case_executor") .build() diff --git a/python/samples/getting_started/workflows/control-flow/simple_loop.py b/python/samples/getting_started/workflows/control-flow/simple_loop.py index e9fca78510..6b4f28ab43 100644 --- a/python/samples/getting_started/workflows/control-flow/simple_loop.py +++ b/python/samples/getting_started/workflows/control-flow/simple_loop.py @@ -127,10 +127,14 @@ async def main(): # This time we are creating a loop in the workflow. 
workflow = ( WorkflowBuilder() - .register_executor(lambda: GuessNumberExecutor((1, 100), "guess_number"), name="guess_number") + .register_executors( + { + "guess_number": lambda: GuessNumberExecutor((1, 100), "guess_number"), + "submit_judge": lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), + "parse_judge": lambda: ParseJudgeResponse(id="parse_judge"), + } + ) .register_agent(create_judge_agent, name="judge_agent") - .register_executor(lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), name="submit_judge") - .register_executor(lambda: ParseJudgeResponse(id="parse_judge"), name="parse_judge") .add_edge("guess_number", "submit_judge") .add_edge("submit_judge", "judge_agent") .add_edge("judge_agent", "parse_judge") diff --git a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py index b4d1852e9a..d3fd0d30ab 100644 --- a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py +++ b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py @@ -182,12 +182,16 @@ async def main(): WorkflowBuilder() .register_agent(create_spam_detection_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_detection_result, name="to_detection_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: handle_spam, name="handle_spam") - .register_executor(lambda: handle_uncertain, name="handle_uncertain") + .register_executors( + { + "store_email": lambda: store_email, + "to_detection_result": lambda: to_detection_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": 
lambda: finalize_and_send, + "handle_spam": lambda: handle_spam, + "handle_uncertain": lambda: handle_uncertain, + } + ) .set_start_executor("store_email") .add_edge("store_email", "spam_detection_agent") .add_edge("spam_detection_agent", "to_detection_result") diff --git a/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py b/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py index e921fbe9cf..2548326cd2 100644 --- a/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py +++ b/python/samples/getting_started/workflows/control-flow/workflow_cancellation.py @@ -52,9 +52,13 @@ def build_workflow(): """Build a simple 3-step sequential workflow (~6 seconds total).""" return ( WorkflowBuilder() - .register_executor(lambda: step1, name="step1") - .register_executor(lambda: step2, name="step2") - .register_executor(lambda: step3, name="step3") + .register_executors( + { + "step1": lambda: step1, + "step2": lambda: step2, + "step3": lambda: step3, + } + ) .add_edge("step1", "step2") .add_edge("step2", "step3") .set_start_executor("step1") diff --git a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py index e4550c1ab2..9f0fe16cc8 100644 --- a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py +++ b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py @@ -74,10 +74,14 @@ async def main() -> None: # 1) Build a simple fan out and fan in workflow workflow = ( WorkflowBuilder() - .register_executor(lambda: Dispatcher(id="dispatcher"), name="dispatcher") - .register_executor(lambda: Average(id="average"), name="average") - .register_executor(lambda: Sum(id="summation"), name="summation") - .register_executor(lambda: Aggregator(id="aggregator"), name="aggregator") + .register_executors( + 
{ + "dispatcher": lambda: Dispatcher(id="dispatcher"), + "average": lambda: Average(id="average"), + "summation": lambda: Sum(id="summation"), + "aggregator": lambda: Aggregator(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["average", "summation"]) .add_fan_in_edges(["average", "summation"], "aggregator") diff --git a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py index 2be9bc09f7..1382233ea1 100644 --- a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py +++ b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py @@ -127,8 +127,12 @@ async def main() -> None: .register_agent(create_researcher_agent, name="researcher") .register_agent(create_marketer_agent, name="marketer") .register_agent(create_legal_agent, name="legal") - .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") - .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .register_executors( + { + "dispatcher": lambda: DispatchToExperts(id="dispatcher"), + "aggregator": lambda: AggregateInsights(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) # Parallel branches .add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") # Join at the aggregator diff --git a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py index 99494c59f4..1749583df9 100644 --- a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py +++ b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py @@ -260,27 +260,25 @@ async def main(): """Construct the map reduce workflow, visualize it, then run it over a 
sample file.""" # Step 1: Create the workflow builder and register executors. - workflow_builder = ( - WorkflowBuilder() - .register_executor(lambda: Map(id="map_executor_0"), name="map_executor_0") - .register_executor(lambda: Map(id="map_executor_1"), name="map_executor_1") - .register_executor(lambda: Map(id="map_executor_2"), name="map_executor_2") - .register_executor( - lambda: Split(["map_executor_0", "map_executor_1", "map_executor_2"], id="split_data_executor"), - name="split_data_executor", - ) - .register_executor(lambda: Reduce(id="reduce_executor_0"), name="reduce_executor_0") - .register_executor(lambda: Reduce(id="reduce_executor_1"), name="reduce_executor_1") - .register_executor(lambda: Reduce(id="reduce_executor_2"), name="reduce_executor_2") - .register_executor(lambda: Reduce(id="reduce_executor_3"), name="reduce_executor_3") - .register_executor( - lambda: Shuffle( + workflow_builder = WorkflowBuilder().register_executors( + { + "map_executor_0": lambda: Map(id="map_executor_0"), + "map_executor_1": lambda: Map(id="map_executor_1"), + "map_executor_2": lambda: Map(id="map_executor_2"), + "split_data_executor": lambda: Split( + ["map_executor_0", "map_executor_1", "map_executor_2"], + id="split_data_executor", + ), + "reduce_executor_0": lambda: Reduce(id="reduce_executor_0"), + "reduce_executor_1": lambda: Reduce(id="reduce_executor_1"), + "reduce_executor_2": lambda: Reduce(id="reduce_executor_2"), + "reduce_executor_3": lambda: Reduce(id="reduce_executor_3"), + "shuffle_executor": lambda: Shuffle( ["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"], id="shuffle_executor", ), - name="shuffle_executor", - ) - .register_executor(lambda: CompletionExecutor(id="completion_executor"), name="completion_executor") + "completion_executor": lambda: CompletionExecutor(id="completion_executor"), + } ) # Step 2: Build the workflow graph using fan out and fan in edges. 
diff --git a/python/samples/getting_started/workflows/state-management/state_with_agents.py b/python/samples/getting_started/workflows/state-management/state_with_agents.py index 1844ae40e3..12f7847a88 100644 --- a/python/samples/getting_started/workflows/state-management/state_with_agents.py +++ b/python/samples/getting_started/workflows/state-management/state_with_agents.py @@ -192,11 +192,15 @@ async def main() -> None: WorkflowBuilder() .register_agent(create_spam_detection_agent, name="spam_detection_agent") .register_agent(create_email_assistant_agent, name="email_assistant_agent") - .register_executor(lambda: store_email, name="store_email") - .register_executor(lambda: to_detection_result, name="to_detection_result") - .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") - .register_executor(lambda: finalize_and_send, name="finalize_and_send") - .register_executor(lambda: handle_spam, name="handle_spam") + .register_executors( + { + "store_email": lambda: store_email, + "to_detection_result": lambda: to_detection_result, + "submit_to_email_assistant": lambda: submit_to_email_assistant, + "finalize_and_send": lambda: finalize_and_send, + "handle_spam": lambda: handle_spam, + } + ) .set_start_executor("store_email") .add_edge("store_email", "spam_detection_agent") .add_edge("spam_detection_agent", "to_detection_result") diff --git a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py index 68b68c4a7a..23a2d015f6 100644 --- a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py +++ b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py @@ -127,8 +127,12 @@ async def main() -> None: .register_agent(create_researcher_agent, name="researcher") .register_agent(create_marketer_agent, name="marketer") .register_agent(create_legal_agent, 
name="legal") - .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") - .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .register_executors( + { + "dispatcher": lambda: DispatchToExperts(id="dispatcher"), + "aggregator": lambda: AggregateInsights(id="aggregator"), + } + ) .set_start_executor("dispatcher") .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) .add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator")