diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index 18dd674a92..35c9f11501 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -64,13 +64,15 @@ ) from ._handoff import HandoffBuilder, HandoffUserInputRequest from ._magentic import ( - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + ORCH_MSG_KIND_INSTRUCTION, + ORCH_MSG_KIND_NOTICE, + ORCH_MSG_KIND_TASK_LEDGER, + ORCH_MSG_KIND_USER_TASK, MagenticBuilder, MagenticContext, - MagenticFinalResultEvent, MagenticManagerBase, - MagenticOrchestratorMessageEvent, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, @@ -104,6 +106,12 @@ "DEFAULT_MANAGER_INSTRUCTIONS", "DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT", "DEFAULT_MAX_ITERATIONS", + "MAGENTIC_EVENT_TYPE_AGENT_DELTA", + "MAGENTIC_EVENT_TYPE_ORCHESTRATOR", + "ORCH_MSG_KIND_INSTRUCTION", + "ORCH_MSG_KIND_NOTICE", + "ORCH_MSG_KIND_TASK_LEDGER", + "ORCH_MSG_KIND_USER_TASK", "AgentExecutor", "AgentExecutorRequest", "AgentExecutorResponse", @@ -132,13 +140,9 @@ "HandoffUserInputRequest", "InMemoryCheckpointStorage", "InProcRunnerContext", - "MagenticAgentDeltaEvent", - "MagenticAgentMessageEvent", "MagenticBuilder", "MagenticContext", - "MagenticFinalResultEvent", "MagenticManagerBase", - "MagenticOrchestratorMessageEvent", "MagenticPlanReviewDecision", "MagenticPlanReviewReply", "MagenticPlanReviewRequest", diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index f2a0ea9d75..1831c72953 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -236,8 +236,9 @@ def _convert_workflow_event_to_agent_update( ) -> AgentRunResponseUpdate | None: """Convert a workflow event to an AgentRunResponseUpdate. - Only AgentRunUpdateEvent and RequestInfoEvent are processed and the rest - are not relevant. Returns None if the event is not relevant. + Only AgentRunUpdateEvent and RequestInfoEvent are processed. + Other workflow events are ignored as they are workflow-internal and should + have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers. """ match event: case AgentRunUpdateEvent(data=update): @@ -271,9 +272,8 @@ def _convert_workflow_event_to_agent_update( created_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), ) case _: - # Ignore non-agent workflow events + # Ignore workflow-internal events pass - # We only care about the above two events and discard the rest. return None def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict[str, Any]: diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index ea6fb259a6..624c6f50ae 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -10,7 +10,7 @@ from collections.abc import AsyncIterable, Sequence from dataclasses import dataclass, field from enum import Enum -from typing import Any, Protocol, TypeVar, Union, cast +from typing import Any, TypeVar, cast from uuid import uuid4 from agent_framework import ( @@ -19,15 +19,13 @@ AgentRunResponseUpdate, ChatClientProtocol, ChatMessage, - FunctionCallContent, - FunctionResultContent, Role, ) from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator from ._checkpoint import CheckpointStorage, WorkflowCheckpoint from ._const import EXECUTOR_STATE_KEY -from ._events import WorkflowEvent +from ._events import AgentRunUpdateEvent, WorkflowEvent from ._executor import Executor, handler from ._group_chat import ( GroupChatBuilder, @@ -104,64 +102,13 @@ def _message_from_payload(payload: Any) -> ChatMessage: raise TypeError("Unable to reconstruct ChatMessage from payload") -# region Unified callback API (developer-facing) +# region Magentic event metadata constants +# Event type identifiers for magentic_event_type in additional_properties +MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message" +MAGENTIC_EVENT_TYPE_AGENT_DELTA = "agent_delta" -@dataclass -class MagenticOrchestratorMessageEvent(WorkflowEvent): - orchestrator_id: str = "" - message: ChatMessage | None = None - kind: str = "" - - def __post_init__(self) -> None: - super().__init__(data=self.message) - - -@dataclass -class MagenticAgentDeltaEvent(WorkflowEvent): - agent_id: str | None = None - text: str | None = None - function_call_id: str | None = None - function_call_name: str | None = None - function_call_arguments: Any | None = None - function_result_id: str | None = None - function_result: Any | None = None - role: Role | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.text) - - -@dataclass -class MagenticAgentMessageEvent(WorkflowEvent): - agent_id: str = "" - message: ChatMessage | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.message) - - -@dataclass -class MagenticFinalResultEvent(WorkflowEvent): - message: ChatMessage | None = None - - def __post_init__(self) -> None: - super().__init__(data=self.message) - - -MagenticCallbackEvent = Union[ - MagenticOrchestratorMessageEvent, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, - MagenticFinalResultEvent, -] - - -class CallbackSink(Protocol): - async def __call__(self, event: MagenticCallbackEvent) -> None: ... - - -# endregion Unified callback API +# endregion Magentic event metadata constants # region Magentic One Prompts @@ -1020,9 +967,8 @@ async def _emit_orchestrator_message( ) -> None: """Emit orchestrator message to the workflow event stream. - Orchestrator messages flow through the unified workflow event stream as - MagenticOrchestratorMessageEvent instances. Consumers should subscribe to - these events via workflow.run_stream(). + Emits an AgentRunUpdateEvent (for agent wrapper consumers) with metadata indicating + the orchestrator event type. Args: ctx: Workflow context for adding events to the stream @@ -1031,15 +977,24 @@ async def _emit_orchestrator_message( Example: async for event in workflow.run_stream("task"): - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"Orchestrator {event.kind}: {event.message.text}") + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + if props and props.get("magentic_event_type") == "orchestrator_message": + kind = props.get("orchestrator_message_kind", "") + print(f"Orchestrator {kind}: {event.data.text}") """ - event = MagenticOrchestratorMessageEvent( - orchestrator_id=self.id, - message=message, - kind=kind, + # Emit AgentRunUpdateEvent with metadata + update = AgentRunResponseUpdate( + text=message.text, + role=message.role, + author_name=self._get_author_name(), + additional_properties={ + "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + "orchestrator_message_kind": kind, + "orchestrator_id": self.id, + }, ) - await ctx.add_event(event) + await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update)) @override async def on_checkpoint_save(self) -> dict[str, Any]: @@ -1524,7 +1479,6 @@ async def _prepare_final_answer( # Emit a completed event for the workflow await context.yield_output(final_answer) - await context.add_event(MagenticFinalResultEvent(message=final_answer)) async def _check_within_limits_or_complete( self, @@ -1556,7 +1510,6 @@ async def _check_within_limits_or_complete( # Yield the partial result and signal completion await context.yield_output(partial_result) - await context.add_event(MagenticFinalResultEvent(message=partial_result)) return False return True @@ -1733,45 +1686,23 @@ async def _emit_agent_delta_event( ctx: WorkflowContext[Any, Any], update: AgentRunResponseUpdate, ) -> None: - contents = list(getattr(update, "contents", []) or []) - chunk = getattr(update, "text", None) - if not chunk: - chunk = "".join(getattr(item, "text", "") for item in contents if hasattr(item, "text")) - if chunk: - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - text=chunk or None, - role=getattr(update, "role", None), - ) - ) - for item in contents: - if isinstance(item, FunctionCallContent): - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - function_call_id=getattr(item, "call_id", None), - function_call_name=getattr(item, "name", None), - function_call_arguments=getattr(item, "arguments", None), - role=getattr(update, "role", None), - ) - ) - elif isinstance(item, FunctionResultContent): - await ctx.add_event( - MagenticAgentDeltaEvent( - agent_id=self._agent_id, - function_result_id=getattr(item, "call_id", None), - function_result=getattr(item, "result", None), - role=getattr(update, "role", None), - ) - ) + # Add metadata to identify this as an agent streaming update + if update.additional_properties is None: + update.additional_properties = {} + update.additional_properties["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA + update.additional_properties["agent_id"] = self._agent_id + + # Emit AgentRunUpdateEvent with the agent response update + await ctx.add_event(AgentRunUpdateEvent(executor_id=self._agent_id, data=update)) async def _emit_agent_message_event( self, ctx: WorkflowContext[Any, Any], message: ChatMessage, ) -> None: - await ctx.add_event(MagenticAgentMessageEvent(agent_id=self._agent_id, message=message)) + # Agent message completion is already communicated via streaming updates + # No additional event needed + pass async def _invoke_agent( self, diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index ab920d0663..8d1798a7a4 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -6,19 +6,20 @@ import pytest from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, AgentRunResponse, AgentRunResponseUpdate, + AgentRunUpdateEvent, AgentThread, BaseAgent, ChatMessage, GroupChatBuilder, GroupChatDirective, GroupChatStateSnapshot, - MagenticAgentMessageEvent, MagenticBuilder, MagenticContext, MagenticManagerBase, - MagenticOrchestratorMessageEvent, Role, TextContent, Workflow, @@ -155,14 +156,17 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: assert isinstance(workflow, Workflow) outputs: list[ChatMessage] = [] - orchestrator_events: list[MagenticOrchestratorMessageEvent] = [] - agent_events: list[MagenticAgentMessageEvent] = [] + orchestrator_event_count = 0 + agent_event_count = 0 start_message = _MagenticStartMessage.from_string("compose summary") async for event in workflow.run_stream(start_message): - if isinstance(event, MagenticOrchestratorMessageEvent): - orchestrator_events.append(event) - if isinstance(event, MagenticAgentMessageEvent): - agent_events.append(event) + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + orchestrator_event_count += 1 + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_event_count += 1 if isinstance(event, WorkflowOutputEvent): msg = event.data if isinstance(msg, ChatMessage): @@ -172,8 +176,8 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: final = outputs[-1] assert final.text == "final" assert final.author_name == "magentic_manager" - assert orchestrator_events, "Expected orchestrator events to be emitted" - assert agent_events, "Expected agent message events to be emitted" + assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted" + assert agent_event_count > 0, "Expected agent delta events to be emitted" async def test_group_chat_as_agent_accepts_conversation() -> None: diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index cc1e8ad132..7dc8a2c471 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -10,13 +10,13 @@ from agent_framework import ( AgentRunResponse, AgentRunResponseUpdate, + AgentRunUpdateEvent, BaseAgent, ChatClientProtocol, ChatMessage, ChatResponse, ChatResponseUpdate, Executor, - MagenticAgentMessageEvent, MagenticBuilder, MagenticManagerBase, MagenticPlanReviewDecision, @@ -561,8 +561,12 @@ async def _collect_agent_responses_setup(participant_obj: object): events.append(ev) if isinstance(ev, WorkflowOutputEvent): break - if isinstance(ev, MagenticAgentMessageEvent) and ev.message is not None: - captured.append(ev.message) + if isinstance(ev, AgentRunUpdateEvent) and ev.data is not None: + captured.append( + ChatMessage( + role=ev.data.role or Role.ASSISTANT, text=ev.data.text or "", author_name=ev.data.author_name + ) + ) if len(events) > 50: break diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index 6fab7c495c..091f37c055 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -4,14 +4,11 @@ import logging from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, ChatAgent, HostedCodeInterpreterTool, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, - WorkflowOutputEvent, ) from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient @@ -78,58 +75,24 @@ async def main() -> None: print("\nStarting workflow execution...") try: - last_stream_agent_id: str | None = None - stream_line_open: bool = False - final_output: str | None = None - - async for event in workflow.run_stream(task): - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print(" (final)") - stream_line_open = False - print() - msg = event.message - if msg is not None: - response_text = (msg.text or "").replace("\n", " ") - print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") - elif isinstance(event, MagenticFinalResultEvent): - print("\n" + "=" * 50) - print("FINAL RESULT:") - print("=" * 50) - if event.message is not None: - print(event.message.text) - print("=" * 50) - elif isinstance(event, WorkflowOutputEvent): - final_output = str(event.data) if event.data is not None else None - - if stream_line_open: - print() - stream_line_open = False - - if final_output is not None: - print(f"\nWorkflow completed with result:\n\n{final_output}\n") - # Wrap the workflow as an agent for composition scenarios + print("\nWrapping workflow as an agent and running...") workflow_agent = workflow.as_agent(name="MagenticWorkflowAgent") - agent_result = await workflow_agent.run(task) - - if agent_result.messages: - print("\n===== as_agent() Transcript =====") - for i, msg in enumerate(agent_result.messages, start=1): - role_value = getattr(msg.role, "value", msg.role) - speaker = msg.author_name or role_value - print(f"{'-' * 50}\n{i:02d} [{speaker}]\n{msg.text}") + async for response in workflow_agent.run_stream(task): + # AgentRunResponseUpdate objects contain the streaming agent data + # Check metadata to understand event type + props = response.additional_properties + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + print(f"\n[ORCHESTRATOR:{kind}] {response.text}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + if response.text: + print(response.text, end="", flush=True) + elif response.text: + # Fallback for any other events with text + print(response.text, end="", flush=True) except Exception as e: print(f"Workflow execution failed: {e}") diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 5ba8b5cc23..52df5b24c6 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -5,13 +5,12 @@ from typing import cast from agent_framework import ( + MAGENTIC_EVENT_TYPE_AGENT_DELTA, + MAGENTIC_EVENT_TYPE_ORCHESTRATOR, + AgentRunUpdateEvent, ChatAgent, HostedCodeInterpreterTool, - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, MagenticPlanReviewDecision, MagenticPlanReviewReply, MagenticPlanReviewRequest, @@ -117,34 +116,25 @@ def on_exception(exception: Exception) -> None: # Collect events from the stream async for event in stream: - if isinstance(event, MagenticOrchestratorMessageEvent): - print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print(" (final)") - stream_line_open = False - print() - msg = event.message - if msg is not None: - response_text = (msg.text or "").replace("\n", " ") - print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") - elif isinstance(event, MagenticFinalResultEvent): - print("\n" + "=" * 50) - print("FINAL RESULT:") - print("=" * 50) - if event.message is not None: - print(event.message.text) - print("=" * 50) - if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: + if isinstance(event, AgentRunUpdateEvent): + props = event.data.additional_properties if event.data else None + event_type = props.get("magentic_event_type") if props else None + + if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: + kind = props.get("orchestrator_message_kind", "") if props else "" + text = event.data.text if event.data else "" + print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}") + elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA: + agent_id = props.get("agent_id", "unknown") if props else "unknown" + if last_stream_agent_id != agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[STREAM:{agent_id}]: ", end="", flush=True) + last_stream_agent_id = agent_id + stream_line_open = True + if event.data and event.data.text: + print(event.data.text, end="", flush=True) + elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest: pending_request = event review_req = cast(MagenticPlanReviewRequest, event.data) if review_req.plan_text: