Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 393 additions & 0 deletions docs/decisions/00XX-workflow_composability_design.md

Large diffs are not rendered by default.

35 changes: 34 additions & 1 deletion python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@
)
from ._sequential import SequentialBuilder
from ._shared_state import SharedState
from ._type_adapters import (
ConversationToText,
FunctionAdapter,
IdentityAdapter,
ItemToList,
ListToItem,
MessageWrapper,
SingleMessageExtractor,
TextToConversation,
TypeAdapter,
dict_to_struct_adapter,
find_adapter,
json_deserializer,
json_serializer,
struct_to_dict_adapter,
)
from ._validation import (
EdgeDuplicationError,
GraphConnectivityError,
Expand All @@ -100,7 +116,7 @@
)
from ._viz import WorkflowViz
from ._workflow import Workflow, WorkflowRunResult
from ._workflow_builder import WorkflowBuilder
from ._workflow_builder import ConnectionHandle, MergeResult, WorkflowBuilder, WorkflowConnection
from ._workflow_context import WorkflowContext
from ._workflow_executor import SubWorkflowRequestMessage, SubWorkflowResponseMessage, WorkflowExecutor

Expand All @@ -122,6 +138,8 @@
"Case",
"CheckpointStorage",
"ConcurrentBuilder",
"ConnectionHandle",
"ConversationToText",
"Default",
"Edge",
"EdgeDuplicationError",
Expand All @@ -133,15 +151,19 @@
"FanInEdgeGroup",
"FanOutEdgeGroup",
"FileCheckpointStorage",
"FunctionAdapter",
"FunctionExecutor",
"GraphConnectivityError",
"GroupChatBuilder",
"GroupChatDirective",
"GroupChatStateSnapshot",
"HandoffBuilder",
"HandoffUserInputRequest",
"IdentityAdapter",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"ItemToList",
"ListToItem",
"MagenticBuilder",
"MagenticContext",
"MagenticManagerBase",
Expand All @@ -151,14 +173,17 @@
"ManagerDirectiveModel",
"ManagerSelectionRequest",
"ManagerSelectionResponse",
"MergeResult",
"Message",
"MessageWrapper",
"OrchestrationState",
"RequestInfoEvent",
"Runner",
"RunnerContext",
"SequentialBuilder",
"SharedState",
"SingleEdgeGroup",
"SingleMessageExtractor",
"StandardMagenticManager",
"SubWorkflowRequestMessage",
"SubWorkflowResponseMessage",
Expand All @@ -167,13 +192,16 @@
"SwitchCaseEdgeGroup",
"SwitchCaseEdgeGroupCase",
"SwitchCaseEdgeGroupDefault",
"TextToConversation",
"TypeAdapter",
"TypeCompatibilityError",
"ValidationTypeEnum",
"Workflow",
"WorkflowAgent",
"WorkflowBuilder",
"WorkflowCheckpoint",
"WorkflowCheckpointSummary",
"WorkflowConnection",
"WorkflowContext",
"WorkflowErrorDetails",
"WorkflowEvent",
Expand All @@ -189,9 +217,14 @@
"WorkflowValidationError",
"WorkflowViz",
"create_edge_runner",
"dict_to_struct_adapter",
"executor",
"find_adapter",
"get_checkpoint_summary",
"handler",
"json_deserializer",
"json_serializer",
"response_handler",
"struct_to_dict_adapter",
"validate_workflow_graph",
]
14 changes: 12 additions & 2 deletions python/packages/core/agent_framework/_workflows/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_builder import WorkflowBuilder, WorkflowConnection
from ._workflow_context import WorkflowContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -296,6 +296,11 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Concurre
self._checkpoint_storage = checkpoint_storage
return self

def as_connection(self, prefix: str | None = None) -> WorkflowConnection:
"""Expose the concurrent wiring as a reusable connection."""
builder = self._build_workflow_builder()
return builder.as_connection(prefix=prefix)

def build(self) -> Workflow:
r"""Build and validate the concurrent workflow.

Expand All @@ -318,6 +323,11 @@ def build(self) -> Workflow:

workflow = ConcurrentBuilder().participants([agent1, agent2]).build()
"""
builder = self._build_workflow_builder()
return builder.build()

def _build_workflow_builder(self) -> WorkflowBuilder:
"""Internal helper to construct the workflow builder for this concurrent workflow."""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")

Expand All @@ -332,4 +342,4 @@ def build(self) -> Workflow:
if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)

return builder.build()
return builder
20 changes: 16 additions & 4 deletions python/packages/core/agent_framework/_workflows/_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from ._executor import Executor, handler
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, wrap_participant
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_builder import WorkflowBuilder, WorkflowConnection
from ._workflow_context import WorkflowContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1693,6 +1693,11 @@ def _build_participant_specs(self) -> dict[str, GroupChatParticipantSpec]:
)
return specs

def as_connection(self, prefix: str | None = None) -> WorkflowConnection:
"""Expose the group chat wiring as a reusable connection."""
builder = self._build_workflow_builder()
return builder.as_connection(prefix=prefix)

def build(self) -> Workflow:
"""Build and validate the group chat workflow.

Expand Down Expand Up @@ -1727,6 +1732,11 @@ def build(self) -> Workflow:
async for message in workflow.run("Solve this problem collaboratively"):
print(message.text)
"""
builder = self._build_workflow_builder()
return builder.build()

def _build_workflow_builder(self) -> WorkflowBuilder:
"""Internal helper to construct the workflow builder for this group chat workflow."""
# Manager is only required when using the default orchestrator factory
# Custom factories (e.g., MagenticBuilder) provide their own orchestrator with embedded manager
if (
Expand Down Expand Up @@ -1760,10 +1770,12 @@ def build(self) -> Workflow:
orchestrator_factory=self._orchestrator_factory,
interceptors=self._interceptors,
checkpoint_storage=self._checkpoint_storage,
return_builder=True,
)
if not isinstance(result, Workflow):
raise TypeError("Expected Workflow from assemble_group_chat_workflow")
return result
if not (isinstance(result, tuple) and len(result) == 2):
raise TypeError("Expected (WorkflowBuilder, orchestrator) from assemble_group_chat_workflow")
builder, _ = result
return builder


# endregion
Expand Down
17 changes: 12 additions & 5 deletions python/packages/core/agent_framework/_workflows/_handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, sanitize_identifier
from ._request_info_mixin import response_handler
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_builder import WorkflowBuilder, WorkflowConnection
from ._workflow_context import WorkflowContext

if sys.version_info >= (3, 12):
Expand Down Expand Up @@ -1307,6 +1307,11 @@ def enable_return_to_previous(self, enabled: bool = True) -> "HandoffBuilder":
self._return_to_previous = enabled
return self

def as_connection(self, prefix: str | None = None) -> WorkflowConnection:
"""Expose the handoff wiring as a reusable connection."""
builder = self._build_workflow_builder()
return builder.as_connection(prefix=prefix)

def build(self) -> Workflow:
"""Construct the final Workflow instance from the configured builder.

Expand Down Expand Up @@ -1362,6 +1367,11 @@ def build(self) -> Workflow:
After calling build(), the builder instance should not be reused. Create a
new builder if you need to construct another workflow with different configuration.
"""
builder = self._build_workflow_builder()
return builder.build()

def _build_workflow_builder(self) -> WorkflowBuilder:
"""Internal helper to construct the workflow builder for this handoff workflow."""
if not self._executors:
raise ValueError("No participants provided. Call participants([...]) first.")
if self._starting_agent_id is None:
Expand Down Expand Up @@ -1448,7 +1458,6 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor:
participant_aliases=self._aliases,
participant_executors=self._executors,
)

result = assemble_group_chat_workflow(
wiring=wiring,
participant_factory=_default_participant_factory,
Expand All @@ -1464,9 +1473,7 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor:

builder = builder.add_edge(input_node, starting_executor)
builder = builder.add_edge(coordinator, user_gateway)
builder = builder.add_edge(user_gateway, coordinator)

return builder.build()
return builder.add_edge(user_gateway, coordinator)

def _resolve_to_id(self, candidate: str | AgentProtocol | Executor) -> str:
"""Resolve a participant reference into a concrete executor identifier."""
Expand Down
13 changes: 12 additions & 1 deletion python/packages/core/agent_framework/_workflows/_magentic.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._participant_utils import GroupChatParticipantSpec, participant_description
from ._request_info_mixin import response_handler
from ._workflow import Workflow, WorkflowRunResult
from ._workflow_builder import WorkflowConnection
from ._workflow_context import WorkflowContext

if sys.version_info >= (3, 11):
Expand Down Expand Up @@ -2088,6 +2089,16 @@ async def plan(self, context: MagenticContext) -> ChatMessage:

def build(self) -> Workflow:
"""Build a Magentic workflow with the orchestrator and all agent executors."""
group_builder = self._build_group_chat_builder()
return group_builder.build()

def as_connection(self, prefix: str | None = None) -> WorkflowConnection:
"""Expose the Magentic wiring as a reusable connection."""
group_builder = self._build_group_chat_builder()
return group_builder.as_connection(prefix=prefix)

def _build_group_chat_builder(self) -> GroupChatBuilder:
"""Internal helper to construct the underlying group chat builder."""
if not self._participants:
raise ValueError("No participants added to Magentic workflow")

Expand Down Expand Up @@ -2135,7 +2146,7 @@ def _participant_factory(
if self._checkpoint_storage is not None:
group_builder = group_builder.with_checkpointing(self._checkpoint_storage)

return group_builder.build()
return group_builder

def start_with_string(self, task: str) -> "MagenticWorkflow":
"""Build a Magentic workflow and return a wrapper with convenience methods for string tasks.
Expand Down
14 changes: 12 additions & 2 deletions python/packages/core/agent_framework/_workflows/_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
)
from ._message_utils import normalize_messages_input
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_builder import WorkflowBuilder, WorkflowConnection
from ._workflow_context import WorkflowContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -157,6 +157,11 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Sequenti
self._checkpoint_storage = checkpoint_storage
return self

def as_connection(self, prefix: str | None = None) -> WorkflowConnection:
"""Expose the sequential wiring as a reusable connection."""
builder = self._build_workflow_builder()
return builder.as_connection(prefix=prefix)

def build(self) -> Workflow:
"""Build and validate the sequential workflow.

Expand All @@ -168,6 +173,11 @@ def build(self) -> Workflow:
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
"""
builder = self._build_workflow_builder()
return builder.build()

def _build_workflow_builder(self) -> WorkflowBuilder:
"""Internal helper to construct the workflow builder for this sequential workflow."""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")

Expand Down Expand Up @@ -205,4 +215,4 @@ def build(self) -> Workflow:
if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)

return builder.build()
return builder
Loading