Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@
)
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
ORCH_MSG_KIND_INSTRUCTION,
ORCH_MSG_KIND_NOTICE,
ORCH_MSG_KIND_TASK_LEDGER,
ORCH_MSG_KIND_USER_TASK,
MagenticBuilder,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
MagenticOrchestratorMessageEvent,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
Expand Down Expand Up @@ -104,6 +106,12 @@
"DEFAULT_MANAGER_INSTRUCTIONS",
"DEFAULT_MANAGER_STRUCTURED_OUTPUT_PROMPT",
"DEFAULT_MAX_ITERATIONS",
"MAGENTIC_EVENT_TYPE_AGENT_DELTA",
"MAGENTIC_EVENT_TYPE_ORCHESTRATOR",
"ORCH_MSG_KIND_INSTRUCTION",
"ORCH_MSG_KIND_NOTICE",
"ORCH_MSG_KIND_TASK_LEDGER",
"ORCH_MSG_KIND_USER_TASK",
"AgentExecutor",
"AgentExecutorRequest",
"AgentExecutorResponse",
Expand Down Expand Up @@ -132,13 +140,9 @@
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
"MagenticOrchestratorMessageEvent",
"MagenticPlanReviewDecision",
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
Expand Down
8 changes: 4 additions & 4 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ def _convert_workflow_event_to_agent_update(
) -> AgentRunResponseUpdate | None:
"""Convert a workflow event to an AgentRunResponseUpdate.

Only AgentRunUpdateEvent and RequestInfoEvent are processed and the rest
are not relevant. Returns None if the event is not relevant.
Only AgentRunUpdateEvent and RequestInfoEvent are processed.
Other workflow events are ignored as they are workflow-internal and should
have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers.
"""
match event:
case AgentRunUpdateEvent(data=update):
Expand Down Expand Up @@ -271,9 +272,8 @@ def _convert_workflow_event_to_agent_update(
created_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
)
case _:
# Ignore non-agent workflow events
# Ignore workflow-internal events
pass
# We only care about the above two events and discard the rest.
return None

def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict[str, Any]:
Expand Down
141 changes: 36 additions & 105 deletions python/packages/core/agent_framework/_workflows/_magentic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections.abc import AsyncIterable, Sequence
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Protocol, TypeVar, Union, cast
from typing import Any, TypeVar, cast
from uuid import uuid4

from agent_framework import (
Expand All @@ -19,15 +19,13 @@
AgentRunResponseUpdate,
ChatClientProtocol,
ChatMessage,
FunctionCallContent,
FunctionResultContent,
Role,
)

from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator
from ._checkpoint import CheckpointStorage, WorkflowCheckpoint
from ._const import EXECUTOR_STATE_KEY
from ._events import WorkflowEvent
from ._events import AgentRunUpdateEvent, WorkflowEvent
from ._executor import Executor, handler
from ._group_chat import (
GroupChatBuilder,
Expand Down Expand Up @@ -104,64 +102,13 @@ def _message_from_payload(payload: Any) -> ChatMessage:
raise TypeError("Unable to reconstruct ChatMessage from payload")


# region Unified callback API (developer-facing)
# region Magentic event metadata constants

# Event type identifiers for magentic_event_type in additional_properties
MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message"
MAGENTIC_EVENT_TYPE_AGENT_DELTA = "agent_delta"

@dataclass
class MagenticOrchestratorMessageEvent(WorkflowEvent):
orchestrator_id: str = ""
message: ChatMessage | None = None
kind: str = ""

def __post_init__(self) -> None:
super().__init__(data=self.message)


@dataclass
class MagenticAgentDeltaEvent(WorkflowEvent):
agent_id: str | None = None
text: str | None = None
function_call_id: str | None = None
function_call_name: str | None = None
function_call_arguments: Any | None = None
function_result_id: str | None = None
function_result: Any | None = None
role: Role | None = None

def __post_init__(self) -> None:
super().__init__(data=self.text)


@dataclass
class MagenticAgentMessageEvent(WorkflowEvent):
agent_id: str = ""
message: ChatMessage | None = None

def __post_init__(self) -> None:
super().__init__(data=self.message)


@dataclass
class MagenticFinalResultEvent(WorkflowEvent):
message: ChatMessage | None = None

def __post_init__(self) -> None:
super().__init__(data=self.message)


MagenticCallbackEvent = Union[
MagenticOrchestratorMessageEvent,
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticFinalResultEvent,
]


class CallbackSink(Protocol):
async def __call__(self, event: MagenticCallbackEvent) -> None: ...


# endregion Unified callback API
# endregion Magentic event metadata constants

# region Magentic One Prompts

Expand Down Expand Up @@ -1020,9 +967,8 @@ async def _emit_orchestrator_message(
) -> None:
"""Emit orchestrator message to the workflow event stream.

Orchestrator messages flow through the unified workflow event stream as
MagenticOrchestratorMessageEvent instances. Consumers should subscribe to
these events via workflow.run_stream().
Emits an AgentRunUpdateEvent (for agent wrapper consumers) with metadata indicating
the orchestrator event type.

Args:
ctx: Workflow context for adding events to the stream
Expand All @@ -1031,15 +977,24 @@ async def _emit_orchestrator_message(

Example:
async for event in workflow.run_stream("task"):
if isinstance(event, MagenticOrchestratorMessageEvent):
print(f"Orchestrator {event.kind}: {event.message.text}")
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
if props and props.get("magentic_event_type") == "orchestrator_message":
kind = props.get("orchestrator_message_kind", "")
print(f"Orchestrator {kind}: {event.data.text}")
"""
event = MagenticOrchestratorMessageEvent(
orchestrator_id=self.id,
message=message,
kind=kind,
# Emit AgentRunUpdateEvent with metadata
update = AgentRunResponseUpdate(
text=message.text,
role=message.role,
author_name=self._get_author_name(),
additional_properties={
"magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
"orchestrator_message_kind": kind,
"orchestrator_id": self.id,
},
)
await ctx.add_event(event)
await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update))

@override
async def on_checkpoint_save(self) -> dict[str, Any]:
Expand Down Expand Up @@ -1524,7 +1479,6 @@ async def _prepare_final_answer(

# Emit a completed event for the workflow
await context.yield_output(final_answer)
await context.add_event(MagenticFinalResultEvent(message=final_answer))

async def _check_within_limits_or_complete(
self,
Expand Down Expand Up @@ -1556,7 +1510,6 @@ async def _check_within_limits_or_complete(

# Yield the partial result and signal completion
await context.yield_output(partial_result)
await context.add_event(MagenticFinalResultEvent(message=partial_result))
return False

return True
Expand Down Expand Up @@ -1733,45 +1686,23 @@ async def _emit_agent_delta_event(
ctx: WorkflowContext[Any, Any],
update: AgentRunResponseUpdate,
) -> None:
contents = list(getattr(update, "contents", []) or [])
chunk = getattr(update, "text", None)
if not chunk:
chunk = "".join(getattr(item, "text", "") for item in contents if hasattr(item, "text"))
if chunk:
await ctx.add_event(
MagenticAgentDeltaEvent(
agent_id=self._agent_id,
text=chunk or None,
role=getattr(update, "role", None),
)
)
for item in contents:
if isinstance(item, FunctionCallContent):
await ctx.add_event(
MagenticAgentDeltaEvent(
agent_id=self._agent_id,
function_call_id=getattr(item, "call_id", None),
function_call_name=getattr(item, "name", None),
function_call_arguments=getattr(item, "arguments", None),
role=getattr(update, "role", None),
)
)
elif isinstance(item, FunctionResultContent):
await ctx.add_event(
MagenticAgentDeltaEvent(
agent_id=self._agent_id,
function_result_id=getattr(item, "call_id", None),
function_result=getattr(item, "result", None),
role=getattr(update, "role", None),
)
)
# Add metadata to identify this as an agent streaming update
if update.additional_properties is None:
update.additional_properties = {}
update.additional_properties["magentic_event_type"] = MAGENTIC_EVENT_TYPE_AGENT_DELTA
update.additional_properties["agent_id"] = self._agent_id

# Emit AgentRunUpdateEvent with the agent response update
await ctx.add_event(AgentRunUpdateEvent(executor_id=self._agent_id, data=update))

async def _emit_agent_message_event(
self,
ctx: WorkflowContext[Any, Any],
message: ChatMessage,
) -> None:
await ctx.add_event(MagenticAgentMessageEvent(agent_id=self._agent_id, message=message))
# Agent message completion is already communicated via streaming updates
# No additional event needed
pass

async def _invoke_agent(
self,
Expand Down
24 changes: 14 additions & 10 deletions python/packages/core/tests/workflow/test_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
import pytest

from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunResponse,
AgentRunResponseUpdate,
AgentRunUpdateEvent,
AgentThread,
BaseAgent,
ChatMessage,
GroupChatBuilder,
GroupChatDirective,
GroupChatStateSnapshot,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticContext,
MagenticManagerBase,
MagenticOrchestratorMessageEvent,
Role,
TextContent,
Workflow,
Expand Down Expand Up @@ -155,14 +156,17 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None:
assert isinstance(workflow, Workflow)

outputs: list[ChatMessage] = []
orchestrator_events: list[MagenticOrchestratorMessageEvent] = []
agent_events: list[MagenticAgentMessageEvent] = []
orchestrator_event_count = 0
agent_event_count = 0
start_message = _MagenticStartMessage.from_string("compose summary")
async for event in workflow.run_stream(start_message):
if isinstance(event, MagenticOrchestratorMessageEvent):
orchestrator_events.append(event)
if isinstance(event, MagenticAgentMessageEvent):
agent_events.append(event)
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
orchestrator_event_count += 1
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_event_count += 1
if isinstance(event, WorkflowOutputEvent):
msg = event.data
if isinstance(msg, ChatMessage):
Expand All @@ -172,8 +176,8 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None:
final = outputs[-1]
assert final.text == "final"
assert final.author_name == "magentic_manager"
assert orchestrator_events, "Expected orchestrator events to be emitted"
assert agent_events, "Expected agent message events to be emitted"
assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted"
assert agent_event_count > 0, "Expected agent delta events to be emitted"


async def test_group_chat_as_agent_accepts_conversation() -> None:
Expand Down
10 changes: 7 additions & 3 deletions python/packages/core/tests/workflow/test_magentic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentRunUpdateEvent,
BaseAgent,
ChatClientProtocol,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
Executor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticManagerBase,
MagenticPlanReviewDecision,
Expand Down Expand Up @@ -561,8 +561,12 @@ async def _collect_agent_responses_setup(participant_obj: object):
events.append(ev)
if isinstance(ev, WorkflowOutputEvent):
break
if isinstance(ev, MagenticAgentMessageEvent) and ev.message is not None:
captured.append(ev.message)
if isinstance(ev, AgentRunUpdateEvent) and ev.data is not None:
captured.append(
ChatMessage(
role=ev.data.role or Role.ASSISTANT, text=ev.data.text or "", author_name=ev.data.author_name
)
)
if len(events) > 50:
break

Expand Down
Loading
Loading