
feat: add streaming to direct tool calls#1955

Open
emaan-c wants to merge 1 commit into strands-agents:main from emaan-c:feat/add-streaming-to-tool-calls

Conversation


@emaan-c emaan-c commented Mar 23, 2026

Description

Direct tool calls (agent.tool.tool_name()) currently block without emitting streaming events, while agent-level calls expose rich streaming from the same underlying infrastructure. This creates an inconsistent developer experience and prevents building responsive UIs for long-running operations, multi-agent systems, and debugging workflows.

This adds stream() and stream_async() methods to tool calls, enabling real-time observability without recording to message history.

Resolves: #1436

Public API Changes

Tool calls now support three execution modes:

# Synchronous (unchanged - backward compatible)
result = agent.tool.fetch_data(query="users")

# NEW: Sync streaming
for event in agent.tool.fetch_data.stream(query="users"):
    print(f"Progress: {event.get('type')}")

# NEW: Async streaming
async for event in agent.tool.fetch_data.stream_async(query="users"):
    await ui.update_progress(event)

Streaming methods yield the same events as ToolExecutor._stream() without recording to message history. 100% backward compatible.

Use Cases

  • Multi-agent composition: Stream sub-agent execution with full visibility
  • Real-time progress: Display live indicators during long-running operations
  • Debugging: Inspect execution flow without polluting conversation history

Related Issues

#1436

Documentation PR

strands-agents/docs#754

Type of Change

New feature

Testing

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.


codecov bot commented Mar 30, 2026

Codecov Report

❌ Patch coverage is 98.64865% with 1 line in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
src/strands/tools/_caller.py | 98.64% | 0 Missing and 1 partial ⚠️


@emaan-c emaan-c force-pushed the feat/add-streaming-to-tool-calls branch from 4eb85eb to 636cf86 Compare March 31, 2026 02:57
@github-actions github-actions bot added size/l and removed size/m labels Mar 31, 2026

mkmeral commented Apr 9, 2026

/strands review


    def __init__(self, agent: "Agent | BidiAgent") -> None:
        """Initialize instance.

class _ToolExecutor:

Do we need a separate tool executor? Why not just add stream methods to the tool caller?

Also, strands already has a tool executor concept, so there is a name clash.


@emaan-c emaan-c Apr 11, 2026


Yeah, the name clash was a miss on my part. Renamed it to _DirectToolCall in the latest push.

As for adding methods directly to _ToolCaller: the issue is that it uses __getattr__ to resolve tool names, so any method I add would shadow a tool with the same name (imagine someone has a tool called stream). There's actually a WARNING comment in the class about this exact problem. The per-tool object approach sidesteps that and gives us the agent.tool.my_tool.stream_async() API naturally.
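A minimal sketch (with hypothetical names, not the SDK's actual classes) of the shadowing problem: Python only falls back to __getattr__ when normal attribute lookup fails, so any real method defined on the caller class permanently hides a tool registered under the same name.

```python
class ToolCallerSketch:
    """Illustrative stand-in for a __getattr__-based tool resolver."""

    def __init__(self, tools):
        self._tools = tools

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails.
        try:
            return self._tools[name]
        except KeyError:
            raise AttributeError(name) from None

    def stream(self):
        # A real method wins over __getattr__, so a tool registered
        # under the name "stream" becomes unreachable.
        return "method, not a tool"


caller = ToolCallerSketch({"fetch": lambda: "fetched", "stream": lambda: "tool result"})
print(caller.fetch())   # resolved through __getattr__
print(caller.stream())  # the method shadows the "stream" tool
```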

    events = run_async(collect_events)
    yield from events

async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:

Is this path separate from __call__? Can we make streaming the default and have the call return the final event (like how we handle agent stream and call)?


I like the idea in principle; that's how Agent.__call__ wraps Agent.stream_async, and it's clean. The wrinkle here is that __call__ does a bunch of things streaming intentionally skips: grabbing the invocation lock, calling apply_management(), and so on. So unifying them would mean either adding all that to the streaming path or pulling it out of __call__, neither of which felt right for this PR.

I did pull the shared setup into _prepare_tool_use() though, so there's no more duplication between the two paths.
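For context, the pattern being referenced (a blocking call built on top of the stream that drains it and returns the final result) can be sketched roughly like this; the event shapes and names here are hypothetical, not the SDK's actual internals:

```python
import asyncio


async def stream_async(query):
    # Hypothetical event stream: progress events, then a final result.
    yield {"type": "progress", "step": 1}
    yield {"type": "progress", "step": 2}
    yield {"type": "result", "value": query.upper()}


def call(query):
    """Blocking variant: drain the stream and return the final event's value."""
    async def _drain():
        last = None
        async for event in stream_async(query):
            last = event
        return last["value"]
    return asyncio.run(_drain())


print(call("users"))  # USERS
```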

        return events

    events = run_async(collect_events)
    yield from events

Issue: stream() collects all events into a list before yielding, which defeats the purpose of streaming. Users calling stream() will not see events in real-time — they'll get all events at once after tool execution completes, making this behave identically to __call__() but with extra overhead.

Suggestion: Consider using a thread-safe queue to bridge async-to-sync streaming, or at minimum, make the docstring and PR description very explicit that stream() does NOT provide real-time streaming. Currently the docstring says "events are buffered before yielding" but the PR description positions this as enabling "real-time progress" which is misleading for the sync variant.

An alternative approach using a queue:

def stream(self, **kwargs: Any) -> Iterator[TypedEvent]:
    import queue
    import threading
    
    q: queue.Queue[TypedEvent | None] = queue.Queue()
    
    async def _produce() -> None:
        try:
            async for event in self.stream_async(**kwargs):
                q.put(event)
        finally:
            q.put(None)  # sentinel
    
    thread = threading.Thread(target=lambda: run_async(_produce), daemon=True)
    thread.start()
    
    while True:
        item = q.get()
        if item is None:
            break
        yield item
    thread.join()

If a true sync streaming implementation is too complex for this PR, consider removing stream() entirely and only shipping stream_async(). A sync method that doesn't actually stream could confuse users (violates "the obvious path is the happy path" tenet).


Fair catch; the old implementation was basically __call__() with extra steps. Rewrote it to use a background thread plus a queue.Queue so events actually yield in real time. It follows the same ThreadPoolExecutor + contextvars.copy_context() pattern that _async.py and MCPClient use elsewhere in the codebase.
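For reference, the async-to-sync bridge described here can be sketched as follows. This is a hedged approximation with hypothetical names (sync_stream, agen_factory), not the SDK's actual implementation: a worker thread runs the event loop and feeds a queue, so the sync caller sees each event as it is produced rather than after completion.

```python
import asyncio
import contextvars
import queue
from concurrent.futures import ThreadPoolExecutor

_SENTINEL = object()


def sync_stream(agen_factory):
    """Yield events from an async generator in real time on the calling thread.

    A worker thread drives the event loop and pushes events onto a queue;
    copy_context() propagates context variables into the worker.
    """
    q = queue.Queue()

    def _run():
        async def _produce():
            try:
                async for event in agen_factory():
                    q.put(event)
            finally:
                q.put(_SENTINEL)  # always unblock the consumer
        asyncio.run(_produce())

    ctx = contextvars.copy_context()
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(ctx.run, _run)
        while True:
            item = q.get()
            if item is _SENTINEL:
                break
            yield item


# Usage: events arrive as they are produced.
async def numbers():
    for i in range(3):
        yield i


print(list(sync_stream(numbers)))  # [0, 1, 2]
```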

        RuntimeError: If called during interrupt.
    """
    if self._agent._interrupt_state.activated:
        raise RuntimeError("cannot directly call tool during interrupt")

Issue: stream_async() doesn't check the invocation lock or record_direct_tool_call, while __call__() does. This means streaming tool calls could execute concurrently with agent invocations even when record_direct_tool_call is True (the default), creating potential race conditions with shared agent state.

Suggestion: Either add the same concurrency guard from __call__(), or document why streaming calls intentionally skip it (e.g., because they don't record to message history). If the intent is that streaming never records, make this an explicit design decision in the docstring.


This is by design: streaming is meant for observability only. It doesn't touch message history or agent state, so there's no shared mutation to guard against. Made this explicit in the docstrings now so it's clearly an intentional choice rather than an oversight.

    events = run_async(collect_events)
    yield from events

async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:

Issue: The return type AsyncIterator[TypedEvent] is acceptable for an async def containing yield (AsyncGenerator[TypedEvent, None] would also work). The real problem is that TypedEvent lives in the private _events module and is not exported from strands or strands.types. Customers wanting to type-hint the events they receive from this public API have no stable import path.

Suggestion: Either export TypedEvent from a public module (e.g., strands.types or strands), or use a more generic type like dict[str, Any] for the public API signature until TypedEvent is promoted to a public type.


Good call. Switched to Any to match how Agent.stream_async handles its return type.

logger.debug("tool_name=<%s>, streaming=<True> | executing tool stream", normalized_name)

# Create unique tool ID and set up the tool request
tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"

Issue: The tool_use setup logic (normalize name, create tool_id, build ToolUse dict, create tool_results list) is duplicated between __call__() and stream_async(). This violates DRY and creates a maintenance burden — any future change to tool setup must be applied in both places.

Suggestion: Extract a private helper method, e.g.:

def _prepare_tool_use(self, **kwargs: Any) -> tuple[ToolUse, list[ToolResult], dict]:
    normalized_name = self._find_normalized_tool_name(self._tool_name)
    tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
    tool_use: ToolUse = {"toolUseId": tool_id, "name": normalized_name, "input": kwargs.copy()}
    return tool_use, [], kwargs


Yep, fixed.


github-actions bot commented Apr 9, 2026

Assessment: Request Changes

This PR addresses a real gap in the developer experience (issue #1436) and the architectural approach of wrapping ToolExecutor._stream() is sound. However, there are several issues that should be addressed before merging.

Review Categories
  • Critical — stream() doesn't actually stream: The sync stream() method buffers all events before yielding, making it functionally identical to __call__() but marketed as streaming. This violates the "obvious path is the happy path" tenet — either implement true sync streaming, remove it, or rename to make the behavior clear.
  • API Review Process: This PR adds new public API methods (stream / stream_async) but lacks the needs-api-review label. Key design questions remain around parameter parity with __call__(), the no-recording-by-default decision, and TypedEvent not being publicly exported.
  • Documentation PR: This adds new public API surface (streaming methods on tool calls). Per the documentation criteria, a documentation PR is required for new features that add public API surface. The PR claims "No documentation PR needed" but this should be reconsidered.
  • Behavioral asymmetry: stream_async() skips concurrency guards and conversation management that __call__() performs — this should be an intentional, documented design decision rather than an omission.
  • Code duplication: Tool setup logic is duplicated between __call__() and stream_async() and should be extracted.

The refactoring of _ToolCaller to return _ToolExecutor instances is a clean approach that preserves backward compatibility.


# TODO: https://github.com/strands-agents/sdk-python/issues/1311
if isinstance(self._agent, Agent):
    self._agent.conversation_manager.apply_management(self._agent)

Issue: The __call__ method applies conversation_manager.apply_management() after execution (line 134), but stream_async() does not. While this may be intentional since streaming doesn't record to history, if a user mixes streaming and non-streaming calls, the conversation management behavior could become inconsistent.

Suggestion: Document this behavioral difference explicitly, or keep the two paths symmetric.


This is intentional: streaming doesn't add anything to message history, so there's nothing for the conversation manager to act on. Added a note about this in both stream() and stream_async() docstrings to make it clear.



Development

Successfully merging this pull request may close these issues.

[FEATURE] Support for Direct Method Call Streaming
