Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
MagenticStallInterventionRequest,
StandardMagenticManager,
)
from ._orchestration_request_info import AgentInputRequest, AgentResponseReviewRequest, RequestInfoInterceptor
from ._orchestration_state import OrchestrationState
from ._request_info_mixin import response_handler
from ._runner import Runner
Expand Down Expand Up @@ -122,6 +123,8 @@
"AgentExecutor",
"AgentExecutorRequest",
"AgentExecutorResponse",
"AgentInputRequest",
"AgentResponseReviewRequest",
"AgentRunEvent",
"AgentRunUpdateEvent",
"Case",
Expand Down Expand Up @@ -164,6 +167,7 @@
"Message",
"OrchestrationState",
"RequestInfoEvent",
"RequestInfoInterceptor",
"Runner",
"RunnerContext",
"SequentialBuilder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def __init__(self, executor_id: str) -> None:
self._max_rounds: int | None = None
self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None

def register_participant_entry(self, name: str, *, entry_id: str, is_agent: bool) -> None:
def register_participant_entry(
self, name: str, *, entry_id: str, is_agent: bool, exit_id: str | None = None
) -> None:
"""Record routing details for a participant's entry executor.

This method provides a unified interface for registering participants
Expand All @@ -57,8 +59,10 @@ def register_participant_entry(self, name: str, *, entry_id: str, is_agent: bool
name: Participant name (used for selection and tracking)
entry_id: Executor ID for this participant's entry point
is_agent: Whether this is an AgentExecutor (True) or custom Executor (False)
exit_id: Executor ID for this participant's exit point (where responses come from).
If None, defaults to entry_id.
"""
self._registry.register(name, entry_id=entry_id, is_agent=is_agent)
self._registry.register(name, entry_id=entry_id, is_agent=is_agent, exit_id=exit_id)

# Conversation state management (shared across all patterns)

Expand Down
40 changes: 37 additions & 3 deletions python/packages/core/agent_framework/_workflows/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ._checkpoint import CheckpointStorage
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._orchestration_request_info import RequestInfoInterceptor
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_context import WorkflowContext
Expand Down Expand Up @@ -209,15 +210,18 @@ def summarize(results):

workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build()


# Enable checkpoint persistence so runs can resume
workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

# Enable request info before aggregation
workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
"""

def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._aggregator: Executor | None = None
self._checkpoint_storage: CheckpointStorage | None = None
self._request_info_enabled: bool = False

def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder":
r"""Define the parallel participants for this concurrent workflow.
Expand Down Expand Up @@ -296,12 +300,33 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Concurre
self._checkpoint_storage = checkpoint_storage
return self

def with_request_info(self) -> "ConcurrentBuilder":
"""Enable request info before aggregation in the workflow.

When enabled, the workflow pauses after all parallel agents complete,
emitting a RequestInfoEvent that allows the caller to review and optionally
modify the combined results before aggregation. The caller provides feedback
via the standard response_handler/request_info pattern.

Note:
Unlike SequentialBuilder and GroupChatBuilder, ConcurrentBuilder does not
support per-agent filtering since all agents run in parallel and results
are collected together. The pause occurs once with all agent outputs received.

Returns:
self: The builder instance for fluent chaining.
"""
self._request_info_enabled = True
return self

def build(self) -> Workflow:
r"""Build and validate the concurrent workflow.

Wiring pattern:
- Dispatcher (internal) fans out the input to all `participants`
- Fan-in aggregator collects `AgentExecutorResponse` objects
- Fan-in collects `AgentExecutorResponse` objects from all participants
- If request info is enabled, the orchestration emits a request info event with outputs from all participants
before sending the outputs to the aggregator
- Aggregator yields output and the workflow becomes idle. The output is either:
- list[ChatMessage] (default aggregator: one user + one assistant per agent)
- custom payload from the provided callback/executor
Expand All @@ -327,7 +352,16 @@ def build(self) -> Workflow:
builder = WorkflowBuilder()
builder.set_start_executor(dispatcher)
builder.add_fan_out_edges(dispatcher, list(self._participants))
builder.add_fan_in_edges(list(self._participants), aggregator)

if self._request_info_enabled:
# Insert interceptor between fan-in and aggregator
# participants -> fan-in -> interceptor -> aggregator
request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
builder.add_fan_in_edges(list(self._participants), request_info_interceptor)
builder.add_edge(request_info_interceptor, aggregator)
else:
# Direct fan-in to aggregator
builder.add_fan_in_edges(list(self._participants), aggregator)

if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)
Expand Down
Loading
Loading