Skip to content
31 changes: 29 additions & 2 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,31 @@ def run(
)
return self._run_impl(messages, response_id, session, checkpoint_id, checkpoint_storage, **kwargs)

def _filter_messages(self, chat_messages: list[Message]) -> list[Message]:
"""Return only the last meaningful non-user message from a list of messages.

Args:
chat_messages: The conversation history or workflow output to filter.

Returns:
A single-element list containing the last meaningful non-user message,
or an empty list if none exists.
"""
if not chat_messages:
return []

for msg in reversed(chat_messages):
if msg.role != "user" and msg.text and msg.text.strip():
return [msg]
# fallback: last non-user message
non_user = [m for m in reversed(chat_messages) if m.role != "user"][:1]
if not non_user:
logger.warning(
"_filter_messages: no non-user messages found in list[Message] output. "
"Returning empty list — this likely indicates an unexpected workflow termination state."
)
return non_user

async def _run_impl(
self,
messages: AgentRunInputs,
Expand Down Expand Up @@ -476,8 +501,10 @@ def _convert_workflow_events_to_agent_response(
messages.append(data)
raw_representations.append(data.raw_representation)
elif is_instance_of(data, list[Message]):
chat_messages = cast(list[Message], data)
chat_messages = self._filter_messages(cast(list[Message], data))
messages.extend(chat_messages)
# raw_representations intentionally stores the original unfiltered list —
# it records what the workflow emitted, not what was surfaced to the caller.
raw_representations.append(data)
else:
contents = self._extract_contents(data)
Expand Down Expand Up @@ -593,7 +620,7 @@ def _convert_workflow_event_to_agent_response_updates(
]
if is_instance_of(data, list[Message]):
# Convert each Message to an AgentResponseUpdate
chat_messages = cast(list[Message], data)
chat_messages = self._filter_messages(cast(list[Message], data))
updates = []
for msg in chat_messages:
updates.append(
Expand Down
242 changes: 228 additions & 14 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,20 @@ async def raw_yielding_executor(
assert updates[2].raw_representation.value == 42

async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None:
"""Test that yield_output with list[Message] extracts contents from all messages.
"""Test that yield_output with list[Message] surfaces only the last assistant message.

Note: Content items are coalesced by _finalize_response, so multiple text contents
become a single merged Content in the final response.
When a workflow executor yields a list[Message] (as GroupChat orchestrators
do with self._full_conversation on termination), _filter_messages returns
only the last meaningful assistant message to avoid re-emitting user input
and replaying the full conversation history across turns. See #4261.

Users who need intermediate agent responses can opt in via
intermediate_outputs=True in GroupChatBuilder.
"""

@executor
async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[Never, list[Message]]) -> None:
# Yield a list of Messages (as SequentialBuilder does)
# Yield a list of Messages (as GroupChat orchestrator does with _full_conversation)
msg_list = [
Message(role="user", text="first message"),
Message(role="assistant", text="second message"),
Expand All @@ -491,25 +496,24 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N
workflow = WorkflowBuilder(start_executor=list_yielding_executor).build()
agent = workflow.as_agent("list-msg-agent")

# Verify streaming returns the update with all 4 contents before coalescing
# Streaming: _filter_messages returns only the last meaningful assistant message
updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

assert len(updates) == 3
# Only the last assistant message should be surfaced (user messages filtered,
# earlier assistant messages treated as conversation history replay)
assert len(updates) == 1
full_response = AgentResponse.from_updates(updates)
assert len(full_response.messages) == 3
texts = [message.text for message in full_response.messages]
# Note: `from_agent_run_response_updates` coalesces multiple text contents into one content
assert texts == ["first message", "second message", "thirdfourth"]
assert len(full_response.messages) == 1
assert full_response.messages[0].text == "thirdfourth"

# Verify run()
# Non-streaming: same filtering applies
result = await agent.run("test")

assert isinstance(result, AgentResponse)
assert len(result.messages) == 3
texts = [message.text for message in result.messages]
assert texts == ["first message", "second message", "third fourth"]
assert len(result.messages) == 1
assert result.messages[0].text == "third fourth"

async def test_session_conversation_history_included_in_workflow_run(self) -> None:
"""Test that messages provided to agent.run() are passed through to the workflow."""
Expand Down Expand Up @@ -1296,3 +1300,213 @@ def test_merge_updates_function_result_no_matching_call(self):

# Order: text (user), text (assistant), function_result (orphan at end)
assert content_types == ["text", "text", "function_result"]


class TestWorkflowAgentUserInputFilteringRegression:
"""Regression tests for #4261: user input must not compound across successive turns.

When a GroupChat orchestrator terminates, it yields self._full_conversation via
ctx.yield_output(self._full_conversation). This is a list[Message] containing the
entire conversation history — both user inputs and all prior assistant responses.

Without filtering, WorkflowAgent's _convert_workflow_event_to_agent_response_updates
(streaming) and _convert_workflow_events_to_agent_response (non-streaming) forward
all messages verbatim, causing user inputs and earlier assistant responses to
accumulate in the output on every successive turn.

_filter_messages fixes this by returning only the last meaningful assistant message
from list[Message] output, aligning with GroupChatBuilder's default behavior of
intermediate_outputs=False where only the orchestrator's final summary is surfaced.
Users who need intermediate agent responses can opt in via intermediate_outputs=True.

These tests use a class-based Executor (rather than the @executor decorator) to
ensure generic type annotations on WorkflowContext[Never, list[Message]] resolve
correctly at runtime, so is_instance_of(data, list[Message]) hits the right branch.
"""

async def test_streaming_compounding_not_observed_across_turns(self):
"""Regression: turn 1's user input must not appear in turn 2's streamed response."""

class GroupChatLikeExecutor(Executor):
"""Simulates a GroupChat orchestrator's termination behavior.

On termination, BaseGroupChatOrchestrator yields self._full_conversation
via ctx.yield_output(self._full_conversation), which is a list[Message]
containing the entire conversation history (both user and assistant messages).
This executor replicates that exact pattern to exercise the list[Message]
branch in _filter_messages and verify that user inputs do not compound
across successive turns (see #4261).
"""

@handler
async def handle_messages(
self,
messages: list[Message],
ctx: WorkflowContext[Never, list[Message]],
) -> None:
input_text = messages[-1].text or ""
full_conversation: list[Message] = [
Message(role="user", text=input_text),
Message(
role="assistant",
contents=[Content.from_text(text=f"Answer to: {input_text}")],
author_name="Principal",
),
]
await ctx.yield_output(full_conversation)

groupchat_executor = GroupChatLikeExecutor(id="groupchat")
workflow = WorkflowBuilder(start_executor=groupchat_executor).build()
agent = workflow.as_agent("groupchat-agent")
session = AgentSession()

# Turn 1
updates1: list[AgentResponseUpdate] = []
async for chunk in agent.run("first_question", stream=True, session=session):
updates1.append(chunk)

# Turn 2: "first_question" must NOT bleed into turn 2's streamed output
updates2: list[AgentResponseUpdate] = []
async for chunk in agent.run("second_question", stream=True, session=session):
updates2.append(chunk)

text2 = " ".join(u.text or "" for u in updates2 if u.text)
assert "first_question" not in text2, (
"Turn 1 user input should not appear in turn 2 streaming output (compounding regression)"
)
assert "Answer to: second_question" in text2

async def test_nonstreaming_compounding_not_observed_across_turns(self):
"""Regression: turn 1's user input must not appear in turn 2's response."""

class GroupChatLikeExecutor(Executor):
"""Simulates a GroupChat orchestrator's termination behavior.

On termination, BaseGroupChatOrchestrator yields self._full_conversation
via ctx.yield_output(self._full_conversation), which is a list[Message]
containing the entire conversation history (both user and assistant messages).
This executor replicates that exact pattern to exercise the list[Message]
branch in _filter_messages and verify that user inputs do not compound
across successive turns (see #4261).
"""

@handler
async def handle_messages(
self,
messages: list[Message],
ctx: WorkflowContext[Never, list[Message]],
) -> None:
input_text = messages[-1].text or ""
full_conversation: list[Message] = [
Message(role="user", text=input_text),
Message(
role="assistant",
contents=[Content.from_text(text=f"Answer to: {input_text}")],
author_name="Principal",
),
]
await ctx.yield_output(full_conversation)

groupchat_executor = GroupChatLikeExecutor(id="groupchat")
workflow = WorkflowBuilder(start_executor=groupchat_executor).build()
agent = workflow.as_agent("groupchat-agent")
session = AgentSession()

# Turn 1
await agent.run("first_question", session=session)

# Turn 2: "first_question" must NOT bleed into turn 2's response
result2 = await agent.run("second_question", session=session)
text2 = " ".join(m.text or "" for m in result2.messages)
assert "first_question" not in text2, (
"Turn 1 user input should not appear in turn 2 response (compounding regression)"
)
assert "Answer to: second_question" in text2


class TestFilterMessages:
"""Direct unit tests for WorkflowAgent._filter_messages edge cases.

Covers empty input, all-user messages, assistant messages with no/whitespace text,
mixed roles, and ordering. The all-user and empty cases both hit the
`if not non_user: return chat_messages` fallback path, returning the original
list unchanged rather than silently dropping output (see moonbox3's review on #4268).
"""

def _make_agent(self) -> WorkflowAgent:
@executor
async def _e(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output("x")

workflow = WorkflowBuilder(start_executor=_e).build()
return WorkflowAgent(workflow=workflow)

def test_empty_list_returns_empty(self):
agent = self._make_agent()
assert agent._filter_messages([]) == []

def test_single_assistant_message_empty_text(self):
"""Return the single assistant message as-is when it's the only message, even if it has no text"""
agent = self._make_agent()
msg = Message(role="assistant", text="")
assert agent._filter_messages([msg]) == [msg]

def test_single_assistant_message_whitespace_text(self):
"""Return the single assistant message as-is when it's the only message, even if it has only whitespace."""
agent = self._make_agent()
msg = Message(role="assistant", text=" ")
assert agent._filter_messages([msg]) == [msg]

def test_single_assistant_message_none_text(self):
"""Return the single assistant message as-is when it's the only message, even if it has None text."""
agent = self._make_agent()
msg = Message(role="assistant", text=None)
assert agent._filter_messages([msg]) == [msg]

def test_single_assistant_message_returned(self):
"""Return the single assistant message as-is when it's the only message"""
agent = self._make_agent()
msg = Message(role="assistant", text="Hello")
assert agent._filter_messages([msg]) == [msg]

def test_all_user_messages_returns_empty_list(self):
"""All-user input: no assistant content exists to surface, returns empty list."""

agent = self._make_agent()
msgs = [Message(role="user", text="hi"), Message(role="user", text="hello")]
result = agent._filter_messages(msgs)
assert result == []

def test_mixed_roles_returns_last_assistant(self):
agent = self._make_agent()
msgs = [
Message(role="user", text="q1"),
Message(role="assistant", text="a1"),
Message(role="user", text="q2"),
Message(role="assistant", text="a2"), # should be returned
]
result = agent._filter_messages(msgs)
assert len(result) == 1
assert result[0].text == "a2"

def test_assistant_with_none_text_falls_through_to_next(self):
agent = self._make_agent()
msgs = [
Message(role="assistant", text="a1"),
Message(role="assistant", text=None),
Message(role="assistant", text=" "),
]
# The last non-user message is whitespace-only, falls to non-text fallback
result = agent._filter_messages(msgs)
assert len(result) == 1 # fallback picks last non-user message

def test_returns_last_not_first_assistant(self):
agent = self._make_agent()
msgs = [
Message(role="assistant", text="First response"),
Message(role="user", text="follow up"),
Message(role="assistant", text="Second response"),
]
result = agent._filter_messages(msgs)
assert len(result) == 1
assert result[0].text == "Second response"