From f3810fc797cce569cadb1915b24e532c7a7a787f Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Wed, 4 Feb 2026 11:05:18 +0900 Subject: [PATCH 1/3] Adjust workflows TypeVars from prefix to suffix naming convention --- .../agent_framework/_workflows/__init__.py | 2 -- .../_base_group_chat_orchestrator.py | 14 ++++----- .../core/agent_framework/_workflows/_edge.py | 7 +++-- .../agent_framework/_workflows/_executor.py | 8 ++--- .../_workflows/_function_executor.py | 8 ++--- .../agent_framework/_workflows/_group_chat.py | 10 +++---- .../agent_framework/_workflows/_magentic.py | 16 +++++----- .../_workflows/_workflow_context.py | 30 +++++++++---------- 8 files changed, 47 insertions(+), 48 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 70706ff827..7c0a2e4ad4 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -102,7 +102,6 @@ RunnerContext, ) from ._sequential import SequentialBuilder -from ._shared_state import SharedState from ._validation import ( EdgeDuplicationError, GraphConnectivityError, @@ -179,7 +178,6 @@ "Runner", "RunnerContext", "SequentialBuilder", - "SharedState", "SingleEdgeGroup", "StandardMagenticManager", "SubWorkflowRequestMessage", diff --git a/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py b/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py index e3cc4bc7d2..e5a71d52d9 100644 --- a/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py +++ b/python/packages/core/agent_framework/_workflows/_base_group_chat_orchestrator.py @@ -57,7 +57,7 @@ class GroupChatResponseMessage: TerminationCondition: TypeAlias = Callable[[list[ChatMessage]], bool | Awaitable[bool]] -GroupChatWorkflowContext_T_Out: TypeAlias = AgentExecutorRequest | GroupChatRequestMessage | GroupChatParticipantMessage +GroupChatWorkflowContextOutT: TypeAlias = AgentExecutorRequest | GroupChatRequestMessage | GroupChatParticipantMessage # region Group chat events @@ -201,7 +201,7 @@ def __init__( async def handle_str( self, task: str, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handler for string input as workflow entry point. @@ -220,7 +220,7 @@ async def handle_str( async def handle_message( self, task: ChatMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handler for single ChatMessage input as workflow entry point. @@ -239,7 +239,7 @@ async def handle_message( async def handle_messages( self, task: list[ChatMessage], - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handler for list of ChatMessages as workflow entry point. @@ -262,7 +262,7 @@ async def handle_messages( async def handle_participant_response( self, response: AgentExecutorResponse | GroupChatResponseMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handler for participant responses. @@ -288,7 +288,7 @@ async def handle_participant_response( async def _handle_messages( self, messages: list[ChatMessage], - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle task messages from users as workflow entry point. @@ -303,7 +303,7 @@ async def _handle_messages( async def _handle_response( self, response: AgentExecutorResponse | GroupChatResponseMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle a participant response. diff --git a/python/packages/core/agent_framework/_workflows/_edge.py b/python/packages/core/agent_framework/_workflows/_edge.py index 02ca1722dd..3212eff41a 100644 --- a/python/packages/core/agent_framework/_workflows/_edge.py +++ b/python/packages/core/agent_framework/_workflows/_edge.py @@ -17,6 +17,9 @@ # Conditions receive the message data and return bool (sync or async). EdgeCondition: TypeAlias = Callable[[Any], bool | Awaitable[bool]] +# TypeVar for EdgeGroup subclasses used in class methods +EdgeGroupT = TypeVar("EdgeGroupT", bound="EdgeGroup") + def _extract_function_name(func: Callable[..., Any]) -> str: """Map a Python callable to a concise, human-focused identifier. @@ -308,8 +311,6 @@ class EdgeGroup(DictConvertible): from builtins import type as builtin_type - _T_EdgeGroup = TypeVar("_T_EdgeGroup", bound="EdgeGroup") - _TYPE_REGISTRY: ClassVar[dict[str, builtin_type["EdgeGroup"]]] = {} def __init__( @@ -392,7 +393,7 @@ def to_dict(self) -> dict[str, Any]: } @classmethod - def register(cls, subclass: builtin_type[_T_EdgeGroup]) -> builtin_type[_T_EdgeGroup]: + def register(cls, subclass: builtin_type[EdgeGroupT]) -> builtin_type[EdgeGroupT]: """Register a subclass so deserialisation can recover the right type. Registration is typically performed via the decorator syntax applied to diff --git a/python/packages/core/agent_framework/_workflows/_executor.py b/python/packages/core/agent_framework/_workflows/_executor.py index 18adc4b904..60a02e66eb 100644 --- a/python/packages/core/agent_framework/_workflows/_executor.py +++ b/python/packages/core/agent_framework/_workflows/_executor.py @@ -116,8 +116,8 @@ class LoggingExecutor(Executor): async def log_message(self, msg: str, ctx: WorkflowContext) -> None: print(f"Received: {msg}") # Only logging, no outputs - ### WorkflowContext[T_Out] - Enables sending messages of type T_Out via `ctx.send_message()`: + ### WorkflowContext[OutT] + Enables sending messages of type OutT via `ctx.send_message()`: .. code-block:: python @@ -126,8 +126,8 @@ class ProcessorExecutor(Executor): async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None: await ctx.send_message(42) # Can send int messages - ### WorkflowContext[T_Out, T_W_Out] - Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out): + ### WorkflowContext[OutT, W_OutT] + Enables both sending messages (OutT) and yielding workflow outputs (W_OutT): .. code-block:: python diff --git a/python/packages/core/agent_framework/_workflows/_function_executor.py b/python/packages/core/agent_framework/_workflows/_function_executor.py index cac77d8173..a27e250690 100644 --- a/python/packages/core/agent_framework/_workflows/_function_executor.py +++ b/python/packages/core/agent_framework/_workflows/_function_executor.py @@ -64,11 +64,11 @@ def __init__( output: Optional explicit output type(s) that can be sent via ``ctx.send_message()``. Supports union types (e.g., ``str | int``) and string forward references. When provided, takes precedence over introspection from the ``WorkflowContext`` - first generic parameter (T_Out). + first generic parameter (OutT). workflow_output: Optional explicit output type(s) that can be yielded via ``ctx.yield_output()``. Supports union types (e.g., ``str | int``) and string forward references. When provided, takes precedence over introspection from the - ``WorkflowContext`` second generic parameter (T_W_Out). + ``WorkflowContext`` second generic parameter (W_OutT). Raises: ValueError: If func is a staticmethod or classmethod (use @handler on instance methods instead) @@ -262,11 +262,11 @@ async def process(self, data: str, ctx: WorkflowContext[str]): output: Optional explicit output type(s) that can be sent via ``ctx.send_message()``. Supports union types (e.g., ``str | int``) and string forward references. When provided, takes precedence over introspection from the ``WorkflowContext`` - first generic parameter (T_Out). + first generic parameter (OutT). workflow_output: Optional explicit output type(s) that can be yielded via ``ctx.yield_output()``. Supports union types (e.g., ``str | int``) and string forward references. When provided, takes precedence over introspection from the - ``WorkflowContext`` second generic parameter (T_W_Out). + ``WorkflowContext`` second generic parameter (W_OutT). Returns: A FunctionExecutor instance that can be wired into a Workflow. diff --git a/python/packages/core/agent_framework/_workflows/_group_chat.py b/python/packages/core/agent_framework/_workflows/_group_chat.py index 3f92d9ebf2..49fbe2fe50 100644 --- a/python/packages/core/agent_framework/_workflows/_group_chat.py +++ b/python/packages/core/agent_framework/_workflows/_group_chat.py @@ -39,7 +39,7 @@ GroupChatParticipantMessage, GroupChatRequestMessage, GroupChatResponseMessage, - GroupChatWorkflowContext_T_Out, + GroupChatWorkflowContextOutT, ParticipantRegistry, TerminationCondition, ) @@ -163,7 +163,7 @@ async def round_robin_selector(state: GroupChatState) -> str: async def _handle_messages( self, messages: list[ChatMessage], - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Initialize orchestrator state and start the conversation loop.""" self._append_messages(messages) @@ -189,7 +189,7 @@ async def _handle_messages( async def _handle_response( self, response: AgentExecutorResponse | GroupChatResponseMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle a participant response.""" messages = self._process_participant_response(response) @@ -324,7 +324,7 @@ def _append_messages(self, messages: Sequence[ChatMessage]) -> None: async def _handle_messages( self, messages: list[ChatMessage], - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Initialize orchestrator state and start the conversation loop.""" self._append_messages(messages) @@ -356,7 +356,7 @@ async def _handle_messages( async def _handle_response( self, response: AgentExecutorResponse | GroupChatResponseMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle a participant response.""" messages = self._process_participant_response(response) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index eff87fd5f0..ce46a4a873 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -27,7 +27,7 @@ GroupChatParticipantMessage, GroupChatRequestMessage, GroupChatResponseMessage, - GroupChatWorkflowContext_T_Out, + GroupChatWorkflowContextOutT, ParticipantRegistry, ) from ._checkpoint import CheckpointStorage @@ -903,7 +903,7 @@ def __init__( async def _handle_messages( self, messages: list[ChatMessage], - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle the initial task messages to start the workflow.""" if self._terminated: @@ -954,7 +954,7 @@ async def _handle_messages( async def _handle_response( self, response: AgentExecutorResponse | GroupChatResponseMessage, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle a response message from a participant.""" if self._magentic_context is None or self._task_ledger is None: @@ -980,7 +980,7 @@ async def handle_plan_review_response( self, original_request: MagenticPlanReviewRequest, response: MagenticPlanReviewResponse, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Handle the human response to the plan review request. @@ -1038,7 +1038,7 @@ async def _send_plan_review_request(self, ctx: WorkflowContext, is_stalled: bool async def _run_inner_loop( self, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Run the inner orchestration loop. Coordination phase. Serialized with a lock.""" if self._magentic_context is None or self._task_ledger is None: @@ -1048,7 +1048,7 @@ async def _run_inner_loop( async def _run_inner_loop_helper( self, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Run inner loop with exclusive access.""" # Narrow optional context for the remainder of this method @@ -1134,7 +1134,7 @@ async def _run_inner_loop_helper( async def _reset_and_replan( self, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Reset context and replan.""" if self._magentic_context is None: @@ -1169,7 +1169,7 @@ async def _reset_and_replan( async def _run_outer_loop( self, - ctx: WorkflowContext[GroupChatWorkflowContext_T_Out, list[ChatMessage]], + ctx: WorkflowContext[GroupChatWorkflowContextOutT, list[ChatMessage]], ) -> None: """Run the outer orchestration loop - planning phase.""" if self._magentic_context is None: diff --git a/python/packages/core/agent_framework/_workflows/_workflow_context.py b/python/packages/core/agent_framework/_workflows/_workflow_context.py index 708cdf3c51..65de26e1e0 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_context.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_context.py @@ -31,8 +31,8 @@ if TYPE_CHECKING: from ._executor import Executor -T_Out = TypeVar("T_Out", default=Never) -T_W_Out = TypeVar("T_W_Out", default=Never) +OutT = TypeVar("OutT", default=Never) +W_OutT = TypeVar("W_OutT", default=Never) logger = logging.getLogger(__name__) @@ -67,7 +67,7 @@ def infer_output_types_from_ctx_annotation( if origin is None: return [], [] - # Expecting WorkflowContext[T_Out, T_W_Out] + # Expecting WorkflowContext[OutT, W_OutT] if origin is not WorkflowContext: return [], [] @@ -75,7 +75,7 @@ def infer_output_types_from_ctx_annotation( if not args: return [], [] - # WorkflowContext[T_Out] -> message_types from T_Out, no workflow output types + # WorkflowContext[OutT] -> message_types from OutT, no workflow output types if len(args) == 1: t = args[0] t_origin = get_origin(t) @@ -90,10 +90,10 @@ def infer_output_types_from_ctx_annotation( return [], [] return [t], [] - # WorkflowContext[T_Out, T_W_Out] -> message_types from T_Out, workflow_output_types from T_W_Out + # WorkflowContext[OutT, W_OutT] -> message_types from OutT, workflow_output_types from W_OutT t_out, t_w_out = args[:2] # Take first two args in case there are more - # Process T_Out for message_types + # Process OutT for message_types message_types: list[type[Any] | UnionType] = [] t_out_origin = get_origin(t_out) if t_out is Any: @@ -104,7 +104,7 @@ def infer_output_types_from_ctx_annotation( else: message_types = [t_out] - # Process T_W_Out for workflow_output_types + # Process W_OutT for workflow_output_types workflow_output_types: list[type[Any] | UnionType] = [] t_w_out_origin = get_origin(t_w_out) if t_w_out is Any: @@ -176,7 +176,7 @@ def _is_type_like(x: Any) -> bool: return isinstance(x, type) or get_origin(x) is not None or x is Never for i, type_arg in enumerate(type_args): - param_description = "T_Out" if i == 0 else "T_W_Out" + param_description = "OutT" if i == 0 else "W_OutT" # Allow Any explicitly if type_arg is Any: @@ -216,7 +216,7 @@ def _is_type_like(x: Any) -> bool: ) -class WorkflowContext(Generic[T_Out, T_W_Out]): +class WorkflowContext(Generic[OutT, W_OutT]): """Execution context that enables executors to interact with workflows and other executors. ## Overview @@ -235,8 +235,8 @@ class WorkflowContext(Generic[T_Out, T_W_Out]): async def log_handler(message: str, ctx: WorkflowContext) -> None: print(f"Received: {message}") # Only side effects - ### WorkflowContext[T_Out] - Enables sending messages of type T_Out to other executors: + ### WorkflowContext[OutT] + Enables sending messages of type OutT to other executors: .. code-block:: python @@ -244,8 +244,8 @@ async def processor(message: str, ctx: WorkflowContext[int]) -> None: result = len(message) await ctx.send_message(result) # Send int to downstream executors - ### WorkflowContext[T_Out, T_W_Out] - Enables both sending messages (T_Out) and yielding workflow outputs (T_W_Out): + ### WorkflowContext[OutT, W_OutT] + Enables both sending messages (OutT) and yielding workflow outputs (W_OutT): .. code-block:: python @@ -317,7 +317,7 @@ def request_id(self) -> str | None: """ return self._request_id - async def send_message(self, message: T_Out, target_id: str | None = None) -> None: + async def send_message(self, message: OutT, target_id: str | None = None) -> None: """Send a message to the workflow context. Args: @@ -349,7 +349,7 @@ async def send_message(self, message: T_Out, target_id: str | None = None) -> No await self._runner_context.send_message(msg) - async def yield_output(self, output: T_W_Out) -> None: + async def yield_output(self, output: W_OutT) -> None: """Set the output of the workflow. Args: From 92b095b455a35106736283444a9434bb1e9e0c72 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Wed, 4 Feb 2026 11:14:52 +0900 Subject: [PATCH 2/3] Adjust shared state import --- python/packages/core/tests/workflow/test_edge.py | 2 +- python/packages/core/tests/workflow/test_workflow_states.py | 2 +- .../agent_framework_declarative/_workflows/_declarative_base.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/packages/core/tests/workflow/test_edge.py b/python/packages/core/tests/workflow/test_edge.py index 42e3893a73..95dc71219d 100644 --- a/python/packages/core/tests/workflow/test_edge.py +++ b/python/packages/core/tests/workflow/test_edge.py @@ -10,7 +10,6 @@ Executor, InProcRunnerContext, Message, - SharedState, WorkflowContext, handler, ) @@ -24,6 +23,7 @@ SwitchCaseEdgeGroupDefault, ) from agent_framework._workflows._edge_runner import create_edge_runner +from agent_framework._workflows._shared_state import SharedState from agent_framework.observability import EdgeGroupDeliveryStatus # Add for test diff --git a/python/packages/core/tests/workflow/test_workflow_states.py b/python/packages/core/tests/workflow/test_workflow_states.py index 53baf86383..4aec349d15 100644 --- a/python/packages/core/tests/workflow/test_workflow_states.py +++ b/python/packages/core/tests/workflow/test_workflow_states.py @@ -8,7 +8,6 @@ ExecutorFailedEvent, InProcRunnerContext, RequestInfoEvent, - SharedState, Workflow, WorkflowBuilder, WorkflowContext, @@ -20,6 +19,7 @@ WorkflowStatusEvent, handler, ) +from agent_framework._workflows._shared_state import SharedState class FailingExecutor(Executor): diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index 309a71a4b7..5fc34e1d7a 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -32,9 +32,9 @@ from agent_framework._workflows import ( Executor, - SharedState, WorkflowContext, ) +from agent_framework._workflows._shared_state import SharedState from powerfx import Engine if sys.version_info >= (3, 11): From 6d2dcfa198d290a3f01e6abe4e13b278c03aa34a Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Wed, 4 Feb 2026 11:32:00 +0900 Subject: [PATCH 3/3] Fix MCP tool kwargs serialization bug --- .../packages/core/agent_framework/_tools.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index 2ebd7b9015..d88fa4b54c 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -796,11 +796,26 @@ async def invoke( attributes = get_function_span_attributes(self, tool_call_id=tool_call_id) if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED: # type: ignore[name-defined] + # Filter out framework kwargs that are not JSON serializable + serializable_kwargs = { + k: v + for k, v in kwargs.items() + if k + not in { + "chat_options", + "tools", + "tool_choice", + "thread", + "conversation_id", + "options", + "response_format", + } + } attributes.update({ OtelAttr.TOOL_ARGUMENTS: arguments.model_dump_json() if arguments - else json.dumps(kwargs) - if kwargs + else json.dumps(serializable_kwargs, default=str) + if serializable_kwargs else "None" }) with get_function_span(attributes=attributes) as span: