Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
706 changes: 706 additions & 0 deletions docs/decisions/00XX-workflow-composability.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Don't forget to replace the 00XX placeholder in the filename with the actual decision-document number before merging.

Large diffs are not rendered by default.

52 changes: 32 additions & 20 deletions python/packages/core/agent_framework/_workflows/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,29 +525,14 @@ def _resolve_participants(self) -> list[Executor]:

return executors

def build(self) -> Workflow:
r"""Build and validate the concurrent workflow.
def _to_builder(self) -> tuple["WorkflowBuilder", str, list[str]]:
"""Build the internal WorkflowBuilder without calling build().

Wiring pattern:
- Dispatcher (internal) fans out the input to all `participants`
- Fan-in collects `AgentExecutorResponse` objects from all participants
- If request info is enabled, the orchestration emits a request info event with outputs from all participants
before sending the outputs to the aggregator
- Aggregator yields output and the workflow becomes idle. The output is either:
- list[ChatMessage] (default aggregator: one user + one assistant per agent)
- custom payload from the provided aggregator
This is used for workflow composition via WorkflowBuilder.add_workflow().

Returns:
Workflow: a ready-to-run workflow instance

Raises:
ValueError: if no participants were defined

Example:

.. code-block:: python

workflow = ConcurrentBuilder().participants([agent1, agent2]).build()
A tuple of (WorkflowBuilder, start_executor_id, terminal_executor_ids).
The terminal executor is the aggregator.
"""
if not self._participants and not self._participant_factories:
raise ValueError(
Expand Down Expand Up @@ -579,4 +564,31 @@ def build(self) -> Workflow:
if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)

return builder, dispatcher.id, [aggregator.id]

def build(self) -> Workflow:
    r"""Build and validate the concurrent workflow.

    Wiring pattern:
    - Dispatcher (internal) fans out the input to all `participants`
    - Fan-in collects `AgentExecutorResponse` objects from all participants
    - If request info is enabled, the orchestration emits a request info event with outputs from all participants
    before sending the outputs to the aggregator
    - Aggregator yields output and the workflow becomes idle. The output is either:
    - list[ChatMessage] (default aggregator: one user + one assistant per agent)
    - custom payload from the provided aggregator

    Returns:
        Workflow: a ready-to-run workflow instance

    Raises:
        ValueError: if no participants were defined

    Example:

    .. code-block:: python

        workflow = ConcurrentBuilder().participants([agent1, agent2]).build()
    """
    # Delegate wiring to _to_builder(); the start/terminal executor IDs are
    # only needed for composition, so they are discarded here.
    workflow_builder, _start_id, _terminal_ids = self._to_builder()
    return workflow_builder.build()
76 changes: 76 additions & 0 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,29 @@ class CustomGroup(EdgeGroup):
cls._TYPE_REGISTRY[subclass.__name__] = subclass
return subclass

def _with_prefix(self, prefix: str) -> "EdgeGroup":
    """Create a copy of this edge group with prefixed executor IDs.

    This is used internally for workflow composition when merging edge groups
    from one builder into another.

    Args:
        prefix: The prefix to add to all executor IDs (e.g., "analysis").

    Returns:
        A new EdgeGroup instance with prefixed IDs.
    """
    renamed_edges: list[Edge] = []
    for original in self.edges:
        # Rebuild each edge with both endpoints namespaced as "<prefix>/<id>",
        # carrying the condition callable and its recorded name along.
        renamed_edges.append(
            Edge(
                source_id=f"{prefix}/{original.source_id}",
                target_id=f"{prefix}/{original.target_id}",
                condition=original._condition,
                condition_name=original.condition_name,
            )
        )
    return EdgeGroup(renamed_edges, type=self.type)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "EdgeGroup":
"""Hydrate the correct `EdgeGroup` subclass from serialised state.
Expand Down Expand Up @@ -493,6 +516,15 @@ def __init__(
edge = Edge(source_id=source_id, target_id=target_id, condition=condition)
super().__init__([edge], id=id, type=self.__class__.__name__)

def _with_prefix(self, prefix: str) -> "SingleEdgeGroup":
    """Create a copy with prefixed executor IDs."""
    # A SingleEdgeGroup always holds exactly one edge.
    only_edge = self.edges[0]
    prefixed_source = f"{prefix}/{only_edge.source_id}"
    prefixed_target = f"{prefix}/{only_edge.target_id}"
    return SingleEdgeGroup(
        source_id=prefixed_source,
        target_id=prefixed_target,
        condition=only_edge._condition,
    )


@EdgeGroup.register
@dataclass(init=False)
Expand Down Expand Up @@ -608,6 +640,16 @@ def to_dict(self) -> dict[str, Any]:
payload["selection_func_name"] = self.selection_func_name
return payload

def _with_prefix(self, prefix: str) -> "FanOutEdgeGroup":
    """Create a copy with prefixed executor IDs."""
    # Every edge in a fan-out shares one source; prefix it plus each target,
    # preserving the optional selection function and its recorded name.
    prefixed_targets = [f"{prefix}/{target}" for target in self._target_ids]
    return FanOutEdgeGroup(
        source_id=f"{prefix}/{self.edges[0].source_id}",
        target_ids=prefixed_targets,
        selection_func=self._selection_func,
        selection_func_name=self.selection_func_name,
    )


@EdgeGroup.register
@dataclass(init=False)
Expand Down Expand Up @@ -643,6 +685,15 @@ def __init__(self, source_ids: Sequence[str], target_id: str, *, id: str | None
edges = [Edge(source_id=source, target_id=target_id) for source in source_ids]
super().__init__(edges, id=id, type=self.__class__.__name__)

def _with_prefix(self, prefix: str) -> "FanInEdgeGroup":
    """Create a copy with prefixed executor IDs."""
    # Every edge in a fan-in shares one target; prefix it plus each source.
    return FanInEdgeGroup(
        source_ids=[f"{prefix}/{edge.source_id}" for edge in self.edges],
        target_id=f"{prefix}/{self.edges[0].target_id}",
    )


@dataclass(init=False)
class SwitchCaseEdgeGroupCase(DictConvertible):
Expand Down Expand Up @@ -902,6 +953,26 @@ def to_dict(self) -> dict[str, Any]:
payload["cases"] = [encode_value(case) for case in self.cases]
return payload

def _with_prefix(self, prefix: str) -> "SwitchCaseEdgeGroup":
    """Create a copy with prefixed executor IDs."""

    def _renamed(case: "SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault") -> "SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault":
        # The default case carries no condition; a regular case keeps its
        # condition callable and recorded name.
        prefixed_target = f"{prefix}/{case.target_id}"
        if isinstance(case, SwitchCaseEdgeGroupDefault):
            return SwitchCaseEdgeGroupDefault(target_id=prefixed_target)
        return SwitchCaseEdgeGroupCase(
            condition=case._condition,
            target_id=prefixed_target,
            condition_name=case.condition_name,
        )

    return SwitchCaseEdgeGroup(
        source_id=f"{prefix}/{self.edges[0].source_id}",
        cases=[_renamed(case) for case in self.cases],
    )


@EdgeGroup.register
@dataclass(init=False)
Expand Down Expand Up @@ -939,3 +1010,8 @@ def __init__(self, executor_id: str) -> None:
"""
edge = Edge(source_id=INTERNAL_SOURCE_ID(executor_id), target_id=executor_id)
super().__init__([edge])

def _with_prefix(self, prefix: str) -> "InternalEdgeGroup":
    """Create a copy with prefixed executor IDs."""
    # The internal edge's target holds the wrapped executor's original ID;
    # the constructor re-derives the internal source ID from it.
    return InternalEdgeGroup(executor_id=f"{prefix}/{self.edges[0].target_id}")
16 changes: 16 additions & 0 deletions python/packages/core/agent_framework/_workflows/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,22 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""
...

def _clone_with_id(self, new_id: str) -> "Executor":
"""Create a clone of this executor with a new ID.

This is used internally for workflow composition when merging executors
from one builder into another with prefixed IDs.

Args:
new_id: The new ID for the cloned executor.

Returns:
A new Executor instance with the same configuration but a different ID.
"""
cloned = copy.copy(self)
cloned.id = new_id
return cloned


# endregion: Executor

Expand Down
32 changes: 22 additions & 10 deletions python/packages/core/agent_framework/_workflows/_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,14 @@ def _resolve_participants(self) -> list[Executor]:

return executors

def build(self) -> Workflow:
"""Build and validate the sequential workflow.
def _to_builder(self) -> tuple["WorkflowBuilder", str, list[str]]:
"""Build the internal WorkflowBuilder without calling build().

Wiring pattern:
- _InputToConversation normalizes the initial input into list[ChatMessage]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then optionally
route through a request info interceptor, then convert response to conversation
via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
This is used for workflow composition via WorkflowBuilder.add_workflow().

Returns:
A tuple of (WorkflowBuilder, start_executor_id, terminal_executor_ids).
The terminal executor is the end executor.
"""
if not self._participants and not self._participant_factories:
raise ValueError(
Expand Down Expand Up @@ -320,4 +317,19 @@ def build(self) -> Workflow:
if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)

return builder, input_conv.id, [end.id]

def build(self) -> Workflow:
    """Build and validate the sequential workflow.

    Wiring pattern:
    - _InputToConversation normalizes the initial input into list[ChatMessage]
    - For each participant in order:
    - If Agent (or AgentExecutor): pass conversation to the agent, then optionally
    route through a request info interceptor, then convert response to conversation
    via _ResponseToConversation
    - Else (custom Executor): pass conversation directly to the executor
    - _EndWithConversation yields the final conversation and the workflow becomes idle
    """
    # Delegate wiring to _to_builder(); the start/terminal executor IDs are
    # only needed for composition, so they are discarded here.
    workflow_builder, _start_id, _terminal_ids = self._to_builder()
    return workflow_builder.build()
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,12 @@ def _validate_edge_type_compatibility(self, edge: Edge, edge_group: EdgeGroup) -
target_executor = self._executors[edge.target_id]

# Get output types from source executor
# First try send_message output types, then fall back to yield_output types
# This supports workflow composition where terminal executors (using yield_output)
# may be connected to downstream executors via add_workflow()
source_output_types = list(source_executor.output_types)
if not source_output_types:
source_output_types = list(source_executor.workflow_output_types)

# Get input types from target executor
target_input_types = target_executor.input_types
Expand Down
Loading
Loading