Python: [Bug]: agent_framework_ag_ui - WorkflowAgent async generator incompatible with run_agent_stream #3905

@leelakarthik

Description

Summary

The run_agent_stream function in agent_framework_ag_ui/_run.py (line ~877) raises a TypeError when used with WorkflowAgent: it awaits the value returned by agent.run(), but for a WorkflowAgent that value is an async generator, which cannot be awaited.

Error

TypeError: object async_generator can't be used in 'await' expression

Root Cause

When workflow.as_agent() creates a WorkflowAgent and its run() method is called with stream=True, it returns an async generator directly. However, the current code in run_agent_stream only handles two return types:

  1. ResponseStream (direct)
  2. Awaitable[ResponseStream] (needs await)

It doesn't handle a third case: async generators, which define __aiter__ and __anext__ but not __await__, so they can be consumed with async for but cannot be awaited.
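The distinction can be reproduced without the framework. The following is a minimal sketch using only the standard library; `updates` is a hypothetical stand-in for whatever WorkflowAgent.run(..., stream=True) returns, not the real implementation. It shows that awaiting an async generator raises TypeError, while iterating it with async for works.

```python
import asyncio


async def updates():
    # Hypothetical stand-in for a streaming run() that is written as an
    # async generator function: calling it returns an async generator object.
    yield "chunk-1"
    yield "chunk-2"


async def main():
    gen = updates()
    try:
        # Same mistake as in run_agent_stream: awaiting an async generator.
        await gen
    except TypeError as exc:
        error = str(exc)
    else:
        error = None

    # Consuming a fresh generator with `async for` is the supported path.
    chunks = [chunk async for chunk in updates()]
    return error, chunks


error, chunks = asyncio.run(main())
print(error)   # the TypeError message mentions async_generator
print(chunks)
```

The exact wording of the TypeError message can differ between Python versions, so the sketch only relies on the exception type.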

Current Code (Lines ~873-885 in _run.py)

response_stream = agent.run(messages, stream=True, **run_kwargs)
if isinstance(response_stream, ResponseStream):
    stream = response_stream
else:
    stream = await cast(Awaitable[ResponseStream[Any, Any]], response_stream)
    # ^^^ FAILS HERE when response_stream is an async generator
    if not isinstance(stream, ResponseStream):
        raise AgentExecutionException("Chat client did not return a ResponseStream.")

async for update in stream:

Expected Behavior

WorkflowAgent.run() should work seamlessly with the AG-UI endpoint registration via add_agent_framework_fastapi_endpoint().

Actual Behavior

Streaming from a WorkflowAgent registered through the AG-UI bridge raises a TypeError, which surfaces as an unhandled ExceptionGroup in the ASGI application (see the stack trace below).

Proposed Fix

Add a check for async generators before attempting to await:

response_stream = agent.run(messages, stream=True, **run_kwargs)

# Handle three possible return types:
# 1. ResponseStream (direct stream object)
# 2. Awaitable[ResponseStream] (needs await first)  
# 3. AsyncGenerator (from WorkflowAgent - has __anext__)

if hasattr(response_stream, '__anext__'):
    # It's already an async iterator (e.g. an async generator) - use directly
    stream = response_stream
elif isinstance(response_stream, ResponseStream):
    stream = response_stream
else:
    # It's awaitable - await it first
    stream = await cast(Awaitable[ResponseStream[Any, Any]], response_stream)
    if not isinstance(stream, ResponseStream):
        raise AgentExecutionException("Chat client did not return a ResponseStream.")

async for update in stream:
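As an alternative to the hasattr check, the same three-way dispatch could be expressed with standard-library type checks. This is only a sketch under assumptions: `resolve_stream`, `fake_updates`, `deferred_updates` and `collect` are hypothetical names, not part of agent_framework_ag_ui, and the async generators below are stand-ins for ResponseStream and WorkflowAgent output. Whether the real ResponseStream passes the AsyncIterable check has not been verified here. Async-iterability is tested before awaitability so that an object which is already a stream is never awaited by accident.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable
from typing import Any


async def resolve_stream(result: Any) -> AsyncIterable[Any]:
    """Hypothetical helper: turn a run() result into something usable with `async for`."""
    # Case 1 and 3: already async-iterable (stream object or async generator).
    if isinstance(result, AsyncIterable):
        return result
    # Case 2: an awaitable that resolves to a stream.
    if inspect.isawaitable(result):
        resolved = await result
        if isinstance(resolved, AsyncIterable):
            return resolved
        raise TypeError(
            f"awaited result is not async-iterable: {type(resolved).__name__}"
        )
    raise TypeError(f"unsupported stream type: {type(result).__name__}")


async def fake_updates():
    # Stand-in for an async generator returned directly by run().
    yield "a"
    yield "b"


async def deferred_updates():
    # Stand-in for an awaitable that resolves to a stream.
    return fake_updates()


async def collect(result: Any) -> list[Any]:
    stream = await resolve_stream(result)
    return [update async for update in stream]


direct = asyncio.run(collect(fake_updates()))
deferred = asyncio.run(collect(deferred_updates()))
print(direct, deferred)
```

Raising a TypeError for anything that is neither async-iterable nor awaitable keeps the failure explicit; in the real function the existing AgentExecutionException would presumably be the better fit.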

Impact

  • Severity: High - blocks WorkflowAgent usage with AG-UI
  • Workaround: Create custom wrapper agents or custom endpoints (bypassing add_agent_framework_fastapi_endpoint)
  • Affected Components: agent_framework_ag_ui._run.run_agent_stream

Code Sample

### Minimal Reproduction


from agent_framework import Agent, Workflow
from agent_framework_ag_ui import add_agent_framework_fastapi_endpoint
from fastapi import FastAPI

# Create a simple workflow
workflow = Workflow()
# ... configure workflow with agents

# Convert to WorkflowAgent
workflow_agent = workflow.as_agent(name="TestWorkflow")

# Try to register with AG-UI (this will fail)
app = FastAPI()
add_agent_framework_fastapi_endpoint(
    app=app,
    agent=workflow_agent,  # WorkflowAgent
    path="/test"
)

# When a request is made to /test, it crashes with:
# TypeError: object async_generator can't be used in 'await' expression

Error Messages / Stack Traces

ERROR:    Exception in ASGI application
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "agent_framework_ag_ui/_endpoint.py", line 85, in event_generator
    |     async for event in wrapped_agent.run_agent(input_data):
    |   File "agent_framework_ag_ui/_agent.py", line 116, in run_agent
    |     async for event in run_agent_stream(input_data, self.agent, self.config):
    |   File "agent_framework_ag_ui/_run.py", line 877, in run_agent_stream
    |     stream = await cast(Awaitable[ResponseStream[Any, Any]], response_stream)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    | TypeError: object async_generator can't be used in 'await' expression


### Environment
- **Python Version**: 3.11
- **agent-framework Version**: 1.0.0b260210
- **agent-framework-ag-ui Version**: 1.0.0b260210
- **Operating System**: macOS

Package Versions

agent-framework-* 1.0.0b260210

Python Version

3.11

Additional Context

This issue occurs specifically when using Workflow.as_agent() to create a WorkflowAgent and then registering it via add_agent_framework_fastapi_endpoint(). Regular agents (not from workflows) work correctly.

The fix is straightforward: check for __anext__ before attempting to await, since an async generator can be consumed directly with async for and must not be awaited.
