From 79b0a5446e942c851d12dd83ec557c0bee6d59c7 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 20 Jan 2026 11:16:18 +0900 Subject: [PATCH 1/3] Wip: composability for workflows --- python/docs/XX-workflow-compose.md | 632 ++++++++++++++++++ .../agent_framework/_workflows/_concurrent.py | 52 +- .../core/agent_framework/_workflows/_edge.py | 76 +++ .../agent_framework/_workflows/_executor.py | 16 + .../agent_framework/_workflows/_sequential.py | 32 +- .../_workflows/_workflow_builder.py | 290 +++++++- .../workflow/test_workflow_composition.py | 390 +++++++++++ .../composition/workflow_composition.py | 246 +++++++ 8 files changed, 1693 insertions(+), 41 deletions(-) create mode 100644 python/docs/XX-workflow-compose.md create mode 100644 python/packages/core/tests/workflow/test_workflow_composition.py create mode 100644 python/samples/getting_started/workflows/composition/workflow_composition.py diff --git a/python/docs/XX-workflow-compose.md b/python/docs/XX-workflow-compose.md new file mode 100644 index 0000000000..f84d231290 --- /dev/null +++ b/python/docs/XX-workflow-compose.md @@ -0,0 +1,632 @@ +--- +status: in-progress +contact: moonbox3 +date: 2026-01-20 +deciders: bentho, taochen, jacob, victor, peter +consulted: eduard, dmytro, gil, mark +informed: team +--- + +# Workflow Composability Design + +## Problem Statement + +Users want to extend high-level builder patterns in two ways: + +1. **Add pre/post-processing** - Insert custom executors before or after a high-level pattern (e.g., validate input before ConcurrentBuilder, format output after) + +2. **Combine patterns** - Chain multiple high-level builders together (e.g., ConcurrentBuilder for analysis, then SequentialBuilder for summarization) + +Today, both require writing custom orchestrator executors that manually dispatch messages and collect results. 
+ +**What users want to write:** + +```python +# Use case 1: Add preprocessing to a high-level pattern +analysis = ConcurrentBuilder().participants([analyzer1, analyzer2]) + +workflow = ( + WorkflowBuilder() + .register_executor(InputValidator, name="validator") + .add_workflow(analysis, id="analysis") + .register_executor(OutputFormatter, name="formatter") + .add_edge("validator", "analysis") + .add_edge("analysis", "formatter") + .set_start_executor("validator") + .build() +) + +# Use case 2: Chain high-level patterns together +analysis = ConcurrentBuilder().participants([analyzer1, analyzer2]) +summary = SequentialBuilder().participants([summarizer]) + +workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") + .build() +) +``` + +**What's required today:** + +Direct composition of high-level builders is not supported. The closest pattern requires a custom orchestrator executor that manually dispatches work and collects results: + +```python +class PipelineOrchestrator(Executor): + @handler + async def start(self, input_data: str, ctx: WorkflowContext) -> None: + await ctx.send_message(input_data, target_id="analysis") + + @handler + async def handle_analysis_result( + self, result: list[ChatMessage], ctx: WorkflowContext + ) -> None: + await ctx.send_message(result, target_id="summary") + + @handler + async def handle_summary_result( + self, result: list[ChatMessage], ctx: WorkflowContext + ) -> None: + await ctx.yield_output(result) + +workflow = ( + WorkflowBuilder() + .register_executor(PipelineOrchestrator, name="orchestrator") + .register_executor(lambda: WorkflowExecutor(analysis.build(), id="analysis"), name="analysis") + .register_executor(lambda: WorkflowExecutor(summary.build(), id="summary"), name="summary") + .add_edge("orchestrator", "analysis") + .add_edge("analysis", "orchestrator") + .add_edge("orchestrator", "summary") + .add_edge("summary", 
"orchestrator") + .set_start_executor("orchestrator") + .build() +) +``` + +## Goals + +1. **Simple composition** - Combine workflows with minimal boilerplate +2. **Pythonic API** - Feels natural, no new concepts to learn +3. **Type safety** - Fail at build time with clear errors if types don't match +4. **Preserve semantics** - Checkpointing, request/response, observability work correctly +5. **Backward compatible** - Existing code continues to work + +## Current State + +| Component | Status | Notes | +|-----------|--------|-------| +| WorkflowBuilder | Complete | Full fluent API for graph construction | +| WorkflowExecutor | Complete | Wraps workflow as executor (nested composition) | +| High-level builders | Complete | ConcurrentBuilder, SequentialBuilder, GroupChatBuilder, etc. | +| `add_workflow()` | **Missing** | No convenience method for composition | +| Type validation across workflows | **Missing** | No validation that connected workflows have compatible types | + +--- + +## Proposed API + +### OrchestrationBuilder Protocol + +High-level orchestration patterns share a common interface: they have a `build()` method that returns a `Workflow`. We define a protocol to capture this: + +```python +class OrchestrationBuilder(Protocol): + """Protocol for high-level orchestration pattern builders. + + Orchestration builders provide pre-wired multi-agent patterns: + - ConcurrentBuilder (fan-out/fan-in) + - SequentialBuilder (chain with shared context) + - GroupChatBuilder (orchestrator-directed conversation) + - HandoffBuilder (decentralized agent routing) + - MagenticBuilder (plan-based orchestration) + + Note: WorkflowBuilder is NOT an OrchestrationBuilder. It's the low-level + primitive used to construct these patterns. add_workflow() accepts both + OrchestrationBuilder and WorkflowBuilder, but they serve different purposes. + """ + + def build(self) -> Workflow: ... 
+``` + +This allows `add_workflow()` to accept any current or future orchestration pattern without explicitly listing them. + +### Core Addition: `add_workflow()` + +```python +class WorkflowBuilder: + def add_workflow( + self, + source: OrchestrationBuilder | WorkflowBuilder, + *, + id: str, + ) -> Self: + """Merge an orchestration pattern or workflow builder into this builder. + + Args: + source: An OrchestrationBuilder (ConcurrentBuilder, etc.) or WorkflowBuilder + id: Logical identifier for the merged workflow. + Used with add_edge() and set_start_executor(). + Internal executor IDs are prefixed with this id. + + Returns: + Self for method chaining. + + The `id` becomes a logical identifier that add_edge() and set_start_executor() + can resolve automatically: + - add_edge("analysis", "summary") wires analysis's exit to summary's entry + - set_start_executor("analysis") sets analysis's entry as the start + """ + ... +``` + +### Usage + +```python +# Simple linear composition +analysis = ConcurrentBuilder().participants([agent1, agent2]) +summary = SequentialBuilder().participants([summarizer]) + +workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") # Framework resolves entry/exit points + .set_start_executor("analysis") # Framework knows this means analysis's entry + .build() +) +``` + +```python +# Composition with custom executors +workflow = ( + WorkflowBuilder() + .register_executor(Preprocessor, name="preprocess") + .add_workflow(analysis_builder, id="analysis") + .register_executor(Postprocessor, name="postprocess") + .add_edge("preprocess", "analysis") + .add_edge("analysis", "postprocess") + .set_start_executor("preprocess") + .build() +) +``` + +```python +# Branching based on classifier output +workflow = ( + WorkflowBuilder() + .register_executor(Classifier, name="classifier") + .add_workflow(fast_path, id="fast") + .add_workflow(slow_path, id="slow") + 
.add_switch_case_edge_group( + "classifier", + [ + Case(condition=lambda r: r.confidence > 0.9, target="fast"), + Default(target="slow"), + ] + ) + .set_start_executor("classifier") + .build() +) +``` + +--- + +## Type Safety + +### The Problem + +Each high-level builder has implicit input/output types: + +| Builder | Input Types | Output Types | +|---------|-------------|--------------| +| ConcurrentBuilder | `str`, `ChatMessage`, `list[ChatMessage]` | `list[ChatMessage]` | +| SequentialBuilder | `str`, `ChatMessage`, `list[ChatMessage]` | `list[ChatMessage]` | +| GroupChatBuilder | `str`, `ChatMessage`, `list[ChatMessage]` | `list[ChatMessage]` | +| Custom Executor | Whatever handlers accept | Whatever handlers send | + +If you connect workflows with incompatible types, messages silently won't be delivered (no handler matches). + +### Proposed Solution + +Add type metadata to builders and validate at `build()` time: + +```python +# Internal: each builder knows its contract +class ConcurrentBuilder: + @property + def _input_types(self) -> set[type]: + return {str, ChatMessage, list[ChatMessage]} + + @property + def _output_types(self) -> set[type]: + return {list[ChatMessage]} +``` + +When `WorkflowBuilder.build()` validates edges, check type compatibility: + +```python +# At build() time, for each edge crossing workflow boundaries: +if not _types_compatible(source_output_types, target_input_types): + raise TypeError( + f"Type mismatch: '{source_id}' outputs {source_output_types} " + f"but '{target_id}' expects {target_input_types}" + ) +``` + +### Error Messages + +Good error messages are critical: + +``` +TypeError: Cannot connect 'analysis/aggregator' to 'custom_processor': + - 'analysis/aggregator' outputs: list[ChatMessage] + - 'custom_processor' accepts: AnalysisResult, dict + +To fix this, either: + 1. Change 'custom_processor' to accept list[ChatMessage] + 2. 
Add an adapter executor between them that converts the types +``` + +--- + +## Implementation + +### How `add_workflow()` Works Internally + +1. **Extract graph info** from the source builder (executors, edges, start/end points) +2. **Prefix all executor IDs** with the provided `id` parameter +3. **Register executors** into the parent builder with prefixed IDs +4. **Copy edge groups** with prefixed IDs +5. **Track logical ID mapping** for entry/exit point resolution + +```python +def add_workflow( + self, + source: OrchestrationBuilder | WorkflowBuilder, + *, + id: str, +) -> Self: + # Extract WorkflowBuilder from source (or use directly if already WorkflowBuilder) + inner_builder = self._extract_builder(source) + + # Prefix all IDs + prefix = id + + # Copy executors with prefixed IDs + for exec_id, executor in inner_builder._executors.items(): + prefixed_id = f"{prefix}/{exec_id}" + # Clone executor with new ID + cloned = executor._clone_with_id(prefixed_id) + self._executors[prefixed_id] = cloned + + # Copy edge groups with prefixed IDs + for edge_group in inner_builder._edge_groups: + prefixed_group = edge_group._with_prefix(prefix) + self._edge_groups.append(prefixed_group) + + # Track logical ID -> entry/exit point mapping + entry_id = f"{prefix}/{inner_builder._start_executor_id}" + exit_ids = [f"{prefix}/{eid}" for eid in inner_builder._terminal_executor_ids] + self._workflow_mappings[id] = WorkflowMapping(entry=entry_id, exits=exit_ids) + + return self +``` + +### How `add_edge()` Resolves Logical IDs + +When `add_edge()` receives an ID, it checks if it's a logical workflow ID: + +```python +def add_edge(self, source: str, target: str) -> Self: + # Resolve logical IDs to actual executor IDs + resolved_source = self._resolve_exit(source) # Use exit point if workflow ID + resolved_target = self._resolve_entry(target) # Use entry point if workflow ID + + # ... existing edge creation logic with resolved IDs ... 


def _resolve_exit(self, id: str) -> str:
    """Resolve ID to exit point if it's a workflow ID, otherwise return as-is."""
    if id in self._workflow_mappings:
        mapping = self._workflow_mappings[id]
        if len(mapping.exits) != 1:
            raise ValueError(
                f"Workflow '{id}' has {len(mapping.exits)} exit points. "
                f"Use explicit IDs: {mapping.exits}"
            )
        return mapping.exits[0]
    return id

def _resolve_entry(self, id: str) -> str:
    """Resolve ID to entry point if it's a workflow ID, otherwise return as-is."""
    if id in self._workflow_mappings:
        return self._workflow_mappings[id].entry
    return id
```

This approach:
- **Keeps the simple case simple**: `add_edge("analysis", "summary")` just works
- **Handles ambiguity explicitly**: If a workflow has multiple exits, user must specify which one
- **Preserves escape hatch**: Users can still use full IDs like `"analysis/aggregator"` when needed

### Extracting Builder from High-Level Builders

High-level builders don't currently expose their internal structure. Options:

**Option A: Add internal method to each builder**
```python
class ConcurrentBuilder:
    def _to_builder(self) -> tuple[WorkflowBuilder, str, list[str]]:
        """Build the internal WorkflowBuilder without calling build().

        Returns the builder plus its start executor ID and terminal executor
        IDs, so add_workflow() can map the logical ID to entry/exit points.
        """
        # Similar to build() but returns the builder, not the workflow
        ...
```

**Option B: Build and extract from Workflow**
```python
def _extract_builder(self, source) -> WorkflowBuilder:
    if isinstance(source, WorkflowBuilder):
        return source
    # For high-level builders, build then extract
    workflow = source.build()
    return workflow._to_builder()  # Reconstruct builder from workflow
```

**Recommendation: Option A** - cleaner, no round-trip through Workflow.
+ +--- + +## What We're NOT Doing + +To keep the design simple: + +- **No new `connect()` method** - Use existing `add_edge()` with logical ID resolution +- **No public handle types** - Logical IDs and internal mappings are implementation details +- **No type adapter registry** - Users write adapter executors if needed +- **No auto-adapter insertion** - Explicit is better than implicit +- **No port semantics** - Over-engineering for current needs +- **No requirement to know internal executor names** - Logical IDs abstract this away + +--- + +## Migration + +### Before + +```python +# Custom orchestrator required to chain two workflows +class PipelineOrchestrator(Executor): + @handler + async def start(self, data: str, ctx: WorkflowContext) -> None: + await ctx.send_message(data, target_id="analysis") + + @handler + async def handle_analysis_result( + self, result: list[ChatMessage], ctx: WorkflowContext + ) -> None: + await ctx.send_message(result, target_id="summary") + + @handler + async def handle_summary_result( + self, result: list[ChatMessage], ctx: WorkflowContext + ) -> None: + await ctx.yield_output(result) + +workflow = ( + WorkflowBuilder() + .register_executor(PipelineOrchestrator, name="orchestrator") + .register_executor(lambda: WorkflowExecutor(analysis.build(), id="analysis"), name="analysis") + .register_executor(lambda: WorkflowExecutor(summary.build(), id="summary"), name="summary") + .add_edge("orchestrator", "analysis") + .add_edge("analysis", "orchestrator") + .add_edge("orchestrator", "summary") + .add_edge("summary", "orchestrator") + .set_start_executor("orchestrator") + .build() +) +``` + +### After + +```python +# Direct composition - no custom orchestrator needed +workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") + .set_start_executor("analysis") + .build() +) +``` + +--- + +## Open Questions + +1. 
**ID prefix separator**: `analysis/dispatcher` vs `analysis::dispatcher` vs `analysis.dispatcher`? + - Proposal: `/` (familiar from paths, clear visual separator) + +2. **What if source builder is used multiple times?** + - Each `add_workflow()` call should clone the source to avoid shared state + - Different `id` prefixes ensure no collisions + +3. **Should `add_workflow()` accept a built `Workflow`?** + - Useful for reusing a pre-built workflow + - Requires extracting topology back into a builder (cloning executors) + - Recommendation: Support it for flexibility + +4. **How to handle checkpoint storage from merged builders?** + - Proposal: Parent's checkpoint config takes precedence + - Merged builders' checkpoint configs are ignored + +--- + +## Alternatives Considered + +### Alternative A: WorkflowExecutor Sugar Only + +Add convenience methods to wrap workflows in `WorkflowExecutor` without changing execution semantics. + +```python +class Workflow: + def as_executor(self, id: str) -> WorkflowExecutor: + return WorkflowExecutor(self, id=id) + +# Usage +workflow = ( + WorkflowBuilder() + .register_executor(lambda: analysis.build().as_executor("analysis"), name="analysis") + .register_executor(lambda: summary.build().as_executor("summary"), name="summary") + .add_edge("analysis", "summary") + .build() +) +``` + +**Why not chosen:** +- Still requires `.build()` before composing +- Maintains nested execution boundary (double superstep scheduling, separate checkpoint lineage) +- Doesn't solve the core problem of needing a custom orchestrator for message routing +- Just syntactic sugar over existing `WorkflowExecutor(workflow, id=...)` pattern + +### Alternative B: Connection Protocol with Explicit Metadata + +Expose a `WorkflowConnection` wrapper and `as_connection()` method on all builders. 
+ +```python +class WorkflowConnection: + builder: WorkflowBuilder + entry: str + exits: list[str] + input_types: set[type] + output_types: set[type] + +class ConcurrentBuilder: + def as_connection(self, prefix: str | None = None) -> WorkflowConnection: + ... + +# Usage +connection = analysis_builder.as_connection(prefix="analysis") +builder.add_workflow(connection) +builder.connect(connection.exits[0], other.entry) +``` + +**Why not chosen:** +- Exposes unnecessary abstraction to users (`WorkflowConnection`) +- Users must understand and call `as_connection()` explicitly +- Adds cognitive overhead without clear benefit +- `add_workflow()` can handle this internally + +### Alternative C: Handle-Based API + +Return a handle from `add_workflow()` with `.start` and `.end` properties for explicit wiring. + +```python +h1 = builder.add_workflow(analysis, id="analysis") +h2 = builder.add_workflow(summary, id="summary") +builder.add_edge(h1.end[0], h2.start) # Explicit entry/exit wiring +``` + +**Why not chosen:** +- Requires users to understand internal structure (`.start`, `.end[0]`) +- `.end[0]` is awkward - users shouldn't need to know about exit point lists +- Exposes implementation details that should be hidden +- Logical ID resolution provides the same functionality with simpler syntax + +### Alternative D: New `connect()` Method + +Add a `connect()` method alongside `add_edge()` specifically for workflow composition. + +```python +builder.connect("analysis", "summary") # Separate method for workflow connections +``` + +**Why not chosen:** +- Duplicates `add_edge()` functionality +- Adds API surface without clear benefit +- Users already know `add_edge()` +- Creates confusion about when to use `connect()` vs `add_edge()` +- Better to enhance `add_edge()` with logical ID resolution + +### Alternative E: Port-Based Interfaces + +Elevate executor I/O to named ports with declared types and semantics. 
+ +```python +class Executor: + ports: dict[str, PortSpec] # in/out, types, semantics + +builder.connect( + source=(analysis, "out:conversation"), + target=(summary, "in:conversation") +) +``` + +**Why not chosen:** +- Significant complexity added to executor interface +- Requires retrofitting all existing executors +- Over-engineering for current needs +- Port semantics can be added later if needed + +### Alternative F: Auto-Adapter Insertion + +Automatically insert type adapters when connecting incompatible workflows. + +```python +# Framework auto-inserts adapter if types don't match +builder.add_edge(text_output, chat_input) # Auto-inserts TextToMessages adapter +``` + +**Why not chosen:** +- "Magic" behavior obscures graph structure +- Makes debugging harder (hidden executors) +- Users may not realize types are being converted +- Explicit adapters are clearer and more predictable + +### Alternative G: Declarative Composition DSL + +Define composition via YAML/JSON schema with explicit type contracts. + +```yaml +workflows: + analysis: + builder: ConcurrentBuilder + participants: [agent1, agent2] + summary: + builder: SequentialBuilder + participants: [summarizer] + +pipeline: + - analysis -> summary +``` + +**Why not chosen:** +- Adds a new language/format to learn +- Requires tooling for validation and code generation +- Python code is already declarative enough +- Solve the simple problem first + +--- + +## Implementation Phases + +### Phase 1: Core `add_workflow()` + +1. Add internal `WorkflowMapping` dataclass (entry/exit tracking) +2. Add `_to_builder()` to ConcurrentBuilder and SequentialBuilder +3. Implement `add_workflow()` on WorkflowBuilder +4. Add logical ID resolution to `add_edge()` and `set_start_executor()` +5. Add tests for basic composition + +### Phase 2: Type Validation + +1. Add `_input_types` / `_output_types` properties to builders +2. Enhance `build()` validation to check cross-workflow type compatibility +3. 
Improve error messages + +### Phase 3: Remaining Builders + +1. Add `_to_builder()` to GroupChatBuilder, HandoffBuilder, MagenticBuilder +2. Support `Workflow` as input to `add_workflow()` +3. Documentation and examples diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 033946afff..741dbb53d3 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -525,29 +525,14 @@ def _resolve_participants(self) -> list[Executor]: return executors - def build(self) -> Workflow: - r"""Build and validate the concurrent workflow. + def _to_builder(self) -> tuple["WorkflowBuilder", str, list[str]]: + """Build the internal WorkflowBuilder without calling build(). - Wiring pattern: - - Dispatcher (internal) fans out the input to all `participants` - - Fan-in collects `AgentExecutorResponse` objects from all participants - - If request info is enabled, the orchestration emits a request info event with outputs from all participants - before sending the outputs to the aggregator - - Aggregator yields output and the workflow becomes idle. The output is either: - - list[ChatMessage] (default aggregator: one user + one assistant per agent) - - custom payload from the provided aggregator + This is used for workflow composition via WorkflowBuilder.add_workflow(). Returns: - Workflow: a ready-to-run workflow instance - - Raises: - ValueError: if no participants were defined - - Example: - - .. code-block:: python - - workflow = ConcurrentBuilder().participants([agent1, agent2]).build() + A tuple of (WorkflowBuilder, start_executor_id, terminal_executor_ids). + The terminal executor is the aggregator. 
""" if not self._participants and not self._participant_factories: raise ValueError( @@ -579,4 +564,31 @@ def build(self) -> Workflow: if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) + return builder, dispatcher.id, [aggregator.id] + + def build(self) -> Workflow: + r"""Build and validate the concurrent workflow. + + Wiring pattern: + - Dispatcher (internal) fans out the input to all `participants` + - Fan-in collects `AgentExecutorResponse` objects from all participants + - If request info is enabled, the orchestration emits a request info event with outputs from all participants + before sending the outputs to the aggregator + - Aggregator yields output and the workflow becomes idle. The output is either: + - list[ChatMessage] (default aggregator: one user + one assistant per agent) + - custom payload from the provided aggregator + + Returns: + Workflow: a ready-to-run workflow instance + + Raises: + ValueError: if no participants were defined + + Example: + + .. code-block:: python + + workflow = ConcurrentBuilder().participants([agent1, agent2]).build() + """ + builder, _, _ = self._to_builder() return builder.build() diff --git a/python/packages/core/agent_framework/_workflows/_edge.py b/python/packages/core/agent_framework/_workflows/_edge.py index 02ca1722dd..500cd47966 100644 --- a/python/packages/core/agent_framework/_workflows/_edge.py +++ b/python/packages/core/agent_framework/_workflows/_edge.py @@ -413,6 +413,29 @@ class CustomGroup(EdgeGroup): cls._TYPE_REGISTRY[subclass.__name__] = subclass return subclass + def _with_prefix(self, prefix: str) -> "EdgeGroup": + """Create a copy of this edge group with prefixed executor IDs. + + This is used internally for workflow composition when merging edge groups + from one builder into another. + + Args: + prefix: The prefix to add to all executor IDs (e.g., "analysis"). + + Returns: + A new EdgeGroup instance with prefixed IDs. 
+ """ + prefixed_edges = [ + Edge( + source_id=f"{prefix}/{edge.source_id}", + target_id=f"{prefix}/{edge.target_id}", + condition=edge._condition, + condition_name=edge.condition_name, + ) + for edge in self.edges + ] + return EdgeGroup(prefixed_edges, type=self.type) + @classmethod def from_dict(cls, data: dict[str, Any]) -> "EdgeGroup": """Hydrate the correct `EdgeGroup` subclass from serialised state. @@ -493,6 +516,15 @@ def __init__( edge = Edge(source_id=source_id, target_id=target_id, condition=condition) super().__init__([edge], id=id, type=self.__class__.__name__) + def _with_prefix(self, prefix: str) -> "SingleEdgeGroup": + """Create a copy with prefixed executor IDs.""" + edge = self.edges[0] + return SingleEdgeGroup( + source_id=f"{prefix}/{edge.source_id}", + target_id=f"{prefix}/{edge.target_id}", + condition=edge._condition, + ) + @EdgeGroup.register @dataclass(init=False) @@ -608,6 +640,16 @@ def to_dict(self) -> dict[str, Any]: payload["selection_func_name"] = self.selection_func_name return payload + def _with_prefix(self, prefix: str) -> "FanOutEdgeGroup": + """Create a copy with prefixed executor IDs.""" + source_id = self.edges[0].source_id + return FanOutEdgeGroup( + source_id=f"{prefix}/{source_id}", + target_ids=[f"{prefix}/{tid}" for tid in self._target_ids], + selection_func=self._selection_func, + selection_func_name=self.selection_func_name, + ) + @EdgeGroup.register @dataclass(init=False) @@ -643,6 +685,15 @@ def __init__(self, source_ids: Sequence[str], target_id: str, *, id: str | None edges = [Edge(source_id=source, target_id=target_id) for source in source_ids] super().__init__(edges, id=id, type=self.__class__.__name__) + def _with_prefix(self, prefix: str) -> "FanInEdgeGroup": + """Create a copy with prefixed executor IDs.""" + target_id = self.edges[0].target_id + source_ids = [edge.source_id for edge in self.edges] + return FanInEdgeGroup( + source_ids=[f"{prefix}/{sid}" for sid in source_ids], + 
target_id=f"{prefix}/{target_id}", + ) + @dataclass(init=False) class SwitchCaseEdgeGroupCase(DictConvertible): @@ -902,6 +953,26 @@ def to_dict(self) -> dict[str, Any]: payload["cases"] = [encode_value(case) for case in self.cases] return payload + def _with_prefix(self, prefix: str) -> "SwitchCaseEdgeGroup": + """Create a copy with prefixed executor IDs.""" + source_id = self.edges[0].source_id + prefixed_cases: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] + for case in self.cases: + if isinstance(case, SwitchCaseEdgeGroupDefault): + prefixed_cases.append(SwitchCaseEdgeGroupDefault(target_id=f"{prefix}/{case.target_id}")) + else: + prefixed_cases.append( + SwitchCaseEdgeGroupCase( + condition=case._condition, + target_id=f"{prefix}/{case.target_id}", + condition_name=case.condition_name, + ) + ) + return SwitchCaseEdgeGroup( + source_id=f"{prefix}/{source_id}", + cases=prefixed_cases, + ) + @EdgeGroup.register @dataclass(init=False) @@ -939,3 +1010,8 @@ def __init__(self, executor_id: str) -> None: """ edge = Edge(source_id=INTERNAL_SOURCE_ID(executor_id), target_id=executor_id) super().__init__([edge]) + + def _with_prefix(self, prefix: str) -> "InternalEdgeGroup": + """Create a copy with prefixed executor IDs.""" + executor_id = self.edges[0].target_id + return InternalEdgeGroup(executor_id=f"{prefix}/{executor_id}") diff --git a/python/packages/core/agent_framework/_workflows/_executor.py b/python/packages/core/agent_framework/_workflows/_executor.py index 49f3dafd06..9ae6564140 100644 --- a/python/packages/core/agent_framework/_workflows/_executor.py +++ b/python/packages/core/agent_framework/_workflows/_executor.py @@ -519,6 +519,22 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: """ ... + def _clone_with_id(self, new_id: str) -> "Executor": + """Create a clone of this executor with a new ID. 
+ + This is used internally for workflow composition when merging executors + from one builder into another with prefixed IDs. + + Args: + new_id: The new ID for the cloned executor. + + Returns: + A new Executor instance with the same configuration but a different ID. + """ + cloned = copy.copy(self) + cloned.id = new_id + return cloned + # endregion: Executor diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 11c123d153..308431c809 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -275,17 +275,14 @@ def _resolve_participants(self) -> list[Executor]: return executors - def build(self) -> Workflow: - """Build and validate the sequential workflow. + def _to_builder(self) -> tuple["WorkflowBuilder", str, list[str]]: + """Build the internal WorkflowBuilder without calling build(). - Wiring pattern: - - _InputToConversation normalizes the initial input into list[ChatMessage] - - For each participant in order: - - If Agent (or AgentExecutor): pass conversation to the agent, then optionally - route through a request info interceptor, then convert response to conversation - via _ResponseToConversation - - Else (custom Executor): pass conversation directly to the executor - - _EndWithConversation yields the final conversation and the workflow becomes idle + This is used for workflow composition via WorkflowBuilder.add_workflow(). + + Returns: + A tuple of (WorkflowBuilder, start_executor_id, terminal_executor_ids). + The terminal executor is the end executor. 
""" if not self._participants and not self._participant_factories: raise ValueError( @@ -320,4 +317,19 @@ def build(self) -> Workflow: if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) + return builder, input_conv.id, [end.id] + + def build(self) -> Workflow: + """Build and validate the sequential workflow. + + Wiring pattern: + - _InputToConversation normalizes the initial input into list[ChatMessage] + - For each participant in order: + - If Agent (or AgentExecutor): pass conversation to the agent, then optionally + route through a request info interceptor, then convert response to conversation + via _ResponseToConversation + - Else (custom Executor): pass conversation directly to the executor + - _EndWithConversation yields the final conversation and the workflow becomes idle + """ + builder, _, _ = self._to_builder() return builder.build() diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 8cc31e2cc9..5feac43785 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -42,6 +42,19 @@ logger = logging.getLogger(__name__) +@dataclass +class _WorkflowMapping: + """Internal mapping of a merged workflow's logical ID to its entry/exit points. + + Args: + entry: The actual executor ID of the workflow's entry point. + exits: List of actual executor IDs of the workflow's exit points. + """ + + entry: str + exits: list[str] + + @dataclass class _EdgeRegistration: """A data class representing an edge registration in the workflow builder. 
@@ -189,6 +202,9 @@ def __init__( ] = [] self._executor_registry: dict[str, Callable[[], Executor]] = {} + # Workflow composition: maps logical workflow IDs to their entry/exit points + self._workflow_mappings: dict[str, _WorkflowMapping] = {} + # Agents auto-wrapped by builder now always stream incremental updates. def _add_executor(self, executor: Executor) -> str: @@ -520,7 +536,10 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: if isinstance(source, str) and isinstance(target, str): # Both are names; defer resolution to build time - self._edge_registry.append(_EdgeRegistration(source=source, target=target, condition=condition)) + # Resolve workflow logical IDs: source uses exit point, target uses entry point + resolved_source = self._resolve_workflow_exit(source) + resolved_target = self._resolve_workflow_entry(target) + self._edge_registry.append(_EdgeRegistration(source=resolved_source, target=resolved_target, condition=condition)) return self # Both are Executor/AgentProtocol instances; wrap and add now @@ -599,7 +618,10 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None: if isinstance(source, str) and all(isinstance(t, str) for t in targets): # Both are names; defer resolution to build time - self._edge_registry.append(_FanOutEdgeRegistration(source=source, targets=list(targets))) # type: ignore + # Resolve workflow logical IDs: source uses exit point, targets use entry points + resolved_source = self._resolve_workflow_exit(source) + resolved_targets = [self._resolve_workflow_entry(t) for t in targets] # type: ignore[arg-type] + self._edge_registry.append(_FanOutEdgeRegistration(source=resolved_source, targets=resolved_targets)) return self # Both are Executor/AgentProtocol instances; wrap and add now @@ -698,7 +720,17 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: if isinstance(source, str) and all(isinstance(case.target, str) for case in cases): # Source is a name; defer 
resolution to build time - self._edge_registry.append(_SwitchCaseEdgeGroupRegistration(source=source, cases=list(cases))) # type: ignore + # Resolve workflow logical IDs + resolved_source = self._resolve_workflow_exit(source) + resolved_cases: list[Case | Default] = [] + for case in cases: + target_str = case.target # type: ignore[assignment] + resolved_target = self._resolve_workflow_entry(target_str) + if isinstance(case, Default): + resolved_cases.append(Default(target=resolved_target)) + else: + resolved_cases.append(Case(condition=case.condition, target=resolved_target)) + self._edge_registry.append(_SwitchCaseEdgeGroupRegistration(source=resolved_source, cases=resolved_cases)) return self # Source is an Executor/AgentProtocol instance; wrap and add now @@ -810,10 +842,13 @@ def select_workers(task: Task, available: list[str]) -> list[str]: if isinstance(source, str) and all(isinstance(t, str) for t in targets): # Both are names; defer resolution to build time + # Resolve workflow logical IDs: source uses exit point, targets use entry points + resolved_source = self._resolve_workflow_exit(source) + resolved_targets = [self._resolve_workflow_entry(t) for t in targets] # type: ignore[arg-type] self._edge_registry.append( _MultiSelectionEdgeGroupRegistration( - source=source, - targets=list(targets), # type: ignore + source=resolved_source, + targets=resolved_targets, selection_func=selection_func, ) ) @@ -895,7 +930,10 @@ async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str]) if all(isinstance(s, str) for s in sources) and isinstance(target, str): # Both are names; defer resolution to build time - self._edge_registry.append(_FanInEdgeRegistration(sources=list(sources), target=target)) # type: ignore + # Resolve workflow logical IDs: sources use exit points, target uses entry point + resolved_sources = [self._resolve_workflow_exit(s) for s in sources] # type: ignore[arg-type] + resolved_target = self._resolve_workflow_entry(target) + 
self._edge_registry.append(_FanInEdgeRegistration(sources=resolved_sources, target=resolved_target)) return self # Both are Executor/AgentProtocol instances; wrap and add now @@ -1030,7 +1068,8 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: logger.warning(f"Overwriting existing start executor: {start_id} for the workflow.") if isinstance(executor, str): - self._start_executor = executor + # Resolve workflow logical ID to entry point + self._start_executor = self._resolve_workflow_entry(executor) else: wrapped = self._maybe_wrap_agent(executor) # type: ignore[arg-type] self._start_executor = wrapped @@ -1140,6 +1179,228 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: self._checkpoint_storage = checkpoint_storage return self + def add_workflow( + self, + source: Any, + *, + id: str, + ) -> Self: + """Merge an orchestration pattern or workflow builder into this builder. + + This method enables workflow composition by inlining executors from high-level + orchestration builders (ConcurrentBuilder, SequentialBuilder, etc.) or other + WorkflowBuilder instances into this builder. + + The provided `id` becomes a logical identifier that can be used with + ``add_edge()`` and ``set_start_executor()``. The framework automatically + resolves these logical IDs to the actual entry/exit points of the merged + workflow. + + Args: + source: An orchestration builder (ConcurrentBuilder, SequentialBuilder, etc.) + or another WorkflowBuilder instance. + id: Logical identifier for the merged workflow. Used with ``add_edge()`` + and ``set_start_executor()``. Internal executor IDs are prefixed with + this id (e.g., "analysis/dispatcher"). + + Returns: + Self for method chaining. + + Raises: + ValueError: If the source doesn't have a ``_to_builder()`` method or if + the id conflicts with an existing registered name. + + Example: + Compose two high-level orchestration patterns: + + .. 
code-block:: python + + from agent_framework import ( + ConcurrentBuilder, + SequentialBuilder, + WorkflowBuilder, + ) + + # Create high-level patterns + analysis = ConcurrentBuilder().participants([agent1, agent2]) + summary = SequentialBuilder().participants([summarizer]) + + # Compose them together + workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") # Framework resolves entry/exit + .set_start_executor("analysis") # Framework knows entry point + .build() + ) + + Add pre/post-processing to a high-level pattern: + + .. code-block:: python + + workflow = ( + WorkflowBuilder() + .register_executor(Preprocessor, name="preprocess") + .add_workflow(analysis_builder, id="analysis") + .register_executor(Postprocessor, name="postprocess") + .add_edge("preprocess", "analysis") + .add_edge("analysis", "postprocess") + .set_start_executor("preprocess") + .build() + ) + """ + # Check for ID conflicts + if id in self._executor_registry: + raise ValueError(f"Workflow ID '{id}' conflicts with an existing registered executor name.") + if id in self._workflow_mappings: + raise ValueError(f"Workflow ID '{id}' has already been used.") + + # Extract the builder and entry/exit information from the source + if hasattr(source, "_to_builder"): + inner_builder, start_id, terminal_ids = source._to_builder() + elif isinstance(source, WorkflowBuilder): + # For WorkflowBuilder, we need to extract its structure + inner_builder = source + if inner_builder._start_executor is None: + raise ValueError("Source WorkflowBuilder must have a start executor set.") + if isinstance(inner_builder._start_executor, str): + start_id = inner_builder._start_executor + else: + start_id = inner_builder._start_executor.id + # For raw WorkflowBuilder, we don't know the terminal executors + # The user will need to use explicit IDs or rely on validation + terminal_ids = [] + else: + raise ValueError( + f"Source must be 
an orchestration builder (ConcurrentBuilder, SequentialBuilder, etc.) " + f"or a WorkflowBuilder. Got {type(source).__name__}." + ) + + # Merge executors from the inner builder with prefixed IDs + for exec_id, executor in inner_builder._executors.items(): + prefixed_id = f"{id}/{exec_id}" + # Clone the executor with a new ID + cloned = executor._clone_with_id(prefixed_id) + self._executors[prefixed_id] = cloned + # Note: Internal edge groups are already included in inner_builder._edge_groups + # and will be prefixed and copied below + + # Copy edge groups with prefixed IDs + for edge_group in inner_builder._edge_groups: + prefixed_group = edge_group._with_prefix(id) + self._edge_groups.append(prefixed_group) + + # Copy executor registry with prefixed names + for name, factory in inner_builder._executor_registry.items(): + prefixed_name = f"{id}/{name}" + + def make_prefixed_factory(original_factory: Callable[[], Executor], prefix: str) -> Callable[[], Executor]: + def factory_fn() -> Executor: + executor = original_factory() + return executor._clone_with_id(f"{prefix}/{executor.id}") + + return factory_fn + + self._executor_registry[prefixed_name] = make_prefixed_factory(factory, id) + + # Copy edge registry with prefixed IDs + for registration in inner_builder._edge_registry: + prefixed_reg = self._prefix_edge_registration(registration, id) + self._edge_registry.append(prefixed_reg) + + # Track the workflow mapping for logical ID resolution + prefixed_start = f"{id}/{start_id}" + prefixed_terminals = [f"{id}/{tid}" for tid in terminal_ids] + self._workflow_mappings[id] = _WorkflowMapping(entry=prefixed_start, exits=prefixed_terminals) + + return self + + def _prefix_edge_registration( + self, + registration: _EdgeRegistration + | _FanOutEdgeRegistration + | _SwitchCaseEdgeGroupRegistration + | _MultiSelectionEdgeGroupRegistration + | _FanInEdgeRegistration, + prefix: str, + ) -> ( + _EdgeRegistration + | _FanOutEdgeRegistration + | _SwitchCaseEdgeGroupRegistration 
+ | _MultiSelectionEdgeGroupRegistration + | _FanInEdgeRegistration + ): + """Create a copy of an edge registration with prefixed IDs.""" + match registration: + case _EdgeRegistration(source, target, condition): + return _EdgeRegistration( + source=f"{prefix}/{source}", + target=f"{prefix}/{target}", + condition=condition, + ) + case _FanOutEdgeRegistration(source, targets): + return _FanOutEdgeRegistration( + source=f"{prefix}/{source}", + targets=[f"{prefix}/{t}" for t in targets], + ) + case _FanInEdgeRegistration(sources, target): + return _FanInEdgeRegistration( + sources=[f"{prefix}/{s}" for s in sources], + target=f"{prefix}/{target}", + ) + case _SwitchCaseEdgeGroupRegistration(source, cases): + prefixed_cases: list[Case | Default] = [] + for case in cases: + if isinstance(case, Default): + prefixed_cases.append(Default(target=f"{prefix}/{case.target}")) + else: + prefixed_cases.append(Case(condition=case.condition, target=f"{prefix}/{case.target}")) + return _SwitchCaseEdgeGroupRegistration( + source=f"{prefix}/{source}", + cases=prefixed_cases, + ) + case _MultiSelectionEdgeGroupRegistration(source, targets, selection_func): + return _MultiSelectionEdgeGroupRegistration( + source=f"{prefix}/{source}", + targets=[f"{prefix}/{t}" for t in targets], + selection_func=selection_func, + ) + case _: + raise ValueError(f"Unknown edge registration type: {type(registration)}") + + def _resolve_workflow_entry(self, id: str) -> str: + """Resolve a logical workflow ID to its entry point executor ID. + + If the ID is a workflow logical ID, returns the entry point. + Otherwise, returns the ID unchanged. + """ + if id in self._workflow_mappings: + return self._workflow_mappings[id].entry + return id + + def _resolve_workflow_exit(self, id: str) -> str: + """Resolve a logical workflow ID to its exit point executor ID. + + If the ID is a workflow logical ID, returns the single exit point. + Raises if the workflow has multiple exit points. 
+ Otherwise, returns the ID unchanged. + """ + if id in self._workflow_mappings: + mapping = self._workflow_mappings[id] + if len(mapping.exits) == 0: + raise ValueError( + f"Workflow '{id}' has no known exit points. " + f"Use explicit executor IDs like '{id}/'." + ) + if len(mapping.exits) != 1: + raise ValueError( + f"Workflow '{id}' has {len(mapping.exits)} exit points. " + f"Use explicit IDs: {mapping.exits}" + ) + return mapping.exits[0] + return id + def _resolve_edge_registry( self, ) -> tuple[Executor, list[Executor], list[EdgeGroup]]: @@ -1150,6 +1411,9 @@ def _resolve_edge_registry( start_executor: Executor | None = None if isinstance(self._start_executor, Executor): start_executor = self._start_executor + elif isinstance(self._start_executor, str) and self._start_executor in self._executors: + # Start executor is from a merged workflow (via add_workflow) + start_executor = self._executors[self._start_executor] # Maps registered factory names to created executor instances for edge resolution factory_name_to_instance: dict[str, Executor] = {} @@ -1170,10 +1434,14 @@ def _resolve_edge_registry( factory_name_to_instance[name] = instance def _get_executor(name: str) -> Executor: - """Helper to get executor by the registered name. Raises if not found.""" - if name not in factory_name_to_instance: - raise ValueError(f"Factory '{name}' has not been registered.") - return factory_name_to_instance[name] + """Helper to get executor by the registered name or prefixed ID. 
Raises if not found.""" + # First check factory name to instance (from executor registry) + if name in factory_name_to_instance: + return factory_name_to_instance[name] + # Then check direct executors (from add_workflow) + if name in self._executors: + return self._executors[name] + raise ValueError(f"Factory '{name}' has not been registered.") for registration in self._edge_registry: match registration: diff --git a/python/packages/core/tests/workflow/test_workflow_composition.py b/python/packages/core/tests/workflow/test_workflow_composition.py new file mode 100644 index 0000000000..9fa7cd962f --- /dev/null +++ b/python/packages/core/tests/workflow/test_workflow_composition.py @@ -0,0 +1,390 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for workflow composition via add_workflow().""" + +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agent_framework import ( + ChatAgent, + ConcurrentBuilder, + Executor, + SequentialBuilder, + WorkflowBuilder, + WorkflowContext, + handler, +) +from agent_framework._workflows._edge import Case, Default + + +def create_mock_agent(name: str) -> ChatAgent: + """Create a mock agent for testing.""" + mock_client = MagicMock() + mock_client.create_response = AsyncMock( + return_value=MagicMock( + content="test response", + messages=[MagicMock(role="assistant", content="test response")], + ) + ) + agent = ChatAgent(name=name, chat_client=mock_client) + return agent + + +# Test executors +class EchoExecutor(Executor): + """Simple executor that echoes the input.""" + + @handler + async def handle(self, message: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(f"echo: {message}") + + +class UpperExecutor(Executor): + """Executor that converts input to uppercase.""" + + @handler + async def handle(self, message: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(message.upper()) + + +class OutputExecutor(Executor): + """Terminal executor that 
yields output.""" + + @handler + async def handle(self, message: str, ctx: WorkflowContext[None, str]) -> None: + await ctx.yield_output(message) + + +class ListOutputExecutor(Executor): + """Terminal executor that yields list output.""" + + @handler + async def handle(self, messages: list, ctx: WorkflowContext[None, list]) -> None: + await ctx.yield_output(messages) + + +class StringPassthroughExecutor(Executor): + """Executor that passes through string messages.""" + + @handler + async def handle(self, message: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(message) + + +# Basic add_workflow tests + + +def test_add_workflow_with_concurrent_builder(): + """Test adding a ConcurrentBuilder workflow.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + result = builder.add_workflow(concurrent, id="analysis") + + # Should return self for chaining + assert result is builder + + # Should track the workflow mapping + assert "analysis" in builder._workflow_mappings + mapping = builder._workflow_mappings["analysis"] + assert mapping.entry == "analysis/dispatcher" + assert mapping.exits == ["analysis/aggregator"] + + +def test_add_workflow_with_sequential_builder(): + """Test adding a SequentialBuilder workflow.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + sequential = SequentialBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + result = builder.add_workflow(sequential, id="pipeline") + + # Should return self for chaining + assert result is builder + + # Should track the workflow mapping + assert "pipeline" in builder._workflow_mappings + mapping = builder._workflow_mappings["pipeline"] + assert mapping.entry == "pipeline/input-conversation" + assert mapping.exits == ["pipeline/end"] + + +def test_add_workflow_id_conflicts(): + """Test that duplicate workflow IDs 
raise errors.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + builder.add_workflow(concurrent, id="analysis") + + # Adding with same ID should fail + with pytest.raises(ValueError, match="already been used"): + builder.add_workflow(concurrent, id="analysis") + + +def test_add_workflow_executor_registry_conflict(): + """Test that workflow ID conflicts with executor registry raise errors.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + builder.register_executor(lambda: EchoExecutor(id="analysis"), name="analysis") + + # Adding workflow with same ID should fail + with pytest.raises(ValueError, match="conflicts with an existing registered executor name"): + builder.add_workflow(concurrent, id="analysis") + + +# Logical ID resolution tests + + +def test_set_start_executor_with_workflow_logical_id(): + """Test that set_start_executor resolves workflow logical IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + builder.add_workflow(concurrent, id="analysis") + builder.set_start_executor("analysis") + + # Should resolve to the entry point + assert builder._start_executor == "analysis/dispatcher" + + +def test_add_edge_with_workflow_logical_ids(): + """Test that add_edge resolves workflow logical IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + sequential = SequentialBuilder().participants([agent3]) + + builder = WorkflowBuilder() + builder.add_workflow(concurrent, id="analysis") + builder.add_workflow(sequential, id="summary") + 
builder.add_edge("analysis", "summary") + + # Check that the edge was registered with resolved IDs + assert len(builder._edge_registry) == 1 + edge_reg = builder._edge_registry[0] + assert edge_reg.source == "analysis/aggregator" # exit point + assert edge_reg.target == "summary/input-conversation" # entry point + + +def test_add_edge_mixed_workflow_and_executor_ids(): + """Test mixing workflow logical IDs with regular executor IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + builder.register_executor(lambda: EchoExecutor(id="prep"), name="prep") + builder.add_workflow(concurrent, id="analysis") + builder.register_executor(lambda: OutputExecutor(id="out"), name="output") + + # Connect prep -> analysis + builder.add_edge("prep", "analysis") + # Connect analysis -> output + builder.add_edge("analysis", "output") + + # Check that edges were resolved correctly + assert len(builder._edge_registry) == 2 + # First edge: prep -> analysis entry + assert builder._edge_registry[0].source == "prep" + assert builder._edge_registry[0].target == "analysis/dispatcher" + # Second edge: analysis exit -> output + assert builder._edge_registry[1].source == "analysis/aggregator" + assert builder._edge_registry[1].target == "output" + + +# Integration tests - full workflow builds + + +@pytest.mark.asyncio +async def test_compose_two_concurrent_workflows(): + """Test composing two concurrent workflows together.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + + analysis = ConcurrentBuilder().participants([agent1, agent2]) + summary = SequentialBuilder().participants([agent3]) + + workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") + .set_start_executor("analysis") + .build() + ) + + # 
Verify the workflow structure + assert workflow.start_executor_id == "analysis/dispatcher" + assert "analysis/dispatcher" in workflow.executors + assert "analysis/aggregator" in workflow.executors + assert "summary/input-conversation" in workflow.executors + assert "summary/end" in workflow.executors + + +@pytest.mark.asyncio +async def test_compose_with_pre_and_post_processing(): + """Test adding pre/post-processing executors around a workflow.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + workflow = ( + WorkflowBuilder() + .register_executor(lambda: StringPassthroughExecutor(id="pre"), name="pre") + .add_workflow(concurrent, id="analysis") + .register_executor(lambda: StringPassthroughExecutor(id="post"), name="post") + .add_edge("pre", "analysis") + .add_edge("analysis", "post") + .set_start_executor("pre") + .build() + ) + + # Verify the workflow structure + assert workflow.start_executor_id == "pre" + assert "pre" in workflow.executors + assert "analysis/dispatcher" in workflow.executors + assert "analysis/aggregator" in workflow.executors + assert "post" in workflow.executors + + +def test_add_fan_out_with_workflow_ids(): + """Test fan-out edges with workflow logical IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + agent4 = create_mock_agent("agent4") + + concurrent1 = ConcurrentBuilder().participants([agent1, agent2]) + concurrent2 = ConcurrentBuilder().participants([agent3, agent4]) + + builder = WorkflowBuilder() + builder.register_executor(lambda: EchoExecutor(id="source"), name="source") + builder.add_workflow(concurrent1, id="path1") + builder.add_workflow(concurrent2, id="path2") + builder.add_fan_out_edges("source", ["path1", "path2"]) + + # Check that the edge was registered with resolved IDs + fan_out_reg = builder._edge_registry[0] + assert fan_out_reg.source == 
"source" + assert fan_out_reg.targets == ["path1/dispatcher", "path2/dispatcher"] + + +def test_add_fan_in_with_workflow_ids(): + """Test fan-in edges with workflow logical IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + agent4 = create_mock_agent("agent4") + + concurrent1 = ConcurrentBuilder().participants([agent1, agent2]) + concurrent2 = ConcurrentBuilder().participants([agent3, agent4]) + + builder = WorkflowBuilder() + builder.add_workflow(concurrent1, id="source1") + builder.add_workflow(concurrent2, id="source2") + builder.register_executor(lambda: ListOutputExecutor(id="agg"), name="aggregator") + builder.add_fan_in_edges(["source1", "source2"], "aggregator") + + # Check that the edge was registered with resolved IDs + fan_in_reg = builder._edge_registry[0] + assert fan_in_reg.sources == ["source1/aggregator", "source2/aggregator"] + assert fan_in_reg.target == "aggregator" + + +def test_add_switch_case_with_workflow_ids(): + """Test switch-case edges with workflow logical IDs.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + agent4 = create_mock_agent("agent4") + + concurrent1 = ConcurrentBuilder().participants([agent1, agent2]) + concurrent2 = ConcurrentBuilder().participants([agent3, agent4]) + + builder = WorkflowBuilder() + builder.register_executor(lambda: EchoExecutor(id="classifier"), name="classifier") + builder.add_workflow(concurrent1, id="fast") + builder.add_workflow(concurrent2, id="slow") + builder.add_switch_case_edge_group( + "classifier", + [ + Case(condition=lambda x: len(x) > 10, target="fast"), + Default(target="slow"), + ], + ) + + # Check that the edge was registered with resolved IDs + switch_reg = builder._edge_registry[0] + assert switch_reg.source == "classifier" + assert switch_reg.cases[0].target == "fast/dispatcher" + assert switch_reg.cases[1].target == "slow/dispatcher" + + 
+def test_invalid_source_type(): + """Test that invalid source types raise errors.""" + builder = WorkflowBuilder() + + with pytest.raises(ValueError, match="must be an orchestration builder"): + builder.add_workflow("not a builder", id="test") + + +def test_workflow_executor_prefix(): + """Test that executor IDs are properly prefixed.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + + builder = WorkflowBuilder() + builder.add_workflow(concurrent, id="myworkflow") + + # Executors should be prefixed + assert "myworkflow/dispatcher" in builder._executors + # The agents get wrapped in AgentExecutors with the agent's name as ID + assert "myworkflow/agent1" in builder._executors + assert "myworkflow/agent2" in builder._executors + assert "myworkflow/aggregator" in builder._executors + + +def test_chained_add_workflow_calls(): + """Test that add_workflow returns self for chaining.""" + agent1 = create_mock_agent("agent1") + agent2 = create_mock_agent("agent2") + agent3 = create_mock_agent("agent3") + + concurrent = ConcurrentBuilder().participants([agent1, agent2]) + sequential = SequentialBuilder().participants([agent3]) + + # Should be able to chain all calls + builder = ( + WorkflowBuilder() + .add_workflow(concurrent, id="analysis") + .add_workflow(sequential, id="summary") + .add_edge("analysis", "summary") + .set_start_executor("analysis") + ) + + assert "analysis" in builder._workflow_mappings + assert "summary" in builder._workflow_mappings diff --git a/python/samples/getting_started/workflows/composition/workflow_composition.py b/python/samples/getting_started/workflows/composition/workflow_composition.py new file mode 100644 index 0000000000..02c346edcb --- /dev/null +++ b/python/samples/getting_started/workflows/composition/workflow_composition.py @@ -0,0 +1,246 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +""" +Sample: Workflow Composition with add_workflow() + +What it does: +- Demonstrates composing high-level orchestration patterns (ConcurrentBuilder, SequentialBuilder) + using WorkflowBuilder.add_workflow() +- Shows how to chain workflows together with add_edge() using logical workflow IDs +- Demonstrates adding pre/post-processing executors around composed workflows + +This new API simplifies workflow composition by allowing you to: +1. Add orchestration patterns as logical units +2. Connect them using the workflow ID directly (no need to know internal executor names) +3. Mix and match high-level patterns with custom executors + +Prerequisites: +- Azure OpenAI or OpenAI API credentials (set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY, + or OPENAI_API_KEY) +""" + +import asyncio +import os + +from agent_framework import ( + ChatAgent, + ConcurrentBuilder, + Executor, + SequentialBuilder, + WorkflowBuilder, + WorkflowContext, + handler, +) +from typing_extensions import Never + + +def create_chat_client(): + """Create a chat client based on available environment variables.""" + if os.environ.get("AZURE_OPENAI_ENDPOINT"): + from azure.identity import DefaultAzureCredential + + from agent_framework.azure_ai import AzureOpenAIChatClient + + return AzureOpenAIChatClient( + endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + credential=DefaultAzureCredential(), + model=os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o"), + ) + elif os.environ.get("OPENAI_API_KEY"): + from agent_framework.openai import OpenAIChatClient + + return OpenAIChatClient( + api_key=os.environ["OPENAI_API_KEY"], + model=os.environ.get("OPENAI_MODEL", "gpt-4o"), + ) + else: + raise ValueError( + "Please set either AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY, " + "or OPENAI_API_KEY environment variables." 
+ ) + + +# Custom preprocessor executor +class InputFormatter(Executor): + """Formats the input message before sending to analysis.""" + + def __init__(self): + super().__init__(id="input_formatter") + + @handler + async def format_input(self, message: str, ctx: WorkflowContext[str]) -> None: + """Format the input message.""" + print(f"\n{'='*60}") + print("INPUT FORMATTER") + print(f"{'='*60}") + print(f"Received: {message[:100]}...") + formatted = f"Please analyze the following topic thoroughly:\n\n{message}" + print("Formatted input for analysis agents.") + await ctx.send_message(formatted) + + +# Custom postprocessor executor +class OutputFormatter(Executor): + """Formats the final output from the workflow.""" + + def __init__(self): + super().__init__(id="output_formatter") + + @handler + async def format_output(self, messages: list, ctx: WorkflowContext[Never, str]) -> None: + """Format and yield the final output.""" + print(f"\n{'='*60}") + print("OUTPUT FORMATTER") + print(f"{'='*60}") + + # Extract text content from the messages + output_parts = [] + for msg in messages: + if hasattr(msg, "content"): + output_parts.append(str(msg.content)) + elif hasattr(msg, "text"): + output_parts.append(str(msg.text)) + else: + output_parts.append(str(msg)) + + final_output = "\n\n---\n\n".join(output_parts) + print(f"Compiled {len(output_parts)} analysis sections into final report.") + await ctx.yield_output(final_output) + + +async def example_simple_composition(): + """Example: Compose two high-level patterns together. + + This shows the simplest use case - chaining a ConcurrentBuilder (parallel analysis) + with a SequentialBuilder (summarization). 
+ """ + print("\n" + "=" * 80) + print("EXAMPLE 1: Simple Workflow Composition") + print("=" * 80) + print("Chaining ConcurrentBuilder -> SequentialBuilder") + + client = create_chat_client() + + # Create agents for parallel analysis + technical_analyst = ChatAgent( + name="technical_analyst", + chat_client=client, + instructions="You are a technical analyst. Analyze the technical aspects of the topic in 2-3 sentences.", + ) + business_analyst = ChatAgent( + name="business_analyst", + chat_client=client, + instructions="You are a business analyst. Analyze the business implications in 2-3 sentences.", + ) + + # Create agent for summarization + summarizer = ChatAgent( + name="summarizer", + chat_client=client, + instructions="You are a summarizer. Combine the analyses into a brief executive summary in 2-3 sentences.", + ) + + # Create high-level orchestration patterns + analysis = ConcurrentBuilder().participants([technical_analyst, business_analyst]) + summary = SequentialBuilder().participants([summarizer]) + + # Compose them together using add_workflow() + workflow = ( + WorkflowBuilder() + .add_workflow(analysis, id="analysis") + .add_workflow(summary, id="summary") + .add_edge("analysis", "summary") # Framework resolves to analysis/aggregator -> summary/input-conversation + .set_start_executor("analysis") # Framework knows this means analysis/dispatcher + .build() + ) + + print("\nWorkflow structure:") + print(f" Start executor: {workflow.start_executor_id}") + print(f" Executors: {list(workflow.executors.keys())}") + + # Run the workflow + result = await workflow.run("Artificial Intelligence in Healthcare") + outputs = result.get_outputs() + + print("\n--- Final Output ---") + for output in outputs: + if hasattr(output, "__iter__") and not isinstance(output, str): + for item in output: + print(item.content if hasattr(item, "content") else item) + else: + print(output) + + +async def example_pre_post_processing(): + """Example: Add pre/post-processing around a 
high-level pattern. + + This shows how to wrap a ConcurrentBuilder with custom preprocessing + and postprocessing executors. + """ + print("\n" + "=" * 80) + print("EXAMPLE 2: Pre/Post Processing with Composed Workflows") + print("=" * 80) + print("InputFormatter -> ConcurrentBuilder -> OutputFormatter") + + client = create_chat_client() + + # Create agents for analysis + technical_analyst = ChatAgent( + name="technical_analyst", + chat_client=client, + instructions="You are a technical analyst. Provide a technical analysis in 2-3 sentences.", + ) + market_analyst = ChatAgent( + name="market_analyst", + chat_client=client, + instructions="You are a market analyst. Provide market analysis in 2-3 sentences.", + ) + + # Create the concurrent analysis pattern + analysis = ConcurrentBuilder().participants([technical_analyst, market_analyst]) + + # Compose with pre/post-processing + workflow = ( + WorkflowBuilder() + .register_executor(InputFormatter, name="input_formatter") + .add_workflow(analysis, id="analysis") + .register_executor(OutputFormatter, name="output_formatter") + .add_edge("input_formatter", "analysis") + .add_edge("analysis", "output_formatter") + .set_start_executor("input_formatter") + .build() + ) + + print("\nWorkflow structure:") + print(f" Start executor: {workflow.start_executor_id}") + print(f" Executors: {list(workflow.executors.keys())}") + + # Run the workflow + result = await workflow.run("Electric Vehicle Adoption") + outputs = result.get_outputs() + + print("\n--- Final Output ---") + for output in outputs: + print(output) + + +async def main(): + """Run all composition examples.""" + print("=" * 80) + print("WORKFLOW COMPOSITION SAMPLES") + print("=" * 80) + print("\nThese examples demonstrate the new add_workflow() API for composing") + print("high-level orchestration patterns (ConcurrentBuilder, SequentialBuilder)") + print("with WorkflowBuilder.\n") + + # Run examples + await example_simple_composition() + await 
example_pre_post_processing() + + print("\n" + "=" * 80) + print("ALL EXAMPLES COMPLETED") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(main()) From 4733c134f9a4584f4123ae2458d2d0b90c42cd2a Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 20 Jan 2026 13:25:40 +0900 Subject: [PATCH 2/3] Latest work on workflow composability. New design doc. --- .../decisions/00XX-workflow-composability.md | 82 +++++++++++++- .../agent_framework/_workflows/_validation.py | 5 + .../_workflows/_workflow_builder.py | 16 +-- .../workflow/test_workflow_composition.py | 17 ++- .../composition/workflow_composition.py | 104 +++++++++++------- 5 files changed, 168 insertions(+), 56 deletions(-) rename python/docs/XX-workflow-compose.md => docs/decisions/00XX-workflow-composability.md (81%) diff --git a/python/docs/XX-workflow-compose.md b/docs/decisions/00XX-workflow-composability.md similarity index 81% rename from python/docs/XX-workflow-compose.md rename to docs/decisions/00XX-workflow-composability.md index f84d231290..ca145e4adb 100644 --- a/python/docs/XX-workflow-compose.md +++ b/docs/decisions/00XX-workflow-composability.md @@ -9,6 +9,30 @@ informed: team # Workflow Composability Design +## Table of Contents + +- [Problem Statement](#problem-statement) +- [Goals](#goals) +- [Current State](#current-state) +- [Proposed API](#proposed-api) + - [OrchestrationBuilder Protocol](#orchestrationbuilder-protocol) + - [Core Addition: `add_workflow()`](#core-addition-add_workflow) + - [Usage](#usage) +- [Type Safety](#type-safety) +- [Implementation](#implementation) + - [How `add_workflow()` Works Internally](#how-add_workflow-works-internally) + - [How `add_edge()` Resolves Logical IDs](#how-add_edge-resolves-logical-ids) + - [Extracting Builder from High-Level Builders](#extracting-builder-from-high-level-builders) +- [What We're NOT Doing](#what-were-not-doing) +- [Migration](#migration) +- [Open Questions](#open-questions) +- [Alternatives 
Considered](#alternatives-considered) +- [Design Decisions](#design-decisions) + - [Type Validation with `yield_output` vs `send_message`](#type-validation-with-yield_output-vs-send_message) +- [Implementation Phases](#implementation-phases) + +--- + ## Problem Statement Users want to extend high-level builder patterns in two ways: @@ -609,9 +633,59 @@ pipeline: --- +## Design Decisions + +### Type Validation with `yield_output` vs `send_message` + +**Problem:** High-level builders like `ConcurrentBuilder` use `yield_output()` in their terminal executors (e.g., aggregator) to produce workflow output. However, when composing workflows via `add_workflow()`, edges connect the terminal executor to downstream executors. The type validation originally only checked `output_types` (types from `send_message()`), not `workflow_output_types` (types from `yield_output()`), resulting in spurious warnings. + +**Options Considered:** + +1. **Automatically swap aggregators when composing** - When `add_workflow()` is called, detect if the source has a terminal executor using `yield_output()` and swap it for one using `send_message()`. + - **Rejected:** Too implicit. The same builder would behave differently standalone vs composed, making debugging difficult. + +2. **Modify `_to_builder()` to use a forwarding aggregator** - Similar to option 1 but at the `_to_builder()` level. + - **Rejected:** Same issues - hidden behavior change violates principle of least surprise. + +3. **Enhance type validation to check `workflow_output_types`** - When validating edge type compatibility, if `output_types` is empty, fall back to `workflow_output_types`. + - **Chosen:** Simple, explicit, no runtime behavior change. The validation becomes smarter without changing how executors work. + +4. **Have runtime automatically forward `yield_output` data through edges** - Detect when an executor yields output but has outgoing edges, and forward that output as a message. 
+ - **Rejected:** Changes runtime semantics in potentially surprising ways. Mixing `yield_output` (workflow output) with `send_message` (edge routing) should remain explicit. + +**Implementation:** Modified `_validate_edge_type_compatibility()` in `_validation.py` to fall back to `workflow_output_types` when `output_types` is empty: + +```python +# Get output types from source executor +# First try send_message output types, then fall back to yield_output types +# This supports workflow composition where terminal executors (using yield_output) +# may be connected to downstream executors via add_workflow() +source_output_types = list(source_executor.output_types) +if not source_output_types: + source_output_types = list(source_executor.workflow_output_types) +``` + +**Note for users:** When adding post-processing after a composed workflow's terminal executor (e.g., adding an `OutputFormatter` after `ConcurrentBuilder`), the terminal executor must use `send_message()` instead of `yield_output()` for the data to flow through the edge. This can be achieved with a custom aggregator: + +```python +class ForwardingAggregator(Executor): + @handler + async def aggregate( + self, results: list[AgentExecutorResponse], ctx: WorkflowContext[list[ChatMessage]] + ) -> None: + # Extract and forward messages (uses send_message, not yield_output) + messages = [msg for r in results for msg in r.agent_response.messages if msg.role == Role.ASSISTANT] + await ctx.send_message(messages) + +# Use with ConcurrentBuilder +analysis = ConcurrentBuilder().participants([...]).with_aggregator(ForwardingAggregator()) +``` + +--- + ## Implementation Phases -### Phase 1: Core `add_workflow()` +### Phase 1: Core `add_workflow()` ✅ 1. Add internal `WorkflowMapping` dataclass (entry/exit tracking) 2. Add `_to_builder()` to ConcurrentBuilder and SequentialBuilder @@ -619,10 +693,10 @@ pipeline: 4. Add logical ID resolution to `add_edge()` and `set_start_executor()` 5. 
Add tests for basic composition -### Phase 2: Type Validation +### Phase 2: Type Validation ✅ -1. Add `_input_types` / `_output_types` properties to builders -2. Enhance `build()` validation to check cross-workflow type compatibility +1. ~~Add `_input_types` / `_output_types` properties to builders~~ (Using existing executor type introspection) +2. Enhance `build()` validation to check cross-workflow type compatibility (fall back to `workflow_output_types`) 3. Improve error messages ### Phase 3: Remaining Builders diff --git a/python/packages/core/agent_framework/_workflows/_validation.py b/python/packages/core/agent_framework/_workflows/_validation.py index fc59bb94e1..1cac8cdc93 100644 --- a/python/packages/core/agent_framework/_workflows/_validation.py +++ b/python/packages/core/agent_framework/_workflows/_validation.py @@ -229,7 +229,12 @@ def _validate_edge_type_compatibility(self, edge: Edge, edge_group: EdgeGroup) - target_executor = self._executors[edge.target_id] # Get output types from source executor + # First try send_message output types, then fall back to yield_output types + # This supports workflow composition where terminal executors (using yield_output) + # may be connected to downstream executors via add_workflow() source_output_types = list(source_executor.output_types) + if not source_output_types: + source_output_types = list(source_executor.workflow_output_types) # Get input types from target executor target_input_types = target_executor.input_types diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 5feac43785..0941b13228 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -4,7 +4,7 @@ import sys from collections.abc import Callable, Sequence from dataclasses import dataclass -from typing import Any +from typing import Any, cast 
from typing_extensions import deprecated @@ -539,7 +539,9 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: # Resolve workflow logical IDs: source uses exit point, target uses entry point resolved_source = self._resolve_workflow_exit(source) resolved_target = self._resolve_workflow_entry(target) - self._edge_registry.append(_EdgeRegistration(source=resolved_source, target=resolved_target, condition=condition)) + self._edge_registry.append( + _EdgeRegistration(source=resolved_source, target=resolved_target, condition=condition) + ) return self # Both are Executor/AgentProtocol instances; wrap and add now @@ -724,7 +726,7 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: resolved_source = self._resolve_workflow_exit(source) resolved_cases: list[Case | Default] = [] for case in cases: - target_str = case.target # type: ignore[assignment] + target_str = cast(str, case.target) resolved_target = self._resolve_workflow_entry(target_str) if isinstance(case, Default): resolved_cases.append(Default(target=resolved_target)) @@ -1231,7 +1233,7 @@ def add_workflow( .add_workflow(analysis, id="analysis") .add_workflow(summary, id="summary") .add_edge("analysis", "summary") # Framework resolves entry/exit - .set_start_executor("analysis") # Framework knows entry point + .set_start_executor("analysis") # Framework knows entry point .build() ) @@ -1390,13 +1392,11 @@ def _resolve_workflow_exit(self, id: str) -> str: mapping = self._workflow_mappings[id] if len(mapping.exits) == 0: raise ValueError( - f"Workflow '{id}' has no known exit points. " - f"Use explicit executor IDs like '{id}/'." + f"Workflow '{id}' has no known exit points. Use explicit executor IDs like '{id}/'." ) if len(mapping.exits) != 1: raise ValueError( - f"Workflow '{id}' has {len(mapping.exits)} exit points. " - f"Use explicit IDs: {mapping.exits}" + f"Workflow '{id}' has {len(mapping.exits)} exit points. 
Use explicit IDs: {mapping.exits}" ) return mapping.exits[0] return id diff --git a/python/packages/core/tests/workflow/test_workflow_composition.py b/python/packages/core/tests/workflow/test_workflow_composition.py index 9fa7cd962f..6c9505fcd6 100644 --- a/python/packages/core/tests/workflow/test_workflow_composition.py +++ b/python/packages/core/tests/workflow/test_workflow_composition.py @@ -2,13 +2,13 @@ """Tests for workflow composition via add_workflow().""" -from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest from agent_framework import ( ChatAgent, + ChatMessage, ConcurrentBuilder, Executor, SequentialBuilder, @@ -28,8 +28,7 @@ def create_mock_agent(name: str) -> ChatAgent: messages=[MagicMock(role="assistant", content="test response")], ) ) - agent = ChatAgent(name=name, chat_client=mock_client) - return agent + return ChatAgent(name=name, chat_client=mock_client) # Test executors @@ -73,6 +72,14 @@ async def handle(self, message: str, ctx: WorkflowContext[str]) -> None: await ctx.send_message(message) +class ChatMessageListPassthroughExecutor(Executor): + """Executor that passes through list[ChatMessage] messages.""" + + @handler + async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: + await ctx.send_message(messages) + + # Basic add_workflow tests @@ -250,11 +257,13 @@ async def test_compose_with_pre_and_post_processing(): concurrent = ConcurrentBuilder().participants([agent1, agent2]) + # Pre-processor accepts str (like the dispatcher), post-processor accepts list[ChatMessage] + # (like aggregator outputs) workflow = ( WorkflowBuilder() .register_executor(lambda: StringPassthroughExecutor(id="pre"), name="pre") .add_workflow(concurrent, id="analysis") - .register_executor(lambda: StringPassthroughExecutor(id="post"), name="post") + .register_executor(lambda: ChatMessageListPassthroughExecutor(id="post"), name="post") .add_edge("pre", "analysis") .add_edge("analysis", 
"post") .set_start_executor("pre") diff --git a/python/samples/getting_started/workflows/composition/workflow_composition.py b/python/samples/getting_started/workflows/composition/workflow_composition.py index 02c346edcb..de53d2b0fc 100644 --- a/python/samples/getting_started/workflows/composition/workflow_composition.py +++ b/python/samples/getting_started/workflows/composition/workflow_composition.py @@ -15,49 +15,36 @@ 3. Mix and match high-level patterns with custom executors Prerequisites: -- Azure OpenAI or OpenAI API credentials (set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY, - or OPENAI_API_KEY) +- Azure OpenAI credentials """ import asyncio import os from agent_framework import ( + AgentExecutorResponse, ChatAgent, + ChatMessage, ConcurrentBuilder, Executor, + Role, SequentialBuilder, WorkflowBuilder, WorkflowContext, handler, ) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import DefaultAzureCredential from typing_extensions import Never def create_chat_client(): """Create a chat client based on available environment variables.""" - if os.environ.get("AZURE_OPENAI_ENDPOINT"): - from azure.identity import DefaultAzureCredential - - from agent_framework.azure_ai import AzureOpenAIChatClient - - return AzureOpenAIChatClient( - endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], - credential=DefaultAzureCredential(), - model=os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o"), - ) - elif os.environ.get("OPENAI_API_KEY"): - from agent_framework.openai import OpenAIChatClient - - return OpenAIChatClient( - api_key=os.environ["OPENAI_API_KEY"], - model=os.environ.get("OPENAI_MODEL", "gpt-4o"), - ) - else: - raise ValueError( - "Please set either AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY, " - "or OPENAI_API_KEY environment variables." 
- ) + return AzureOpenAIChatClient( + endpoint=os.environ["AZURE_OPENAI_ENDPOINT"], + credential=DefaultAzureCredential(), + model=os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o"), + ) # Custom preprocessor executor @@ -70,15 +57,41 @@ def __init__(self): @handler async def format_input(self, message: str, ctx: WorkflowContext[str]) -> None: """Format the input message.""" - print(f"\n{'='*60}") + print(f"\n{'=' * 60}") print("INPUT FORMATTER") - print(f"{'='*60}") + print(f"{'=' * 60}") print(f"Received: {message[:100]}...") formatted = f"Please analyze the following topic thoroughly:\n\n{message}" print("Formatted input for analysis agents.") await ctx.send_message(formatted) +# Custom aggregator that forwards messages to downstream executors (instead of yielding) +class ForwardingAggregator(Executor): + """Aggregates agent responses and forwards them to downstream executors. + + Unlike the default ConcurrentBuilder aggregator which yields output (terminal), + this aggregator uses send_message to forward to downstream executors. 
+ """ + + def __init__(self): + super().__init__(id="forwarding_aggregator") + + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[list[ChatMessage]]) -> None: + """Aggregate responses and forward to downstream.""" + # Extract assistant messages from each agent's response + messages: list[ChatMessage] = [] + for r in results: + if r.agent_response and r.agent_response.messages: + for msg in r.agent_response.messages: + if msg.role == Role.ASSISTANT: + messages.append(msg) + + # Forward to downstream executor (OutputFormatter) + await ctx.send_message(messages) + + # Custom postprocessor executor class OutputFormatter(Executor): """Formats the final output from the workflow.""" @@ -87,19 +100,17 @@ def __init__(self): super().__init__(id="output_formatter") @handler - async def format_output(self, messages: list, ctx: WorkflowContext[Never, str]) -> None: + async def format_output(self, messages: list[ChatMessage], ctx: WorkflowContext[Never, str]) -> None: """Format and yield the final output.""" - print(f"\n{'='*60}") + print(f"\n{'=' * 60}") print("OUTPUT FORMATTER") - print(f"{'='*60}") + print(f"{'=' * 60}") # Extract text content from the messages - output_parts = [] + output_parts: list[str] = [] for msg in messages: - if hasattr(msg, "content"): - output_parts.append(str(msg.content)) - elif hasattr(msg, "text"): - output_parts.append(str(msg.text)) + if isinstance(msg, ChatMessage): + output_parts.append(msg.text) else: output_parts.append(str(msg)) @@ -164,11 +175,16 @@ async def example_simple_composition(): print("\n--- Final Output ---") for output in outputs: - if hasattr(output, "__iter__") and not isinstance(output, str): + if isinstance(output, list): for item in output: - print(item.content if hasattr(item, "content") else item) + if isinstance(item, ChatMessage): + print(f"\n{item.text}") + else: + print(f"\n{item}") + elif isinstance(output, ChatMessage): + print(f"\n{output.text}") else: - 
print(output) + print(f"\n{output}") async def example_pre_post_processing(): @@ -176,11 +192,15 @@ async def example_pre_post_processing(): This shows how to wrap a ConcurrentBuilder with custom preprocessing and postprocessing executors. + + Note: The default ConcurrentBuilder aggregator uses yield_output() which makes it + a terminal node. To chain to downstream executors, we use a custom ForwardingAggregator + that uses send_message() instead. """ print("\n" + "=" * 80) print("EXAMPLE 2: Pre/Post Processing with Composed Workflows") print("=" * 80) - print("InputFormatter -> ConcurrentBuilder -> OutputFormatter") + print("InputFormatter -> ConcurrentBuilder (with custom aggregator) -> OutputFormatter") client = create_chat_client() @@ -196,8 +216,12 @@ async def example_pre_post_processing(): instructions="You are a market analyst. Provide market analysis in 2-3 sentences.", ) - # Create the concurrent analysis pattern - analysis = ConcurrentBuilder().participants([technical_analyst, market_analyst]) + # Create the concurrent analysis pattern with a custom forwarding aggregator + # The ForwardingAggregator uses send_message() to forward to downstream executors, + # unlike the default aggregator which uses yield_output() (terminal) + analysis = ( + ConcurrentBuilder().participants([technical_analyst, market_analyst]).with_aggregator(ForwardingAggregator()) + ) # Compose with pre/post-processing workflow = ( From 7faa6add4ff4252124ceb61077bc94f18fc0744c Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 20 Jan 2026 14:41:43 +0900 Subject: [PATCH 3/3] Address copilot feedback --- .../getting_started/workflows/README.md | 1 + .../composition/workflow_composition.py | 31 ++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 8ca5e0f4bc..7c1ed812bb 100644 --- a/python/samples/getting_started/workflows/README.md +++ 
b/python/samples/getting_started/workflows/README.md @@ -62,6 +62,7 @@ Once comfortable with these, explore the rest of the samples below. | Sample | File | Concepts | |---|---|---| +| Workflow Composition | [composition/workflow_composition.py](./composition/workflow_composition.py) | Compose high-level builders (ConcurrentBuilder, SequentialBuilder) using add_workflow() with logical ID resolution | | Sub-Workflow (Basics) | [composition/sub_workflow_basics.py](./composition/sub_workflow_basics.py) | Wrap a workflow as an executor and orchestrate sub-workflows | | Sub-Workflow: Request Interception | [composition/sub_workflow_request_interception.py](./composition/sub_workflow_request_interception.py) | Intercept and forward sub-workflow requests using @handler for SubWorkflowRequestMessage | | Sub-Workflow: Parallel Requests | [composition/sub_workflow_parallel_requests.py](./composition/sub_workflow_parallel_requests.py) | Multiple specialized interceptors handling different request types from same sub-workflow | diff --git a/python/samples/getting_started/workflows/composition/workflow_composition.py b/python/samples/getting_started/workflows/composition/workflow_composition.py index de53d2b0fc..a3af62da6d 100644 --- a/python/samples/getting_started/workflows/composition/workflow_composition.py +++ b/python/samples/getting_started/workflows/composition/workflow_composition.py @@ -47,9 +47,21 @@ def create_chat_client(): ) -# Custom preprocessor executor +# ============================================================================= +# EXAMPLE CUSTOM EXECUTORS +# ============================================================================= +# The executors below are FOR DEMONSTRATION PURPOSES ONLY. They show patterns +# for building your own pre-processors, post-processors, and custom aggregators. +# These are NOT part of the agent_framework library - you would write your own +# versions tailored to your specific use case. 
+# ============================================================================= + + class InputFormatter(Executor): - """Formats the input message before sending to analysis.""" + """Example preprocessor that formats input before sending to analysis. + + This demonstrates how to add custom preprocessing before a composed workflow. + """ def __init__(self): super().__init__(id="input_formatter") @@ -66,12 +78,13 @@ async def format_input(self, message: str, ctx: WorkflowContext[str]) -> None: await ctx.send_message(formatted) -# Custom aggregator that forwards messages to downstream executors (instead of yielding) class ForwardingAggregator(Executor): - """Aggregates agent responses and forwards them to downstream executors. + """Example aggregator that forwards messages to downstream executors. - Unlike the default ConcurrentBuilder aggregator which yields output (terminal), - this aggregator uses send_message to forward to downstream executors. + This demonstrates how to create a custom aggregator for ConcurrentBuilder + when you need to chain to downstream executors. The default aggregator uses + yield_output() which makes it terminal. This version uses send_message() + to forward data through edges to downstream executors. """ def __init__(self): @@ -92,9 +105,11 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon await ctx.send_message(messages) -# Custom postprocessor executor class OutputFormatter(Executor): - """Formats the final output from the workflow.""" + """Example postprocessor that formats the final output from the workflow. + + This demonstrates how to add custom postprocessing after a composed workflow. + """ def __init__(self): super().__init__(id="output_formatter")