Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ async def _run_workflow_with_tracing(

span.add_event(OtelAttr.WORKFLOW_COMPLETED)
except Exception as exc:
# Drain any pending events (for example, ExecutorFailedEvent) before yielding WorkflowFailedEvent
for event in await self._runner.context.drain_events():
yield event

# Surface structured failure details before propagating exception
details = WorkflowErrorDetails.from_exception(exc)
with _framework_event_origin():
Expand Down
47 changes: 47 additions & 0 deletions python/packages/core/tests/workflow/test_workflow_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ async def test_executor_failed_and_workflow_failed_events_streaming():
async for ev in wf.run_stream(0):
events.append(ev)

# ExecutorFailedEvent should be emitted before WorkflowFailedEvent
executor_failed_events = [e for e in events if isinstance(e, ExecutorFailedEvent)]
assert executor_failed_events, "ExecutorFailedEvent should be emitted when start executor fails"
assert executor_failed_events[0].executor_id == "f"
assert executor_failed_events[0].origin is WorkflowEventSource.FRAMEWORK

# Workflow-level failure and FAILED status should be surfaced
failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)]
assert failed_events
Expand All @@ -47,6 +53,11 @@ async def test_executor_failed_and_workflow_failed_events_streaming():
assert status and status[-1].state == WorkflowRunState.FAILED
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status)

# Verify ExecutorFailedEvent comes before WorkflowFailedEvent
executor_failed_idx = events.index(executor_failed_events[0])
workflow_failed_idx = events.index(failed_events[0])
assert executor_failed_idx < workflow_failed_idx, "ExecutorFailedEvent should be emitted before WorkflowFailedEvent"


async def test_executor_failed_event_emitted_on_direct_execute():
failing = FailingExecutor(id="f")
Expand All @@ -65,6 +76,42 @@ async def test_executor_failed_event_emitted_on_direct_execute():
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed)


class PassthroughExecutor(Executor):
"""Executor that passes message to the next executor."""

@handler
async def passthrough(self, msg: int, ctx: WorkflowContext[int]) -> None:
await ctx.send_message(msg)


async def test_executor_failed_event_from_second_executor_in_chain():
"""Test that ExecutorFailedEvent is emitted when a non-start executor fails."""
passthrough = PassthroughExecutor(id="passthrough")
failing = FailingExecutor(id="failing")
wf: Workflow = WorkflowBuilder().set_start_executor(passthrough).add_edge(passthrough, failing).build()

events: list[object] = []
with pytest.raises(RuntimeError, match="boom"):
async for ev in wf.run_stream(0):
events.append(ev)

# ExecutorFailedEvent should be emitted for the failing executor
executor_failed_events = [e for e in events if isinstance(e, ExecutorFailedEvent)]
assert executor_failed_events, "ExecutorFailedEvent should be emitted when second executor fails"
assert executor_failed_events[0].executor_id == "failing"
assert executor_failed_events[0].origin is WorkflowEventSource.FRAMEWORK

# Workflow-level failure should also be surfaced
failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)]
assert failed_events
assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed_events)

# Verify ExecutorFailedEvent comes before WorkflowFailedEvent
executor_failed_idx = events.index(executor_failed_events[0])
workflow_failed_idx = events.index(failed_events[0])
assert executor_failed_idx < workflow_failed_idx, "ExecutorFailedEvent should be emitted before WorkflowFailedEvent"


class SimpleExecutor(Executor):
"""Executor that does nothing, for testing."""

Expand Down
Loading