-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat: implement EmptyModelOutputError for handling empty responses across providers and enhance retry logic in ToolLoopAgentRunner #7104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,11 +16,18 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TextContent, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TextResourceContents, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from tenacity import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| AsyncRetrying, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_if_exception_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stop_after_attempt, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| wait_exponential, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot import logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.agent.message import ImageURLPart, TextPart, ThinkPart | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.agent.tool import ToolSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.agent.tool_image_cache import tool_image_cache | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.exceptions import EmptyModelOutputError | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.message.components import Json | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.message.message_event_result import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| MessageChain, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -95,6 +102,10 @@ class _ToolExecutionInterrupted(Exception): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class ToolLoopAgentRunner(BaseAgentRunner[TContext]): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| EMPTY_OUTPUT_RETRY_ATTEMPTS = 3 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| EMPTY_OUTPUT_RETRY_WAIT_MIN_S = 1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| EMPTY_OUTPUT_RETRY_WAIT_MAX_S = 4 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _get_persona_custom_error_message(self) -> str | None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Read persona-level custom error message from event extras when available.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event = getattr(self.run_context.context, "event", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -279,31 +290,61 @@ async def _iter_llm_responses_with_fallback( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| candidate_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.provider = candidate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| has_stream_output = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async for resp in self._iter_llm_responses(include_model=idx == 0): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if resp.is_chunk: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| has_stream_output = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| yield resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resp.role == "err" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and not has_stream_output | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and (not is_last_candidate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| last_err_response = resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Chat Model %s returns error response, trying fallback to next provider.", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| candidate_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| yield resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retrying = AsyncRetrying( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry=retry_if_exception_type(EmptyModelOutputError), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stop=stop_after_attempt(self.EMPTY_OUTPUT_RETRY_ATTEMPTS), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| wait=wait_exponential( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| multiplier=1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| min=self.EMPTY_OUTPUT_RETRY_WAIT_MIN_S, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max=self.EMPTY_OUTPUT_RETRY_WAIT_MAX_S, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reraise=True, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if has_stream_output: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async for attempt in retrying: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| has_stream_output = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with attempt: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async for resp in self._iter_llm_responses( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| include_model=idx == 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if resp.is_chunk: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| has_stream_output = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| yield resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+305
to
+314
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): EmptyModelOutputError is still retried even after streaming has started, contradicting the log and likely intent. In the To actually skip retries once streaming has started, either:
That will make the behavior consistent with the log message and prevent multiple attempts after chunks have been emitted. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resp.role == "err" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and not has_stream_output | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| and (not is_last_candidate) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| last_err_response = resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Chat Model %s returns error response, trying fallback to next provider.", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| candidate_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| yield resp | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if has_stream_output: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except EmptyModelOutputError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if has_stream_output: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Chat Model %s returned empty output after streaming started; skipping empty-output retry.", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| candidate_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Chat Model %s returned empty output on attempt %s/%s.", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| candidate_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| attempt.retry_state.attempt_number, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.EMPTY_OUTPUT_RETRY_ATTEMPTS, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+334
to
+347
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a potential issue in the retry logic for streaming responses. If an EmptyModelOutputError occurs after streaming has begun (has_stream_output is true), the code logs that it's skipping the retry, but then unconditionally re-raises the exception. This will cause tenacity to perform a retry, which can lead to a confusing user experience with disjointed, repeated, or altered streaming output. When streaming has started, it's best to abort both retry and fallback to avoid sending inconsistent data to the user. You can achieve this by returning from the generator if has_stream_output is true, which will gracefully end the stream.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as exc: # noqa: BLE001 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| last_exception = exc | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| from astrbot import logger | ||
| from astrbot.api.provider import Provider | ||
| from astrbot.core.agent.message import ContentPart, ImageURLPart, TextPart | ||
| from astrbot.core.exceptions import EmptyModelOutputError | ||
| from astrbot.core.provider.entities import LLMResponse, TokenUsage | ||
| from astrbot.core.provider.func_tool_manager import ToolSet | ||
| from astrbot.core.utils.io import download_image_by_url | ||
|
|
@@ -29,6 +30,23 @@ | |
| "Anthropic Claude API 提供商适配器", | ||
| ) | ||
| class ProviderAnthropic(Provider): | ||
| @staticmethod | ||
| def _ensure_usable_response( | ||
| llm_response: LLMResponse, | ||
| *, | ||
| completion_id: str | None = None, | ||
| stop_reason: str | None = None, | ||
| ) -> None: | ||
| has_text_output = bool((llm_response.completion_text or "").strip()) | ||
| has_reasoning_output = bool(llm_response.reasoning_content.strip()) | ||
| has_tool_output = bool(llm_response.tools_call_args) | ||
| if has_text_output or has_reasoning_output or has_tool_output: | ||
| return | ||
| raise EmptyModelOutputError( | ||
| "Anthropic completion has no usable output. " | ||
| f"completion_id={completion_id}, stop_reason={stop_reason}" | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _normalize_custom_headers(provider_config: dict) -> dict[str, str] | None: | ||
| custom_headers = provider_config.get("custom_headers", {}) | ||
|
|
@@ -289,7 +307,9 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: | |
| logger.debug(f"completion: {completion}") | ||
|
|
||
| if len(completion.content) == 0: | ||
| raise Exception("API 返回的 completion 为空。") | ||
| raise EmptyModelOutputError( | ||
| f"Anthropic completion is empty. completion_id={completion.id}" | ||
| ) | ||
|
|
||
| llm_response = LLMResponse(role="assistant") | ||
|
|
||
|
|
@@ -317,10 +337,9 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: | |
| if not llm_response.completion_text and not llm_response.tools_call_args: | ||
| # Guard clause: raise early if no valid content at all | ||
| if not llm_response.reasoning_content: | ||
| raise ValueError( | ||
| f"Anthropic API returned unparsable completion: " | ||
| f"no text, tool_use, or thinking content found. " | ||
| f"Completion: {completion}" | ||
| raise EmptyModelOutputError( | ||
| "Anthropic completion has no usable output. " | ||
| f"completion_id={completion.id}, stop_reason={completion.stop_reason}" | ||
| ) | ||
|
|
||
| # We have reasoning content (ThinkingBlock) - this is valid | ||
|
|
@@ -330,6 +349,11 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse: | |
| ) | ||
| llm_response.completion_text = "" # Ensure empty string, not None | ||
|
|
||
| self._ensure_usable_response( | ||
| llm_response, | ||
| completion_id=completion.id, | ||
| stop_reason=completion.stop_reason, | ||
| ) | ||
| return llm_response | ||
|
|
||
| async def _query_stream( | ||
|
|
@@ -481,6 +505,11 @@ async def _query_stream( | |
| final_response.tools_call_name = [call["name"] for call in final_tool_calls] | ||
| final_response.tools_call_ids = [call["id"] for call in final_tool_calls] | ||
|
|
||
| self._ensure_usable_response( | ||
| final_response, | ||
| completion_id=id, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The variable name id shadows the built-in Python function id(). This is considered a bad practice as it can lead to confusion and potential bugs. It would be better to rename this variable to something more descriptive that doesn't conflict with a built-in, such as message_id or completion_id. This change should be applied consistently where the variable is defined (line 378) and used within the _query_stream method. References
|
||
| stop_reason=None, | ||
| ) | ||
| yield final_response | ||
|
|
||
| async def text_chat( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| from astrbot.api.provider import Provider | ||
| from astrbot.core.agent.message import ContentPart, ImageURLPart, Message, TextPart | ||
| from astrbot.core.agent.tool import ToolSet | ||
| from astrbot.core.exceptions import EmptyModelOutputError | ||
| from astrbot.core.message.message_event_result import MessageChain | ||
| from astrbot.core.provider.entities import LLMResponse, TokenUsage, ToolCallsResult | ||
| from astrbot.core.utils.io import download_image_by_url | ||
|
|
@@ -696,7 +697,9 @@ async def _parse_openai_completion( | |
| llm_response = LLMResponse("assistant") | ||
|
|
||
| if not completion.choices: | ||
| raise Exception("API 返回的 completion 为空。") | ||
| raise EmptyModelOutputError( | ||
| f"OpenAI completion has no choices. response_id={completion.id}" | ||
| ) | ||
| choice = completion.choices[0] | ||
|
|
||
| # parse the text completion | ||
|
|
@@ -714,6 +717,10 @@ async def _parse_openai_completion( | |
| # Also clean up orphan </think> tags that may leak from some models | ||
| completion_text = re.sub(r"</think>\s*$", "", completion_text).strip() | ||
| llm_response.result_chain = MessageChain().message(completion_text) | ||
| elif refusal := getattr(choice.message, "refusal", None): | ||
| refusal_text = self._normalize_content(refusal) | ||
| if refusal_text: | ||
| llm_response.result_chain = MessageChain().message(refusal_text) | ||
|
Comment on lines
+720
to
+723
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Refusal content is not considered in the subsequent 'usable output' check and will still trigger EmptyModelOutputError. The refusal branch only sets elif refusal := getattr(choice.message, "refusal", None):
refusal_text = self._normalize_content(refusal)
if refusal_text:
llm_response.result_chain = MessageChain().message(refusal_text)But the usability check later only considers has_text_output = bool((llm_response.completion_text or "").strip())
has_reasoning_output = bool(llm_response.reasoning_content.strip())
if (
not has_text_output
and not has_reasoning_output
and not llm_response.tools_call_args
):
raise EmptyModelOutputError(...)So a pure refusal with valid text in |
||
|
|
||
| # parse the reasoning content if any | ||
| # the priority is higher than the <think> tag extraction | ||
|
|
@@ -761,9 +768,18 @@ async def _parse_openai_completion( | |
| raise Exception( | ||
| "API 返回的 completion 由于内容安全过滤被拒绝(非 AstrBot)。", | ||
| ) | ||
| if llm_response.completion_text is None and not llm_response.tools_call_args: | ||
| logger.error(f"API 返回的 completion 无法解析:{completion}。") | ||
| raise Exception(f"API 返回的 completion 无法解析:{completion}。") | ||
| has_text_output = bool((llm_response.completion_text or "").strip()) | ||
| has_reasoning_output = bool(llm_response.reasoning_content.strip()) | ||
| if ( | ||
| not has_text_output | ||
| and not has_reasoning_output | ||
| and not llm_response.tools_call_args | ||
| ): | ||
| logger.error(f"OpenAI completion has no usable output: {completion}.") | ||
| raise EmptyModelOutputError( | ||
| "OpenAI completion has no usable output. " | ||
| f"response_id={completion.id}, finish_reason={choice.finish_reason}" | ||
| ) | ||
|
|
||
| llm_response.raw_completion = completion | ||
| llm_response.id = completion.id | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import pytest | ||
|
|
||
| from astrbot.core.exceptions import EmptyModelOutputError | ||
| from astrbot.core.provider.entities import LLMResponse | ||
| from astrbot.core.provider.sources.gemini_source import ProviderGoogleGenAI | ||
|
|
||
|
|
||
| def test_gemini_empty_output_raises_empty_model_output_error(): | ||
| llm_response = LLMResponse(role="assistant") | ||
|
|
||
| with pytest.raises(EmptyModelOutputError): | ||
| ProviderGoogleGenAI._ensure_usable_response( | ||
| llm_response, | ||
| response_id="resp_empty", | ||
| finish_reason="STOP", | ||
| ) | ||
|
|
||
|
|
||
| def test_gemini_reasoning_only_output_is_allowed(): | ||
| llm_response = LLMResponse( | ||
| role="assistant", | ||
| reasoning_content="chain of thought placeholder", | ||
| ) | ||
|
|
||
| ProviderGoogleGenAI._ensure_usable_response( | ||
| llm_response, | ||
| response_id="resp_reasoning", | ||
| finish_reason="STOP", | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider extracting the streaming-and-retry logic into a helper and using tenacity’s hooks so the main loop has less nesting and clearer separation of retry versus streaming behavior.
You can keep the new behavior but reduce nesting and separate concerns a bit.
1. Remove inner
try/exceptand let tenacity handleEmptyModelOutputErrorYou don’t need both
AsyncRetryingand an innertry/except EmptyModelOutputError. You can keep the “skip retry after streaming starts” behavior by moving that logic into a helper that decides whether to re-raise asEmptyModelOutputErroror as a non‑retryable exception.For example, extract the streaming +
has_stream_outputhandling:Then the main loop becomes:
This removes one
try/exceptlayer and keeps all the retry semantics intact.2. Use tenacity hooks for retry logging
The “attempt x/y” logging can move into tenacity hooks, so it doesn’t sit inside the streaming loop:
This keeps the retry policy and logging in one place and leaves the streaming logic focused on streaming/fallback behavior.