diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index a14542b2a6..eb22d7c330 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -370,6 +370,10 @@ async def _run_workflow_with_tracing( span.add_event(OtelAttr.WORKFLOW_COMPLETED) except Exception as exc: + # Drain any pending events (for example, ExecutorFailedEvent) before yielding WorkflowFailedEvent + for event in await self._runner.context.drain_events(): + yield event + # Surface structured failure details before propagating exception details = WorkflowErrorDetails.from_exception(exc) with _framework_event_origin(): diff --git a/python/packages/core/tests/workflow/test_workflow_states.py b/python/packages/core/tests/workflow/test_workflow_states.py index 4e88ed26cb..53baf86383 100644 --- a/python/packages/core/tests/workflow/test_workflow_states.py +++ b/python/packages/core/tests/workflow/test_workflow_states.py @@ -39,6 +39,12 @@ async def test_executor_failed_and_workflow_failed_events_streaming(): async for ev in wf.run_stream(0): events.append(ev) + # ExecutorFailedEvent should be emitted before WorkflowFailedEvent + executor_failed_events = [e for e in events if isinstance(e, ExecutorFailedEvent)] + assert executor_failed_events, "ExecutorFailedEvent should be emitted when start executor fails" + assert executor_failed_events[0].executor_id == "f" + assert executor_failed_events[0].origin is WorkflowEventSource.FRAMEWORK + # Workflow-level failure and FAILED status should be surfaced failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)] assert failed_events @@ -47,6 +53,11 @@ async def test_executor_failed_and_workflow_failed_events_streaming(): assert status and status[-1].state == WorkflowRunState.FAILED assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in status) + # Verify ExecutorFailedEvent comes before WorkflowFailedEvent + executor_failed_idx = events.index(executor_failed_events[0]) + workflow_failed_idx = events.index(failed_events[0]) + assert executor_failed_idx < workflow_failed_idx, "ExecutorFailedEvent should be emitted before WorkflowFailedEvent" + async def test_executor_failed_event_emitted_on_direct_execute(): failing = FailingExecutor(id="f") @@ -65,6 +76,42 @@ async def test_executor_failed_event_emitted_on_direct_execute(): assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed) +class PassthroughExecutor(Executor): + """Executor that passes message to the next executor.""" + + @handler + async def passthrough(self, msg: int, ctx: WorkflowContext[int]) -> None: + await ctx.send_message(msg) + + +async def test_executor_failed_event_from_second_executor_in_chain(): + """Test that ExecutorFailedEvent is emitted when a non-start executor fails.""" + passthrough = PassthroughExecutor(id="passthrough") + failing = FailingExecutor(id="failing") + wf: Workflow = WorkflowBuilder().set_start_executor(passthrough).add_edge(passthrough, failing).build() + + events: list[object] = [] + with pytest.raises(RuntimeError, match="boom"): + async for ev in wf.run_stream(0): + events.append(ev) + + # ExecutorFailedEvent should be emitted for the failing executor + executor_failed_events = [e for e in events if isinstance(e, ExecutorFailedEvent)] + assert executor_failed_events, "ExecutorFailedEvent should be emitted when second executor fails" + assert executor_failed_events[0].executor_id == "failing" + assert executor_failed_events[0].origin is WorkflowEventSource.FRAMEWORK + + # Workflow-level failure should also be surfaced + failed_events = [e for e in events if isinstance(e, WorkflowFailedEvent)] + assert failed_events + assert all(e.origin is WorkflowEventSource.FRAMEWORK for e in failed_events) + + # Verify ExecutorFailedEvent comes before WorkflowFailedEvent + executor_failed_idx = events.index(executor_failed_events[0]) + workflow_failed_idx = events.index(failed_events[0]) + assert executor_failed_idx < workflow_failed_idx, "ExecutorFailedEvent should be emitted before WorkflowFailedEvent" + + class SimpleExecutor(Executor): """Executor that does nothing, for testing."""