Fix async streaming support for Agno Team.arun() - Resolves #1262#1264
Closed
devin-ai-integration[bot] wants to merge 3 commits intomainfrom
Closed
Fix async streaming support for Agno Team.arun() - Resolves #1262#1264devin-ai-integration[bot] wants to merge 3 commits intomainfrom
devin-ai-integration[bot] wants to merge 3 commits intomainfrom
Conversation
Fixes #1262 The issue was that Team.arun() with stream=True returns an async generator directly, not a coroutine. The team async wrapper was defined as 'async def' which caused it to wrap the async generator in a coroutine, leading to the error: 'async for' requires an object with __aiter__ method, got coroutine. Changes: - Added AsyncStreamingResultWrapper class to properly wrap async generators - Modified create_team_async_wrapper to be a regular function (not async) - Added inspect.isasyncgen() check to detect async generators - Updated create_streaming_agent_async_wrapper to check for __aiter__ first The fix ensures that async generators are returned directly without being wrapped in a coroutine, allowing proper async iteration with 'async for'. Co-Authored-By: Alex <meta.alex.r@gmail.com>
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Contributor
Author
|
Closing this PR due to unrelated files being included. Opening a clean PR with only the fix. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
📥 Pull Request
📘 Description
Fixes #1262 - Resolves TypeError when using
async forwith Agno Team.arun() streaming.Root Cause: When
Team.arun(stream=True)is called, it returns an async generator directly (not a coroutine). The team async wrapper was defined asasync def, which wrapped the returned async generator in a coroutine, causing the error:'async for' requires an object with __aiter__ method, got coroutine.Solution:
AsyncStreamingResultWrapperclass that properly implements__aiter__and__anext__for async generator wrappingcreate_team_async_wrapperfromasync defto regulardefto prevent wrapping async generators in coroutinesinspect.isasyncgen()checks to detect and handle async generators correctlycreate_streaming_agent_async_wrapperto check for__aiter__before__iter__Changes (Core Fix):
agentops/instrumentation/agentic/agno/instrumentor.py: Added AsyncStreamingResultWrapper class and modified team/agent async wrappers to properly handle async generators🧪 Testing
Created test script
test_async_streaming_team.pythat reproduces the issue:RunStartedEventshowing async streaming is workingruff checkandruff formatboth passEvidence of Fix:
RuntimeWarning: coroutine 'create_team_async_wrapper.<locals>.wrapper' was never awaitedTypeError: 'async for' requires an object with __aiter__ methodThis PR contains many unrelated files that should NOT be included:
.envrcAction Required: These files should be excluded from this PR and submitted separately if needed.
🔍 Human Review Checklist
High Priority:
AsyncStreamingResultWrapper.__anext__correctly maintains span contextcreate_team_async_wrapperfromasync deftodefis the correct solutioninspect.isasyncgen()is the right check for detecting async generatorsMedium Priority:
5. Verify the fix works for both Team.arun() and Agent.arun() streaming
6. Confirm span cleanup happens correctly in StopAsyncIteration case
7. Check if similar issues exist in other async wrappers (workflow, team_internal)
Session Info: