Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
RunnerContext,
)
from ._sequential import SequentialBuilder
from ._shared_state import SharedState
from ._validation import (
EdgeDuplicationError,
GraphConnectivityError,
Expand Down Expand Up @@ -179,7 +178,6 @@
"Runner",
"RunnerContext",
"SequentialBuilder",
"SharedState",
"SingleEdgeGroup",
"StandardMagenticManager",
"SubWorkflowRequestMessage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GroupChatResponseMessage:


TerminationCondition: TypeAlias = Callable[[list[ChatMessage]], bool | Awaitable[bool]]
GroupChatWorkflowContext_T_Out: TypeAlias = AgentExecutorRequest | GroupChatRequestMessage | GroupChatParticipantMessage
GroupChatWorkflowContextOutT: TypeAlias = AgentExecutorRequest | GroupChatRequestMessage | GroupChatParticipantMessage


# region Group chat events
Expand Down Expand Up @@ -201,7 +201,7 @@ def __init__(
async def handle_str(
self,
task: str,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handler for string input as workflow entry point.

Expand All @@ -220,7 +220,7 @@ async def handle_str(
async def handle_message(
self,
task: ChatMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handler for single ChatMessage input as workflow entry point.

Expand All @@ -239,7 +239,7 @@ async def handle_message(
async def handle_messages(
self,
task: list[ChatMessage],
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handler for list of ChatMessages as workflow entry point.

Expand All @@ -262,7 +262,7 @@ async def handle_messages(
async def handle_participant_response(
self,
response: AgentExecutorResponse | GroupChatResponseMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handler for participant responses.

Expand All @@ -288,7 +288,7 @@ async def handle_participant_response(
async def _handle_messages(
self,
messages: list[ChatMessage],
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle task messages from users as workflow entry point.

Expand All @@ -303,7 +303,7 @@ async def _handle_messages(
async def _handle_response(
self,
response: AgentExecutorResponse | GroupChatResponseMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle a participant response.

Expand Down
7 changes: 4 additions & 3 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
# Conditions receive the message data and return bool (sync or async).
EdgeCondition: TypeAlias = Callable[[Any], bool | Awaitable[bool]]

# TypeVar for EdgeGroup subclasses used in class methods
EdgeGroupT = TypeVar("EdgeGroupT", bound="EdgeGroup")


def _extract_function_name(func: Callable[..., Any]) -> str:
"""Map a Python callable to a concise, human-focused identifier.
Expand Down Expand Up @@ -308,8 +311,6 @@ class EdgeGroup(DictConvertible):

from builtins import type as builtin_type

_T_EdgeGroup = TypeVar("_T_EdgeGroup", bound="EdgeGroup")

_TYPE_REGISTRY: ClassVar[dict[str, builtin_type["EdgeGroup"]]] = {}

def __init__(
Expand Down Expand Up @@ -392,7 +393,7 @@ def to_dict(self) -> dict[str, Any]:
}

@classmethod
def register(cls, subclass: builtin_type[_T_EdgeGroup]) -> builtin_type[_T_EdgeGroup]:
def register(cls, subclass: builtin_type[EdgeGroupT]) -> builtin_type[EdgeGroupT]:
"""Register a subclass so deserialisation can recover the right type.

Registration is typically performed via the decorator syntax applied to
Expand Down
8 changes: 4 additions & 4 deletions python/packages/core/agent_framework/_workflows/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class LoggingExecutor(Executor):
async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
print(f"Received: {msg}") # Only logging, no outputs

### WorkflowContext[T_Out]
Enables sending messages of type T_Out via `ctx.send_message()`:
### WorkflowContext[OutT]
Enables sending messages of type OutT via `ctx.send_message()`:

.. code-block:: python

Expand All @@ -126,8 +126,8 @@ class ProcessorExecutor(Executor):
async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
await ctx.send_message(42) # Can send int messages

### WorkflowContext[T_Out, T_W_Out]
Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out):
### WorkflowContext[OutT, W_OutT]
Enables both sending messages (OutT) and yielding workflow outputs (W_OutT):

.. code-block:: python

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def __init__(
output: Optional explicit output type(s) that can be sent via ``ctx.send_message()``.
Supports union types (e.g., ``str | int``) and string forward references.
When provided, takes precedence over introspection from the ``WorkflowContext``
first generic parameter (T_Out).
first generic parameter (OutT).
workflow_output: Optional explicit output type(s) that can be yielded via
``ctx.yield_output()``. Supports union types (e.g., ``str | int``) and string
forward references. When provided, takes precedence over introspection from the
``WorkflowContext`` second generic parameter (T_W_Out).
``WorkflowContext`` second generic parameter (W_OutT).

Raises:
ValueError: If func is a staticmethod or classmethod (use @handler on instance methods instead)
Expand Down Expand Up @@ -262,11 +262,11 @@ async def process(self, data: str, ctx: WorkflowContext[str]):
output: Optional explicit output type(s) that can be sent via ``ctx.send_message()``.
Supports union types (e.g., ``str | int``) and string forward references.
When provided, takes precedence over introspection from the ``WorkflowContext``
first generic parameter (T_Out).
first generic parameter (OutT).
workflow_output: Optional explicit output type(s) that can be yielded via
``ctx.yield_output()``. Supports union types (e.g., ``str | int``) and string
forward references. When provided, takes precedence over introspection from the
``WorkflowContext`` second generic parameter (T_W_Out).
``WorkflowContext`` second generic parameter (W_OutT).

Returns:
A FunctionExecutor instance that can be wired into a Workflow.
Expand Down
10 changes: 5 additions & 5 deletions python/packages/core/agent_framework/_workflows/_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
GroupChatParticipantMessage,
GroupChatRequestMessage,
GroupChatResponseMessage,
GroupChatWorkflowContext_T_Out,
GroupChatWorkflowContextOutT,
ParticipantRegistry,
TerminationCondition,
)
Expand Down Expand Up @@ -163,7 +163,7 @@ async def round_robin_selector(state: GroupChatState) -> str:
async def _handle_messages(
self,
messages: list[ChatMessage],
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Initialize orchestrator state and start the conversation loop."""
self._append_messages(messages)
Expand All @@ -189,7 +189,7 @@ async def _handle_messages(
async def _handle_response(
self,
response: AgentExecutorResponse | GroupChatResponseMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle a participant response."""
messages = self._process_participant_response(response)
Expand Down Expand Up @@ -324,7 +324,7 @@ def _append_messages(self, messages: Sequence[ChatMessage]) -> None:
async def _handle_messages(
self,
messages: list[ChatMessage],
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Initialize orchestrator state and start the conversation loop."""
self._append_messages(messages)
Expand Down Expand Up @@ -356,7 +356,7 @@ async def _handle_messages(
async def _handle_response(
self,
response: AgentExecutorResponse | GroupChatResponseMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle a participant response."""
messages = self._process_participant_response(response)
Expand Down
16 changes: 8 additions & 8 deletions python/packages/core/agent_framework/_workflows/_magentic.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
GroupChatParticipantMessage,
GroupChatRequestMessage,
GroupChatResponseMessage,
GroupChatWorkflowContext_T_Out,
GroupChatWorkflowContextOutT,
ParticipantRegistry,
)
from ._checkpoint import CheckpointStorage
Expand Down Expand Up @@ -903,7 +903,7 @@ def __init__(
async def _handle_messages(
self,
messages: list[ChatMessage],
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle the initial task messages to start the workflow."""
if self._terminated:
Expand Down Expand Up @@ -954,7 +954,7 @@ async def _handle_messages(
async def _handle_response(
self,
response: AgentExecutorResponse | GroupChatResponseMessage,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle a response message from a participant."""
if self._magentic_context is None or self._task_ledger is None:
Expand All @@ -980,7 +980,7 @@ async def handle_plan_review_response(
self,
original_request: MagenticPlanReviewRequest,
response: MagenticPlanReviewResponse,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Handle the human response to the plan review request.

Expand Down Expand Up @@ -1038,7 +1038,7 @@ async def _send_plan_review_request(self, ctx: WorkflowContext, is_stalled: bool

async def _run_inner_loop(
self,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Run the inner orchestration loop. Coordination phase. Serialized with a lock."""
if self._magentic_context is None or self._task_ledger is None:
Expand All @@ -1048,7 +1048,7 @@ async def _run_inner_loop(

async def _run_inner_loop_helper(
self,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Run inner loop with exclusive access."""
# Narrow optional context for the remainder of this method
Expand Down Expand Up @@ -1134,7 +1134,7 @@ async def _run_inner_loop_helper(

async def _reset_and_replan(
self,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Reset context and replan."""
if self._magentic_context is None:
Expand Down Expand Up @@ -1169,7 +1169,7 @@ async def _reset_and_replan(

async def _run_outer_loop(
self,
ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]],
ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]],
) -> None:
"""Run the outer orchestration loop - planning phase."""
if self._magentic_context is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
if TYPE_CHECKING:
from ._executor import Executor

T_Out = TypeVar("T_Out", default=Never)
T_W_Out = TypeVar("T_W_Out", default=Never)
OutT = TypeVar("OutT", default=Never)
W_OutT = TypeVar("W_OutT", default=Never)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,15 +67,15 @@ def infer_output_types_from_ctx_annotation(
if origin is None:
return [], []

# Expecting WorkflowContext[T_Out, T_W_Out]
# Expecting WorkflowContext[OutT, W_OutT]
if origin is not WorkflowContext:
return [], []

args = list(get_args(ctx_annotation))
if not args:
return [], []

# WorkflowContext[T_Out] -> message_types from T_Out, no workflow output types
# WorkflowContext[OutT] -> message_types from OutT, no workflow output types
if len(args) == 1:
t = args[0]
t_origin = get_origin(t)
Expand All @@ -90,10 +90,10 @@ def infer_output_types_from_ctx_annotation(
return [], []
return [t], []

# WorkflowContext[T_Out, T_W_Out] -> message_types from T_Out, workflow_output_types from T_W_Out
# WorkflowContext[OutT, W_OutT] -> message_types from OutT, workflow_output_types from W_OutT
t_out, t_w_out = args[:2] # Take first two args in case there are more

# Process T_Out for message_types
# Process OutT for message_types
message_types: list[type[Any] | UnionType] = []
t_out_origin = get_origin(t_out)
if t_out is Any:
Expand All @@ -104,7 +104,7 @@ def infer_output_types_from_ctx_annotation(
else:
message_types = [t_out]

# Process T_W_Out for workflow_output_types
# Process W_OutT for workflow_output_types
workflow_output_types: list[type[Any] | UnionType] = []
t_w_out_origin = get_origin(t_w_out)
if t_w_out is Any:
Expand Down Expand Up @@ -176,7 +176,7 @@ def _is_type_like(x: Any) -> bool:
return isinstance(x, type) or get_origin(x) is not None or x is Never

for i, type_arg in enumerate(type_args):
param_description = "T_Out" if i == 0 else "T_W_Out"
param_description = "OutT" if i == 0 else "W_OutT"

# Allow Any explicitly
if type_arg is Any:
Expand Down Expand Up @@ -216,7 +216,7 @@ def _is_type_like(x: Any) -> bool:
)


class WorkflowContext(Generic[T_Out, T_W_Out]):
class WorkflowContext(Generic[OutT, W_OutT]):
"""Execution context that enables executors to interact with workflows and other executors.

## Overview
Expand All @@ -235,17 +235,17 @@ class WorkflowContext(Generic[T_Out, T_W_Out]):
async def log_handler(message: str, ctx: WorkflowContext) -> None:
print(f"Received: {message}") # Only side effects

### WorkflowContext[T_Out]
Enables sending messages of type T_Out to other executors:
### WorkflowContext[OutT]
Enables sending messages of type OutT to other executors:

.. code-block:: python

async def processor(message: str, ctx: WorkflowContext[int]) -> None:
result = len(message)
await ctx.send_message(result) # Send int to downstream executors

### WorkflowContext[T_Out, T_W_Out]
Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out):
### WorkflowContext[OutT, W_OutT]
Enables both sending messages (OutT) and yielding workflow outputs (W_OutT):

.. code-block:: python

Expand Down Expand Up @@ -317,7 +317,7 @@ def request_id(self) -> str | None:
"""
return self._request_id

async def send_message(self, message: T_Out, target_id: str | None = None) -> None:
async def send_message(self, message: OutT, target_id: str | None = None) -> None:
"""Send a message to the workflow context.

Args:
Expand Down Expand Up @@ -349,7 +349,7 @@ async def send_message(self, message: T_Out, target_id: str | None = None) -> No

await self._runner_context.send_message(msg)

async def yield_output(self, output: T_W_Out) -> None:
async def yield_output(self, output: W_OutT) -> None:
"""Set the output of the workflow.

Args:
Expand Down
Loading