feat: add streaming to direct tool calls #1955

emaan-c wants to merge 1 commit into strands-agents:main
Conversation
Force-pushed from 4eb85eb to 636cf86
/strands review
src/strands/tools/_caller.py
Outdated

    def __init__(self, agent: "Agent | BidiAgent") -> None:
        """Initialize instance.

class _ToolExecutor:
Do we need a separate ToolExecutor? Why not just add stream methods to ToolCaller?
Also, strands already has a tool executor concept, so there is a name clash.
Yeah the name clash was a miss on my part. Renamed it to _DirectToolCall in the latest push.
For adding methods directly to _ToolCaller, the issue is that it uses __getattr__ to resolve tool names, so any method I add would shadow a tool with the same name (imagine someone has a tool called stream). There's actually a WARNING comment in the class about this exact problem. The per-tool object approach sidesteps that and gives us the agent.tool.my_tool.stream_async() API naturally.
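The shadowing problem described above can be illustrated with a toy model (hypothetical names; this is not the SDK's actual _ToolCaller, just a sketch of the attribute-lookup behavior):

```python
class ToolCallerSketch:
    """Hypothetical minimal model of a __getattr__-based tool resolver."""

    def __init__(self, tools):
        self._tools = tools  # name -> callable

    def __getattr__(self, name):
        # __getattr__ only fires when normal attribute lookup fails, so any
        # real method defined on the class shadows a tool of the same name.
        try:
            return self._tools[name]
        except KeyError:
            raise AttributeError(name) from None

    def stream(self):
        # A convenience method added directly to the class...
        return "method"


caller = ToolCallerSketch({"stream": lambda: "tool result"})
# ...now shadows the user's tool named "stream":
print(caller.stream())  # prints: method
```

A per-tool wrapper object avoids this entirely, since the stream/stream_async methods live on the wrapper rather than on the tool-resolving class.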
src/strands/tools/_caller.py
Outdated

    events = run_async(collect_events)
    yield from events

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:
Is this path separate from __call__? Can we make streaming the default and have __call__ return the final event (like how we handle agent stream and call)?
I like the idea in principle; that's how Agent.__call__ wraps Agent.stream_async, and it's clean. The wrinkle here is that __call__ does a bunch of stuff streaming intentionally skips: grabbing the invocation lock, calling apply_management(), and so on. So unifying them would mean either adding all that to the streaming path or pulling it out of __call__, neither of which felt right for this PR.
I did pull the shared setup into _prepare_tool_use() though, so there's no more duplication between the two paths.
src/strands/tools/_caller.py
Outdated

    return events

    events = run_async(collect_events)
    yield from events
Issue: stream() collects all events into a list before yielding, which defeats the purpose of streaming. Users calling stream() will not see events in real-time — they'll get all events at once after tool execution completes, making this behave identically to __call__() but with extra overhead.
Suggestion: Consider using a thread-safe queue to bridge async-to-sync streaming, or at minimum, make the docstring and PR description very explicit that stream() does NOT provide real-time streaming. Currently the docstring says "events are buffered before yielding" but the PR description positions this as enabling "real-time progress" which is misleading for the sync variant.
An alternative approach using a queue:

```python
def stream(self, **kwargs: Any) -> Iterator[TypedEvent]:
    import queue
    import threading

    q: queue.Queue[TypedEvent | None] = queue.Queue()

    async def _produce() -> None:
        try:
            async for event in self.stream_async(**kwargs):
                q.put(event)
        finally:
            q.put(None)  # sentinel

    thread = threading.Thread(target=lambda: run_async(_produce), daemon=True)
    thread.start()
    while True:
        item = q.get()
        if item is None:
            break
        yield item
    thread.join()
```

If a true sync streaming implementation is too complex for this PR, consider removing stream() entirely and only shipping stream_async(). A sync method that doesn't actually stream could confuse users (violates "the obvious path is the happy path" tenet).
Fair catch; the old implementation was basically __call__() with extra steps. I rewrote it to use a background thread plus a queue.Queue so events actually yield in real time. It follows the same ThreadPoolExecutor + contextvars.copy_context() pattern that _async.py and MCPClient use elsewhere in the codebase.
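A self-contained toy version of that bridge, for illustration (the real implementation lives in _caller.py; _tool_events here is a stand-in for the tool's async event stream, and the event payloads are invented):

```python
import asyncio
import contextvars
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncIterator, Iterator


async def _tool_events() -> AsyncIterator[dict]:
    # Stand-in for stream_async(**kwargs): yields three fake events.
    for i in range(3):
        await asyncio.sleep(0)
        yield {"event": i}


def stream() -> Iterator[Any]:
    """Yield events as they are produced, bridging async to sync."""
    q: queue.Queue[Any] = queue.Queue()
    sentinel = object()

    async def _produce() -> None:
        try:
            async for event in _tool_events():
                q.put(event)
        finally:
            q.put(sentinel)  # always unblock the consumer, even on error

    ctx = contextvars.copy_context()  # propagate context vars into the worker
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(ctx.run, asyncio.run, _produce())
        while True:
            item = q.get()
            if item is sentinel:
                break
            yield item
        future.result()  # surface any exception from the producer


print(list(stream()))  # [{'event': 0}, {'event': 1}, {'event': 2}]
```

The key difference from the original implementation is that events cross the queue as soon as the producer emits them, so the consumer sees them in real time instead of after the tool finishes.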
        RuntimeError: If called during interrupt.
    """
    if self._agent._interrupt_state.activated:
        raise RuntimeError("cannot directly call tool during interrupt")
Issue: stream_async() doesn't check the invocation lock or record_direct_tool_call, while __call__() does. This means streaming tool calls could execute concurrently with agent invocations even when record_direct_tool_call is True (the default), creating potential race conditions with shared agent state.
Suggestion: Either add the same concurrency guard from __call__(), or document why streaming calls intentionally skip it (e.g., because they don't record to message history). If the intent is that streaming never records, make this an explicit design decision in the docstring.
This is by design: streaming is meant for observability only. It doesn't touch message history or agent state, so there's no shared mutation to guard against. I made this explicit in the docstrings now so it's clearly an intentional choice rather than an oversight.
src/strands/tools/_caller.py
Outdated

    events = run_async(collect_events)
    yield from events

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:
Issue: The return type AsyncIterator[TypedEvent] for an async def with yield should be AsyncGenerator[TypedEvent, None] or AsyncIterator[TypedEvent]. However, TypedEvent is from the private _events module and is not exported from strands or strands.types. Customers wanting to type-hint the events they receive from this public API have no stable import path.
Suggestion: Either export TypedEvent from a public module (e.g., strands.types or strands), or use a more generic type like dict[str, Any] for the public API signature until TypedEvent is promoted to a public type.
Good call. Switched to Any to match how Agent.stream_async handles its return type.
src/strands/tools/_caller.py
Outdated

    logger.debug("tool_name=<%s>, streaming=<True> | executing tool stream", normalized_name)

    # Create unique tool ID and set up the tool request
    tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
Issue: The tool_use setup logic (normalize name, create tool_id, build ToolUse dict, create tool_results list) is duplicated between __call__() and stream_async(). This violates DRY and creates a maintenance burden — any future change to tool setup must be applied in both places.
Suggestion: Extract a private helper method, e.g.:
```python
def _prepare_tool_use(self, **kwargs: Any) -> tuple[ToolUse, list[ToolResult], dict]:
    normalized_name = self._find_normalized_tool_name(self._tool_name)
    tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
    tool_use: ToolUse = {"toolUseId": tool_id, "name": normalized_name, "input": kwargs.copy()}
    return tool_use, [], kwargs
```
Assessment: Request Changes

This PR addresses a real gap in the developer experience (issue #1436) and the architectural approach of wrapping

Review Categories

The refactoring of
    # TODO: https://github.com/strands-agents/sdk-python/issues/1311
    if isinstance(self._agent, Agent):
        self._agent.conversation_manager.apply_management(self._agent)
Issue: The __call__ method applies conversation_manager.apply_management() after execution (line 134), but stream_async() does not. While this may be intentional since streaming doesn't record to history, if a user mixes streaming and non-streaming calls, the conversation management behavior could become inconsistent.
Suggestion: Document this behavioral difference explicitly, or keep the two paths symmetric.
This is intentional: streaming doesn't add anything to message history, so there's nothing for the conversation manager to act on. Added a note about this in both stream() and stream_async() docstrings to make it clear.
Force-pushed from 636cf86 to 0927155
Description

Direct tool calls (agent.tool.tool_name()) currently block without providing streaming events, while agent-level calls expose rich streaming from the same underlying infrastructure. This creates an inconsistent developer experience and prevents building responsive UIs for long-running operations, multi-agent systems, and debugging workflows.

This adds stream() and stream_async() methods to tool calls, enabling real-time observability without recording to message history.

Resolves: #1436
Public API Changes

Tool calls now support three execution modes:

- agent.tool.my_tool(...): blocking call (existing behavior, records to message history)
- agent.tool.my_tool.stream(...): synchronous streaming
- agent.tool.my_tool.stream_async(...): asynchronous streaming

Streaming methods yield the same events as ToolExecutor._stream() without recording to message history. 100% backward compatible.

Use Cases
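As a concrete sketch of the three modes (my_tool and the event payloads here are placeholders, not real SDK events; the stub stands in for agent.tool.my_tool):

```python
import asyncio
from typing import Any, AsyncIterator, Iterator


class FakeToolHandle:
    """Stand-in for agent.tool.my_tool, illustrating the three modes."""

    def __call__(self, **kwargs: Any) -> str:
        # Blocking call: records to message history (existing behavior).
        return "final result"

    def stream(self, **kwargs: Any) -> Iterator[dict]:
        # Sync streaming: observability only, nothing recorded.
        yield {"status": "running"}
        yield {"status": "done"}

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[dict]:
        # Async streaming: same events, consumed with async for.
        for event in self.stream(**kwargs):
            yield event


my_tool = FakeToolHandle()
print(my_tool())               # final result
print(list(my_tool.stream()))  # [{'status': 'running'}, {'status': 'done'}]


async def main() -> None:
    async for event in my_tool.stream_async():
        print(event)


asyncio.run(main())
```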
Related Issues
#1436
Documentation PR
strands-agents/docs#754
Type of Change
New feature
Testing
hatch run prepare

Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.