From 166e2ae4bf825f31a0df850211ac5006f78c2e2a Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 14 Nov 2025 15:11:53 +0900 Subject: [PATCH 1/4] Adjust magentic event types raised. No need for custom events. --- .../agent_framework/_workflows/__init__.py | 20 ++- .../agent_framework/_workflows/__init__.pyi | 20 ++- .../core/agent_framework/_workflows/_agent.py | 8 +- .../agent_framework/_workflows/_magentic.py | 143 +++++------------- .../core/tests/workflow/test_magentic.py | 17 ++- .../agents/magentic_workflow_as_agent.py | 73 +++------ .../magentic_human_plan_update.py | 54 +++---- 7 files changed, 116 insertions(+), 219 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 6c4948f0ea..ba0c591086 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -62,13 +62,15 @@ ) from ._handoff import HandoffBuilder, HandoffUserInputRequest from ._magentic import ( - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + ORCH_MSG_KIND_INSTRUCTION, + ORCH_MSG_KIND_NOTICE, + ORCH_MSG_KIND_TASK_LEDGER, + ORCH_MSG_KIND_USER_TASK, MagenticBuilder, MagenticContext, - MagenticFinalResultEvent, MagenticManagerBase, - MagenticOrchestratorMessageEvent, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, @@ -102,6 +104,12 @@ "DEFAULT_MANAGER_INSTRUCTIONS", "DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT", "DEFAULT_MAX_ITERATIONS", + "MAGENTIC_EVENT_TYPE_AGENT_DELTA", + "MAGENTIC_EVENT_TYPE_ORCHESTRATOR", + "ORCH_MSG_KIND_INSTRUCTION", + "ORCH_MSG_KIND_NOTICE", + "ORCH_MSG_KIND_TASK_LEDGER", + "ORCH_MSG_KIND_USER_TASK", "AgentExecutor", "AgentExecutorRequest", "AgentExecutorResponse", @@ -130,13 +138,9 @@ "HandoffUserInputRequest", "InMemoryCheckpointStorage", "InProcRunnerContext", - "MagenticAgentDeltaEvent", - "MagenticAgentMessageEvent", "MagenticBuilder", "MagenticContext", - "MagenticFinalResultEvent", "MagenticManagerBase", - "MagenticOrchestratorMessageEvent", "MagenticPlanReviewDecision", "MagenticPlanReviewReply", "MagenticPlanReviewRequest", diff --git a/python/packages/core/agent_framework/_workflows/__init__.pyi b/python/packages/core/agent_framework/_workflows/__init__.pyi index 44247d685c..de7ca0e2b2 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.pyi +++ b/python/packages/core/agent_framework/_workflows/__init__.pyi @@ -59,13 +59,15 @@ from ._group_chat import ( ) from ._handoff import HandoffBuilder, HandoffUserInputRequest from ._magentic import ( - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + ORCH_MSG_KIND_INSTRUCTION, + ORCH_MSG_KIND_NOTICE, + ORCH_MSG_KIND_TASK_LEDGER, + ORCH_MSG_KIND_USER_TASK, MagenticBuilder, MagenticContext, - MagenticFinalResultEvent, MagenticManagerBase, - MagenticOrchestratorMessageEvent, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, @@ -99,6 +101,12 @@ __all__ = [ "DEFAULT_MANAGER_INSTRUCTIONS", "DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT", "DEFAULT_MAX_ITERATIONS", + "MAGENTIC_EVENT_TYPE_AGENT_DELTA", + "MAGENTIC_EVENT_TYPE_ORCHESTRATOR", + "ORCH_MSG_KIND_INSTRUCTION", + "ORCH_MSG_KIND_NOTICE", + "ORCH_MSG_KIND_TASK_LEDGER", + "ORCH_MSG_KIND_USER_TASK", "AgentExecutor", "AgentExecutorRequest", "AgentExecutorResponse", @@ -127,13 +135,9 @@ __all__ = [ "HandoffUserInputRequest", "InMemoryCheckpointStorage", "InProcRunnerContext", - "MagenticAgentDeltaEvent", - "MagenticAgentMessageEvent", "MagenticBuilder", "MagenticContext", - "MagenticFinalResultEvent", "MagenticManagerBase", - "MagenticOrchestratorMessageEvent", "MagenticPlanReviewDecision", "MagenticPlanReviewReply", "MagenticPlanReviewRequest", diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index f2a0ea9d75..1831c72953 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -236,8 +236,9 @@ def _convert_workflow_event_to_agent_update( ) -> AgentRunResponseUpdate | None: """Convert a workflow event to an AgentRunResponseUpdate. - Only AgentRunUpdateEvent and RequestInfoEvent are processed and the rest - are not relevant. Returns None if the event is not relevant. + Only AgentRunUpdateEvent and RequestInfoEvent are processed. + Other workflow events are ignored as they are workflow-internal and should + have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers. """ match event: case AgentRunUpdateEvent(data=update): @@ -271,9 +272,8 @@ def _convert_workflow_event_to_agent_update( created_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), ) case _: - # Ignore non-agent workflow events + # Ignore workflow-internal events pass - # We only care about the above two events and discard the rest. return None def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict[str, Any]: diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 9d21391ad8..ae170bbbbd 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -10,7 +10,7 @@ from collections.abc import AsyncIterable, Sequence from dataclasses import dataclass, field from enum import Enum -from typing import Any, Protocol, TypeVar, Union, cast +from typing import Any, TypeVar, cast from uuid import uuid4 from agent_framework import ( @@ -19,15 +19,13 @@ AgentRunResponseUpdate, ChatClientProtocol, ChatMessage, - FunctionCallContent, - FunctionResultContent, Role, ) from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator from ._checkpoint import CheckpointStorage, WorkflowCheckpoint from ._const import EXECUTOR_STATE_KEY -from ._events import WorkflowEvent +from ._events import AgentRunUpdateEvent, WorkflowEvent from ._executor import Executor, handler from ._group_chat import ( GroupChatBuilder, @@ -98,64 +96,16 @@ def _message_from_payload(payload: Any) -> ChatMessage: raise TypeError("Unable to reconstruct ChatMessage from payload") -# region Unified callback API (developer-facing) +# region Magentic event metadata constants +# Event type identifiers for magentic_event_type in additional_properties +MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message" +MAGENTIC_EVENT_TYPE_AGENT_DELTA = "agent_delta" -@dataclass -class MagenticOrchestratorMessageEvent(WorkflowEvent): - orchestrator_id: str = "" - message: ChatMessage | None = None - kind: str = "" - - def __post_init__(self) -> None: - super().__init__(data=self.message) - - -@dataclass -class MagenticAgentDeltaEvent(WorkflowEvent): - agent_id: str | None = None - text: str | None = None - function_call_id: str | None = None - function_call_name: str | None = None - function_call_arguments: Any | None = None - function_result_id: str | None = None - function_result: Any | None = None - role: Role | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.text) - - -@dataclass -class MagenticAgentMessageEvent(WorkflowEvent): - agent_id: str = "" - message: ChatMessage | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.message) - - -@dataclass -class MagenticFinalResultEvent(WorkflowEvent): - message: ChatMessage | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.message) +# Orchestrator message kind values for orchestrator_message_kind in additional_properties +# (already defined above as ORCH_MSG_KIND_*) - -MagenticCallbackEvent = Union[ - MagenticOrchestratorMessageEvent, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, - MagenticFinalResultEvent, -] - - -class CallbackSink(Protocol): - async def __call__(self, event: MagenticCallbackEvent) -> None: ... - - -# endregion Unified callback API +# endregion Magentic event metadata constants # region Magentic One Prompts @@ -1015,9 +965,9 @@ async def _emit_orchestrator_message( ) -> None: """Emit orchestrator message to the workflow event stream. - Orchestrator messages flow through the unified workflow event stream as - MagenticOrchestratorMessageEvent instances. Consumers should subscribe to - these events via workflow.run_stream(). + Emits both MagenticOrchestratorMessageEvent (for backward compatibility) and + AgentRunUpdateEvent (for agent wrapper consumers) with metadata indicating + the orchestrator event type. Args: ctx: Workflow context for adding events to the stream @@ -1026,15 +976,24 @@ async def _emit_orchestrator_message( Example: async for event in workflow.run_stream("task"): - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"Orchestrator {event.kind}: {event.message.text}") + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + if props and props.get("magentic_event_type") == "orchestrator_message": + kind = props.get("orchestrator_message_kind", "") + print(f"Orchestrator {kind}: {event.data.text}") """ - event = MagenticOrchestratorMessageEvent( - orchestrator_id=self.id, - message=message, - kind=kind, + # Emit AgentRunUpdateEvent with metadata + update = AgentRunResponseUpdate( + text=message.text, + role=message.role, + author_name=self._get_author_name(), + additional_properties={ + "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + "orchestrator_message_kind": kind, + "orchestrator_id": self.id, + }, ) - await ctx.add_event(event) + await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update)) def snapshot_state(self) -> dict[str, Any]: """Capture current orchestrator state for checkpointing. @@ -1559,7 +1518,6 @@ async def _prepare_final_answer( # Emit a completed event for the workflow await context.yield_output(final_answer) - await context.add_event(MagenticFinalResultEvent(message=final_answer)) async def _check_within_limits_or_complete( self, @@ -1591,7 +1549,6 @@ async def _check_within_limits_or_complete( # Yield the partial result and signal completion await context.yield_output(partial_result) - await context.add_event(MagenticFinalResultEvent(message=partial_result)) return False return True @@ -1790,45 +1747,23 @@ async def _emit_agent_delta_event( ctx: WorkflowContext[Any, Any], update: AgentRunResponseUpdate, ) -> None: - contents = list(getattr(update, "contents", []) or []) - chunk = getattr(update, "text", None) - if not chunk: - chunk = "".join(getattr(item, "text", "") for item in contents if hasattr(item, "text")) - if chunk: - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - text=chunk or None, - role=getattr(update, "role", None), - ) - ) - for item in contents: - if isinstance(item, FunctionCallContent): - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - function_call_id=getattr(item, "call_id", None), - function_call_name=getattr(item, "name", None), - function_call_arguments=getattr(item, "arguments", None), - role=getattr(update, "role", None), - ) - ) - elif isinstance(item, FunctionResultContent): - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - function_result_id=getattr(item, "call_id", None), - function_result=getattr(item, "result", None), - role=getattr(update, "role", None), - ) - ) + # Add metadata to identify this as an agent streaming update + if update.additional_properties is None: + update.additional_properties = {} + update.additional_properties["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA + update.additional_properties["agent_id"] = self._agent_id + + # Emit AgentRunUpdateEvent with the agent response update + await ctx.add_event(AgentRunUpdateEvent(executor_id=self._agent_id, data=update)) async def _emit_agent_message_event( self, ctx: WorkflowContext[Any, Any], message: ChatMessage, ) -> None: - await ctx.add_event(MagenticAgentMessageEvent(agent_id=self._agent_id, message=message)) + # Agent message completion is already communicated via streaming updates + # No additional event needed + pass async def _invoke_agent( self, diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index eda0675361..d046a87a59 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -9,13 +9,13 @@ from agent_framework import ( AgentRunResponse, AgentRunResponseUpdate, + AgentRunUpdateEvent, BaseAgent, ChatClientProtocol, ChatMessage, ChatResponse, ChatResponseUpdate, Executor, - MagenticAgentMessageEvent, MagenticBuilder, MagenticManagerBase, MagenticPlanReviewDecision, @@ -185,7 +185,6 @@ async def test_standard_manager_progress_ledger_and_fallback(): assert ledger2.is_request_satisfied.answer is False -@pytest.mark.skip(reason="Response handling refactored - responses no longer passed to run_stream()") async def test_magentic_workflow_plan_review_approval_to_completion(): manager = FakeManager(max_round_count=10) wf = ( @@ -204,7 +203,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): completed = False output: ChatMessage | None = None - async for ev in wf.run_stream( + async for ev in wf.send_responses_streaming( responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)} ): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: @@ -218,7 +217,6 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): assert isinstance(output, ChatMessage) -@pytest.mark.skip(reason="Response handling refactored - responses no longer passed to run_stream()") async def test_magentic_plan_review_approve_with_comments_replans_and_proceeds(): class CountingManager(FakeManager): # Declare as a model field so assignment is allowed under Pydantic @@ -250,7 +248,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ # Reply APPROVE with comments (no edited text). Expect one replan and no second review round. saw_second_review = False completed = False - async for ev in wf.run_stream( + async for ev in wf.send_responses_streaming( responses={ req_event.request_id: MagenticPlanReviewReply( decision=MagenticPlanReviewDecision.APPROVE, @@ -298,7 +296,6 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result(): assert data.role == Role.ASSISTANT -@pytest.mark.skip(reason="Response handling refactored - send_responses_streaming no longer exists") async def test_magentic_checkpoint_resume_round_trip(): storage = InMemoryCheckpointStorage() @@ -556,8 +553,12 @@ async def _collect_agent_responses_setup(participant_obj: object): events.append(ev) if isinstance(ev, WorkflowOutputEvent): break - if isinstance(ev, MagenticAgentMessageEvent) and ev.message is not None: - captured.append(ev.message) + if isinstance(ev, AgentRunUpdateEvent) and ev.data is not None: + captured.append( + ChatMessage( + role=ev.data.role or Role.ASSISTANT, text=ev.data.text or "", author_name=ev.data.author_name + ) + ) if len(events) > 50: break diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index 6fab7c495c..091f37c055 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -4,14 +4,11 @@ import logging from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, ChatAgent, HostedCodeInterpreterTool, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, - WorkflowOutputEvent, ) from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient @@ -78,58 +75,24 @@ async def main() -> None: print("\nStarting workflow execution...") try: - last_stream_agent_id: str | None = None - stream_line_open: bool = False - final_output: str | None = None - - async for event in workflow.run_stream(task): - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print(" (final)") - stream_line_open = False - print() - msg = event.message - if msg is not None: - response_text = (msg.text or "").replace("\n", " ") - print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") - elif isinstance(event, MagenticFinalResultEvent): - print("\n" + "=" * 50) - print("FINAL RESULT:") - print("=" * 50) - if event.message is not None: - print(event.message.text) - print("=" * 50) - elif isinstance(event, WorkflowOutputEvent): - final_output = str(event.data) if event.data is not None else None - - if stream_line_open: - print() - stream_line_open = False - - if final_output is not None: - print(f"\nWorkflow completed with result:\n\n{final_output}\n") - # Wrap the workflow as an agent for composition scenarios + print("\nWrapping workflow as an agent and running...") workflow_agent = workflow.as_agent(name="MagenticWorkflowAgent") - agent_result = await workflow_agent.run(task) - - if agent_result.messages: - print("\n===== as_agent() Transcript =====") - for i, msg in enumerate(agent_result.messages, start=1): - role_value = getattr(msg.role, "value", msg.role) - speaker = msg.author_name or role_value - print(f"{'-' * 50}\n{i:02d} [{speaker}]\n{msg.text}") + async for response in workflow_agent.run_stream(task): + # AgentRunResponseUpdate objects contain the streaming agent data + # Check metadata to understand event type + props = response.additional_properties + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + print(f"\n[ORCHESTRATOR:{kind}] {response.text}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + if response.text: + print(response.text, end="", flush=True) + elif response.text: + # Fallback for any other events with text + print(response.text, end="", flush=True) except Exception as e: print(f"Workflow execution failed: {e}") diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 5ba8b5cc23..52df5b24c6 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -5,13 +5,12 @@ from typing import cast from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, ChatAgent, HostedCodeInterpreterTool, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, @@ -117,34 +116,25 @@ def on_exception(exception: Exception) -> None: # Collect events from the stream async for event in stream: - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print(" (final)") - stream_line_open = False - print() - msg = event.message - if msg is not None: - response_text = (msg.text or "").replace("\n", " ") - print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") - elif isinstance(event, MagenticFinalResultEvent): - print("\n" + "=" * 50) - print("FINAL RESULT:") - print("=" * 50) - if event.message is not None: - print(event.message.text) - print("=" * 50) - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[STREAM:{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: pending_request = event review_req = cast(MagenticPlanReviewRequest, event.data) if review_req.plan_text: From af132d57abc73c1d0b9edfab7065d353fb604026 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 14 Nov 2025 15:18:05 +0900 Subject: [PATCH 2/4] Cleanup --- python/packages/core/agent_framework/_workflows/_magentic.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index ae170bbbbd..3b53a3ac6d 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -102,9 +102,6 @@ def _message_from_payload(payload: Any) -> ChatMessage: MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message" MAGENTIC_EVENT_TYPE_AGENT_DELTA = "agent_delta" -# Orchestrator message kind values for orchestrator_message_kind in additional_properties -# (already defined above as ORCH_MSG_KIND_*) - # endregion Magentic event metadata constants # region Magentic One Prompts From 5b58afc143efa3e7ec632db828761bc919997d2e Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 14 Nov 2025 15:22:17 +0900 Subject: [PATCH 3/4] Fix test --- .../core/tests/workflow/test_group_chat.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index ab920d0663..8d1798a7a4 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -6,19 +6,20 @@ import pytest from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, AgentRunResponse, AgentRunResponseUpdate, + AgentRunUpdateEvent, AgentThread, BaseAgent, ChatMessage, GroupChatBuilder, GroupChatDirective, GroupChatStateSnapshot, - MagenticAgentMessageEvent, MagenticBuilder, MagenticContext, MagenticManagerBase, - MagenticOrchestratorMessageEvent, Role, TextContent, Workflow, @@ -155,14 +156,17 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: assert isinstance(workflow, Workflow) outputs: list[ChatMessage] = [] - orchestrator_events: list[MagenticOrchestratorMessageEvent] = [] - agent_events: list[MagenticAgentMessageEvent] = [] + orchestrator_event_count = 0 + agent_event_count = 0 start_message = _MagenticStartMessage.from_string("compose summary") async for event in workflow.run_stream(start_message): - if isinstance(event, MagenticOrchestratorMessageEvent): - orchestrator_events.append(event) - if isinstance(event, MagenticAgentMessageEvent): - agent_events.append(event) + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + orchestrator_event_count += 1 + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_event_count += 1 if isinstance(event, WorkflowOutputEvent): msg = event.data if isinstance(msg, ChatMessage): @@ -172,8 +176,8 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: final = outputs[-1] assert final.text == "final" assert final.author_name == "magentic_manager" - assert orchestrator_events, "Expected orchestrator events to be emitted" - assert agent_events, "Expected agent message events to be emitted" + assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted" + assert agent_event_count > 0, "Expected agent delta events to be emitted" async def test_group_chat_as_agent_accepts_conversation() -> None: From 957cda3947ece216977241d5f313454d5c75f993 Mon Sep 17 00:00:00 2001 From: Evan Mattson <35585003+moonbox3@users.noreply.github.com> Date: Fri, 14 Nov 2025 15:25:27 +0900 Subject: [PATCH 4/4] Update python/packages/core/agent_framework/_workflows/_magentic.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- python/packages/core/agent_framework/_workflows/_magentic.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 3b53a3ac6d..7cf8814882 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -962,8 +962,7 @@ async def _emit_orchestrator_message( ) -> None: """Emit orchestrator message to the workflow event stream. - Emits both MagenticOrchestratorMessageEvent (for backward compatibility) and - AgentRunUpdateEvent (for agent wrapper consumers) with metadata indicating + Emits an AgentRunUpdateEvent (for agent wrapper consumers) with metadata indicating the orchestrator event type. Args: