Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ed3cbf4
Further support for declarative python workflows
moonbox3 Dec 11, 2025
b42e2e9
Add tests. Clean up for typing and formatting
moonbox3 Dec 12, 2025
620b603
Improvements and cleanup
moonbox3 Dec 12, 2025
21c1a42
Merge main
moonbox3 Dec 12, 2025
6b8836c
Typing cleanup. Improve docstrings
moonbox3 Dec 12, 2025
2c463ae
Proper code in docstrings
moonbox3 Dec 12, 2025
8b86dc9
Fix malformed code-block directive in docstring
moonbox3 Dec 12, 2025
3df53e9
Merge main
moonbox3 Dec 12, 2025
941a2be
Remove dead links
moonbox3 Dec 12, 2025
64e4978
PR feedback
moonbox3 Dec 15, 2025
906253c
Address PR feedback
moonbox3 Dec 16, 2025
f5bf267
Address PR feedback
moonbox3 Dec 17, 2025
9b5c80d
Remove sl
moonbox3 Dec 17, 2025
afe12e2
Merge main
moonbox3 Dec 17, 2025
7505b16
Update devui frontend
moonbox3 Dec 17, 2025
e6de1e0
More cleanup
moonbox3 Dec 19, 2025
49ed21d
Merge main
moonbox3 Dec 19, 2025
ff15e9e
Fix uv lock
moonbox3 Dec 19, 2025
d4ba6bd
Skip Py 3.14 tests as powerfx doesn't support it
moonbox3 Dec 19, 2025
7a33a12
Fix mypy error
moonbox3 Dec 19, 2025
03591db
Merge main to branch
moonbox3 Jan 9, 2026
5d4ab0f
Fix for tool calls
moonbox3 Jan 9, 2026
daf1eeb
Updates
moonbox3 Jan 9, 2026
b0cc516
Removed stale docstring
moonbox3 Jan 9, 2026
32b1f9f
Fix lint
moonbox3 Jan 9, 2026
6d9f181
Merge main to branch
moonbox3 Jan 12, 2026
8818037
Standardize on .NET namespaces. Revert DevUI changes (bring in later)
moonbox3 Jan 13, 2026
2dba3a4
Implement remaining items for Python declarative support to match dotnet
moonbox3 Jan 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,4 @@ local.settings.json

# Database files
*.db
python/dotnet-ref
3 changes: 0 additions & 3 deletions python/.vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
},
"python.analysis.autoFormatStrings": true,
"python.analysis.importFormat": "relative",
"python.analysis.exclude": [
"samples/semantic-kernel-migration"
],
"python.analysis.packageIndexDepths": [
{
"name": "agent_framework",
Expand Down
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Case,
Default,
Edge,
EdgeCondition,
FanInEdgeGroup,
FanOutEdgeGroup,
SingleEdgeGroup,
Expand Down Expand Up @@ -132,6 +133,7 @@
"ConcurrentBuilder",
"Default",
"Edge",
"EdgeCondition",
"EdgeDuplicationError",
"Executor",
"ExecutorCompletedEvent",
Expand Down
73 changes: 54 additions & 19 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
# Copyright (c) Microsoft. All rights reserved.

import inspect
import logging
import uuid
from collections.abc import Callable, Sequence
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass, field
from typing import Any, ClassVar
from typing import Any, ClassVar, TypeAlias, TypeVar

from ._const import INTERNAL_SOURCE_ID
from ._executor import Executor
from ._model_utils import DictConvertible, encode_value

logger = logging.getLogger(__name__)

# Type alias for edge condition functions.
# Conditions receive the message data and return bool (sync or async).
EdgeCondition: TypeAlias = Callable[[Any], bool | Awaitable[bool]]


def _extract_function_name(func: Callable[..., Any]) -> str:
"""Map a Python callable to a concise, human-focused identifier.
Expand Down Expand Up @@ -71,26 +76,27 @@ class Edge(DictConvertible):
serialising the edge down to primitives we can reconstruct the topology of
a workflow irrespective of the original Python process.

Edge conditions receive the message data and return a boolean (sync or async).

Examples:
.. code-block:: python

edge = Edge(source_id="ingest", target_id="score", condition=lambda payload: payload["ready"])
assert edge.should_route({"ready": True}) is True
assert edge.should_route({"ready": False}) is False
edge = Edge(source_id="ingest", target_id="score", condition=lambda data: data["ready"])
assert await edge.should_route({"ready": True}) is True
"""

ID_SEPARATOR: ClassVar[str] = "->"

source_id: str
target_id: str
condition_name: str | None
_condition: Callable[[Any], bool] | None = field(default=None, repr=False, compare=False)
_condition: EdgeCondition | None = field(default=None, repr=False, compare=False)

def __init__(
self,
source_id: str,
target_id: str,
condition: Callable[[Any], bool] | None = None,
condition: EdgeCondition | None = None,
*,
condition_name: str | None = None,
) -> None:
Expand All @@ -103,9 +109,9 @@ def __init__(
target_id:
Canonical identifier of the downstream executor instance.
condition:
Optional predicate that receives the message payload and returns
`True` when the edge should be traversed. When omitted, the edge is
considered unconditionally active.
Optional predicate that receives the message data and returns
`True` when the edge should be traversed. Can be sync or async.
When omitted, the edge is unconditionally active.
condition_name:
Optional override that pins a human-friendly name for the condition
when the callable cannot be introspected (for example after
Expand All @@ -125,7 +131,9 @@ def __init__(
self.source_id = source_id
self.target_id = target_id
self._condition = condition
self.condition_name = _extract_function_name(condition) if condition is not None else condition_name
self.condition_name = (
_extract_function_name(condition) if condition is not None and condition_name is None else condition_name
)

@property
def id(self) -> str:
Expand All @@ -144,25 +152,44 @@ def id(self) -> str:
"""
return f"{self.source_id}{self.ID_SEPARATOR}{self.target_id}"

def should_route(self, data: Any) -> bool:
"""Evaluate the edge predicate against an incoming payload.
@property
def has_condition(self) -> bool:
"""Check if this edge has a condition.

Returns True if the edge was configured with a condition function.
"""
return self._condition is not None

async def should_route(self, data: Any) -> bool:
"""Evaluate the edge predicate against payload.

When the edge was defined without an explicit predicate the method
returns `True`, signalling an unconditional routing rule. Otherwise the
user-supplied callable decides whether the message should proceed along
this edge. Any exception raised by the callable is deliberately allowed
to surface to the caller to avoid masking logic bugs.

The condition receives the message data and may be sync or async.

Args:
data: The message payload

Returns:
True if the edge should be traversed, False otherwise.

Examples:
.. code-block:: python

edge = Edge("stage1", "stage2", condition=lambda payload: payload["score"] > 0.8)
assert edge.should_route({"score": 0.9}) is True
assert edge.should_route({"score": 0.4}) is False
edge = Edge("stage1", "stage2", condition=lambda data: data["score"] > 0.8)
assert await edge.should_route({"score": 0.9}) is True
assert await edge.should_route({"score": 0.4}) is False
"""
if self._condition is None:
return True
return self._condition(data)
result = self._condition(data)
if inspect.isawaitable(result):
return bool(await result)
return bool(result)

def to_dict(self) -> dict[str, Any]:
"""Produce a JSON-serialisable view of the edge metadata.
Expand Down Expand Up @@ -281,6 +308,8 @@ class EdgeGroup(DictConvertible):

from builtins import type as builtin_type

_T_EdgeGroup = TypeVar("_T_EdgeGroup", bound="EdgeGroup")

_TYPE_REGISTRY: ClassVar[dict[str, builtin_type["EdgeGroup"]]] = {}

def __init__(
Expand Down Expand Up @@ -363,7 +392,7 @@ def to_dict(self) -> dict[str, Any]:
}

@classmethod
def register(cls, subclass: builtin_type["EdgeGroup"]) -> builtin_type["EdgeGroup"]:
def register(cls, subclass: builtin_type[_T_EdgeGroup]) -> builtin_type[_T_EdgeGroup]:
"""Register a subclass so deserialisation can recover the right type.

Registration is typically performed via the decorator syntax applied to
Expand Down Expand Up @@ -443,12 +472,18 @@ def __init__(
self,
source_id: str,
target_id: str,
condition: Callable[[Any], bool] | None = None,
condition: EdgeCondition | None = None,
*,
id: str | None = None,
) -> None:
"""Create a one-to-one edge group between two executors.

Args:
source_id: The source executor ID.
target_id: The target executor ID.
condition: Optional condition function `(data) -> bool | Awaitable[bool]`.
id: Optional explicit ID for the edge group.

Examples:
.. code-block:: python

Expand Down
18 changes: 12 additions & 6 deletions python/packages/core/agent_framework/_workflows/_edge_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
return False

if self._can_handle(self._edge.target_id, message):
if self._edge.should_route(message.data):
route_result = await self._edge.should_route(message.data)

if route_result:
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
Expand Down Expand Up @@ -162,8 +164,8 @@ def __init__(self, edge_group: FanOutEdgeGroup, executors: dict[str, Executor])

async def send_message(self, message: Message, shared_state: SharedState, ctx: RunnerContext) -> bool:
"""Send a message through all edges in the fan-out edge group."""
deliverable_edges = []
single_target_edge = None
deliverable_edges: list[Edge] = []
single_target_edge: Edge | None = None
# Process routing logic within span
with create_edge_group_processing_span(
self._edge_group.__class__.__name__,
Expand Down Expand Up @@ -192,7 +194,9 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
if message.target_id in selection_results:
edge = self._target_map.get(message.target_id)
if edge and self._can_handle(edge.target_id, message):
if edge.should_route(message.data):
route_result = await edge.should_route(message.data)

if route_result:
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
Expand Down Expand Up @@ -223,8 +227,10 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
# If no target ID, send the message to the selected targets
for target_id in selection_results:
edge = self._target_map[target_id]
if self._can_handle(edge.target_id, message) and edge.should_route(message.data):
deliverable_edges.append(edge)
if self._can_handle(edge.target_id, message):
route_result = await edge.should_route(message.data)
if route_result:
deliverable_edges.append(edge)

if len(deliverable_edges) > 0:
span.set_attributes({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator
from ._checkpoint import CheckpointStorage
from ._conversation_history import ensure_author, latest_user_message
from ._edge import EdgeCondition
from ._executor import Executor, handler
from ._orchestration_request_info import RequestInfoInterceptor
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, wrap_participant
Expand Down Expand Up @@ -213,7 +214,7 @@ class _GroupChatConfig:
# region Default participant factory

_GroupChatOrchestratorFactory: TypeAlias = Callable[[_GroupChatConfig], Executor]
_InterceptorSpec: TypeAlias = tuple[Callable[[_GroupChatConfig], Executor], Callable[[Any], bool]]
_InterceptorSpec: TypeAlias = tuple[Callable[[_GroupChatConfig], Executor], EdgeCondition]


def _default_participant_factory(
Expand Down Expand Up @@ -1701,7 +1702,7 @@ def with_request_handler(
self,
handler: Callable[[_GroupChatConfig], Executor] | Executor,
*,
condition: Callable[[Any], bool],
condition: EdgeCondition,
) -> "GroupChatBuilder":
"""Register an interceptor factory that creates executors for special requests.

Expand Down
3 changes: 2 additions & 1 deletion python/packages/core/agent_framework/_workflows/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ def _normalize_message_payload(message: Message) -> None:
# Route all messages through normal workflow edges
associated_edge_runners = self._edge_runner_map.get(source_executor_id, [])
if not associated_edge_runners:
logger.warning(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
# This is expected for terminal nodes (e.g., EndWorkflow, last action in workflow)
logger.debug(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
return

for message in messages:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ._edge import (
Case,
Default,
EdgeCondition,
EdgeGroup,
FanInEdgeGroup,
FanOutEdgeGroup,
Expand Down Expand Up @@ -48,12 +49,12 @@ class _EdgeRegistration:
Args:
source: The registered source name.
target: The registered target name.
condition: An optional condition function for the edge.
condition: An optional condition function `(data) -> bool | Awaitable[bool]`.
"""

source: str
target: str
condition: Callable[[Any], bool] | None = None
condition: EdgeCondition | None = None


@dataclass
Expand Down Expand Up @@ -437,7 +438,10 @@ def add_agent(
"Consider using register_agent() for lazy initialization instead."
)
executor = self._maybe_wrap_agent(
agent, agent_thread=agent_thread, output_response=output_response, executor_id=id
agent,
agent_thread=agent_thread,
output_response=output_response,
executor_id=id,
)
self._add_executor(executor)
return self
Expand All @@ -446,7 +450,7 @@ def add_edge(
self,
source: Executor | AgentProtocol | str,
target: Executor | AgentProtocol | str,
condition: Callable[[Any], bool] | None = None,
condition: EdgeCondition | None = None,
) -> Self:
"""Add a directed edge between two executors.

Expand All @@ -456,13 +460,14 @@ def add_edge(
Args:
source: The source executor or registered name of the source factory for the edge.
target: The target executor or registered name of the target factory for the edge.
condition: An optional condition function that determines whether the edge
should be traversed based on the message.
condition: An optional condition function `(data) -> bool | Awaitable[bool]`
that determines whether the edge should be traversed.
Example: `lambda data: data["ready"]`.

Note: If instances are provided for both source and target, they will be shared across
all workflow instances created from the built Workflow. To avoid this, consider
registering the executors and agents using `register_executor` and `register_agent`
and referencing them by factory name for lazy initialization instead.
Note: If instances are provided for both source and target, they will be shared across
all workflow instances created from the built Workflow. To avoid this, consider
registering the executors and agents using `register_executor` and `register_agent`
and referencing them by factory name for lazy initialization instead.

Returns:
Self: The WorkflowBuilder instance for method chaining.
Expand Down Expand Up @@ -496,12 +501,6 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None:
.build()
)


# With a condition
def only_large_numbers(msg: int) -> bool:
return msg > 100


workflow = (
WorkflowBuilder()
.register_executor(lambda: ProcessorA(id="a"), name="ProcessorA")
Expand Down Expand Up @@ -529,7 +528,7 @@ def only_large_numbers(msg: int) -> bool:
target_exec = self._maybe_wrap_agent(target) # type: ignore[arg-type]
source_id = self._add_executor(source_exec)
target_id = self._add_executor(target_exec)
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition)) # type: ignore[call-arg]
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition))
return self

def add_fan_out_edges(
Expand Down Expand Up @@ -1141,7 +1140,9 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
self._checkpoint_storage = checkpoint_storage
return self

def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGroup]]:
def _resolve_edge_registry(
self,
) -> tuple[Executor, list[Executor], list[EdgeGroup]]:
"""Resolve deferred edge registrations into executors and edge groups."""
if not self._start_executor:
raise ValueError("Starting executor must be set using set_start_executor before building the workflow.")
Expand Down Expand Up @@ -1211,7 +1212,11 @@ def _get_executor(name: str) -> Executor:
if start_executor is None:
raise ValueError("Failed to resolve starting executor from registered factories.")

return start_executor, list(executor_id_to_instance.values()), deferred_edge_groups
return (
start_executor,
list(executor_id_to_instance.values()),
deferred_edge_groups,
)

def build(self) -> Workflow:
"""Build and return the constructed workflow.
Expand Down
Loading
Loading