Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 64 additions & 23 deletions astrbot/core/agent/runners/tool_loop_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
TextContent,
TextResourceContents,
)
from tenacity import (
AsyncRetrying,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)

from astrbot import logger
from astrbot.core.agent.message import ImageURLPart, TextPart, ThinkPart
from astrbot.core.agent.tool import ToolSet
from astrbot.core.agent.tool_image_cache import tool_image_cache
from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.message.components import Json
from astrbot.core.message.message_event_result import (
MessageChain,
Expand Down Expand Up @@ -95,6 +102,10 @@ class _ToolExecutionInterrupted(Exception):


class ToolLoopAgentRunner(BaseAgentRunner[TContext]):
EMPTY_OUTPUT_RETRY_ATTEMPTS = 3
EMPTY_OUTPUT_RETRY_WAIT_MIN_S = 1
EMPTY_OUTPUT_RETRY_WAIT_MAX_S = 4

def _get_persona_custom_error_message(self) -> str | None:
"""Read persona-level custom error message from event extras when available."""
event = getattr(self.run_context.context, "event", None)
Expand Down Expand Up @@ -279,31 +290,61 @@ async def _iter_llm_responses_with_fallback(
candidate_id,
)
self.provider = candidate
has_stream_output = False
try:
async for resp in self._iter_llm_responses(include_model=idx == 0):
if resp.is_chunk:
has_stream_output = True
yield resp
continue

if (
resp.role == "err"
and not has_stream_output
and (not is_last_candidate)
):
last_err_response = resp
logger.warning(
"Chat Model %s returns error response, trying fallback to next provider.",
candidate_id,
)
break

yield resp
return
retrying = AsyncRetrying(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the streaming-and-retry logic into a helper and using tenacity’s hooks so the main loop has less nesting and clearer separation of retry versus streaming behavior.

You can keep the new behavior but reduce nesting and separate concerns a bit.

1. Remove inner try/except and let tenacity handle EmptyModelOutputError

You don’t need both AsyncRetrying and an inner try/except EmptyModelOutputError. You can keep the “skip retry after streaming starts” behavior by moving that logic into a helper that decides whether to re-raise as EmptyModelOutputError or as a non‑retryable exception.

For example, extract the streaming + has_stream_output handling:

async def _stream_llm_with_empty_output_guard(
    self,
    *,
    include_model: bool,
    candidate_id: str,
    is_last_candidate: bool,
) -> None:
    has_stream_output = False

    try:
        async for resp in self._iter_llm_responses(include_model=include_model):
            if resp.is_chunk:
                has_stream_output = True
                yield resp
                continue

            if (
                resp.role == "err"
                and not has_stream_output
                and not is_last_candidate
            ):
                # unchanged fallback behavior
                last_err_response = resp
                logger.warning(
                    "Chat Model %s returns error response, trying fallback to next provider.",
                    candidate_id,
                )
                break

            yield resp
            return

        if has_stream_output:
            return

    except EmptyModelOutputError:
        if has_stream_output:
            logger.warning(
                "Chat Model %s returned empty output after streaming started; skipping empty-output retry.",
                candidate_id,
            )
            # convert to non-retryable error so tenacity won't retry
            raise RuntimeError("Empty output after streaming started") from None
        else:
            # let tenacity see the original retryable exception
            raise

Then the main loop becomes:

retrying = AsyncRetrying(
    retry=retry_if_exception_type(EmptyModelOutputError),
    stop=stop_after_attempt(self.EMPTY_OUTPUT_RETRY_ATTEMPTS),
    wait=wait_exponential(
        multiplier=1,
        min=self.EMPTY_OUTPUT_RETRY_WAIT_MIN_S,
        max=self.EMPTY_OUTPUT_RETRY_WAIT_MAX_S,
    ),
    reraise=True,
)

async for attempt in retrying:
    with attempt:
        await self._stream_llm_with_empty_output_guard(
            include_model=idx == 0,
            candidate_id=candidate_id,
            is_last_candidate=is_last_candidate,
        )
        # success -> break retry loop
        break

This removes one try/except layer and keeps all the retry semantics intact.

2. Use tenacity hooks for retry logging

The “attempt x/y” logging can move into tenacity hooks, so it doesn’t sit inside the streaming loop:

def _log_empty_output_retry(retry_state: tenacity.RetryCallState) -> None:
    candidate_id = retry_state.kwargs.get("candidate_id", "<unknown>")
    logger.warning(
        "Chat Model %s returned empty output on attempt %s/%s.",
        candidate_id,
        retry_state.attempt_number,
        self.EMPTY_OUTPUT_RETRY_ATTEMPTS,
    )

retrying = AsyncRetrying(
    retry=retry_if_exception_type(EmptyModelOutputError),
    stop=stop_after_attempt(self.EMPTY_OUTPUT_RETRY_ATTEMPTS),
    wait=wait_exponential(
        multiplier=1,
        min=self.EMPTY_OUTPUT_RETRY_WAIT_MIN_S,
        max=self.EMPTY_OUTPUT_RETRY_WAIT_MAX_S,
    ),
    before_sleep=_log_empty_output_retry,
    reraise=True,
)

This keeps the retry policy and logging in one place and leaves the streaming logic focused on streaming/fallback behavior.

retry=retry_if_exception_type(EmptyModelOutputError),
stop=stop_after_attempt(self.EMPTY_OUTPUT_RETRY_ATTEMPTS),
wait=wait_exponential(
multiplier=1,
min=self.EMPTY_OUTPUT_RETRY_WAIT_MIN_S,
max=self.EMPTY_OUTPUT_RETRY_WAIT_MAX_S,
),
reraise=True,
)

if has_stream_output:
return
async for attempt in retrying:
has_stream_output = False
with attempt:
try:
async for resp in self._iter_llm_responses(
include_model=idx == 0
):
if resp.is_chunk:
has_stream_output = True
yield resp
Comment on lines +305 to +314
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): EmptyModelOutputError is still retried even after streaming has started, contradicting the log and likely intent.

In the except EmptyModelOutputError block, when has_stream_output is true we log that we are skipping the retry but still re-raise EmptyModelOutputError, which AsyncRetrying is configured to retry. This can cause the entire call to restart after partial output has already been streamed.

To actually skip retries once streaming has started, either:

  • raise a different exception that the retry logic does not catch, or
  • exit the retry loop directly (e.g., return or break) when has_stream_output is true.

That will make the behavior consistent with the log message and prevent multiple attempts after chunks have been emitted.

continue

if (
resp.role == "err"
and not has_stream_output
and (not is_last_candidate)
):
last_err_response = resp
logger.warning(
"Chat Model %s returns error response, trying fallback to next provider.",
candidate_id,
)
break

yield resp
return

if has_stream_output:
return
except EmptyModelOutputError:
if has_stream_output:
logger.warning(
"Chat Model %s returned empty output after streaming started; skipping empty-output retry.",
candidate_id,
)
else:
logger.warning(
"Chat Model %s returned empty output on attempt %s/%s.",
candidate_id,
attempt.retry_state.attempt_number,
self.EMPTY_OUTPUT_RETRY_ATTEMPTS,
)
raise
Comment on lines +334 to +347
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There's a potential issue in the retry logic for streaming responses. If an EmptyModelOutputError occurs after streaming has begun (has_stream_output is true), the code logs that it's skipping the retry, but then unconditionally re-raises the exception. This will cause tenacity to perform a retry, which can lead to a confusing user experience with disjointed, repeated, or altered streaming output.

When streaming has started, it's best to abort both retry and fallback to avoid sending inconsistent data to the user. You can achieve this by returning from the generator if has_stream_output is true, which will gracefully end the stream.

Suggested change
except EmptyModelOutputError:
if has_stream_output:
logger.warning(
"Chat Model %s returned empty output after streaming started; skipping empty-output retry.",
candidate_id,
)
else:
logger.warning(
"Chat Model %s returned empty output on attempt %s/%s.",
candidate_id,
attempt.retry_state.attempt_number,
self.EMPTY_OUTPUT_RETRY_ATTEMPTS,
)
raise
except EmptyModelOutputError:
if has_stream_output:
logger.warning(
"Chat Model %s returned empty output after streaming started; aborting retry and fallback.",
candidate_id,
)
return
else:
logger.warning(
"Chat Model %s returned empty output on attempt %s/%s.",
candidate_id,
attempt.retry_state.attempt_number,
self.EMPTY_OUTPUT_RETRY_ATTEMPTS,
)
raise

except Exception as exc: # noqa: BLE001
last_exception = exc
logger.warning(
Expand Down
4 changes: 4 additions & 0 deletions astrbot/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ class AstrBotError(Exception):

class ProviderNotFoundError(AstrBotError):
"""Raised when a specified provider is not found."""


class EmptyModelOutputError(AstrBotError):
"""Raised when the model response contains no usable assistant output."""
39 changes: 34 additions & 5 deletions astrbot/core/provider/sources/anthropic_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from astrbot import logger
from astrbot.api.provider import Provider
from astrbot.core.agent.message import ContentPart, ImageURLPart, TextPart
from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.provider.entities import LLMResponse, TokenUsage
from astrbot.core.provider.func_tool_manager import ToolSet
from astrbot.core.utils.io import download_image_by_url
Expand All @@ -29,6 +30,23 @@
"Anthropic Claude API 提供商适配器",
)
class ProviderAnthropic(Provider):
@staticmethod
def _ensure_usable_response(
llm_response: LLMResponse,
*,
completion_id: str | None = None,
stop_reason: str | None = None,
) -> None:
has_text_output = bool((llm_response.completion_text or "").strip())
has_reasoning_output = bool(llm_response.reasoning_content.strip())
has_tool_output = bool(llm_response.tools_call_args)
if has_text_output or has_reasoning_output or has_tool_output:
return
raise EmptyModelOutputError(
"Anthropic completion has no usable output. "
f"completion_id={completion_id}, stop_reason={stop_reason}"
)

@staticmethod
def _normalize_custom_headers(provider_config: dict) -> dict[str, str] | None:
custom_headers = provider_config.get("custom_headers", {})
Expand Down Expand Up @@ -289,7 +307,9 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
logger.debug(f"completion: {completion}")

if len(completion.content) == 0:
raise Exception("API 返回的 completion 为空。")
raise EmptyModelOutputError(
f"Anthropic completion is empty. completion_id={completion.id}"
)

llm_response = LLMResponse(role="assistant")

Expand Down Expand Up @@ -317,10 +337,9 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
if not llm_response.completion_text and not llm_response.tools_call_args:
# Guard clause: raise early if no valid content at all
if not llm_response.reasoning_content:
raise ValueError(
f"Anthropic API returned unparsable completion: "
f"no text, tool_use, or thinking content found. "
f"Completion: {completion}"
raise EmptyModelOutputError(
"Anthropic completion has no usable output. "
f"completion_id={completion.id}, stop_reason={completion.stop_reason}"
)

# We have reasoning content (ThinkingBlock) - this is valid
Expand All @@ -330,6 +349,11 @@ async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
)
llm_response.completion_text = "" # Ensure empty string, not None

self._ensure_usable_response(
llm_response,
completion_id=completion.id,
stop_reason=completion.stop_reason,
)
return llm_response

async def _query_stream(
Expand Down Expand Up @@ -481,6 +505,11 @@ async def _query_stream(
final_response.tools_call_name = [call["name"] for call in final_tool_calls]
final_response.tools_call_ids = [call["id"] for call in final_tool_calls]

self._ensure_usable_response(
final_response,
completion_id=id,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The variable name id shadows the built-in Python function id(). This is considered a bad practice as it can lead to confusion and potential bugs. It would be better to rename this variable to something more descriptive that doesn't conflict with a built-in, such as message_id or completion_id. This change should be applied consistently where the variable is defined (line 378) and used within the _query_stream method.

References
  1. PEP 8: Avoid using names that shadow built-in identifiers like 'id'. (link)

stop_reason=None,
)
yield final_response

async def text_chat(
Expand Down
46 changes: 40 additions & 6 deletions astrbot/core/provider/sources/gemini_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from astrbot import logger
from astrbot.api.provider import Provider
from astrbot.core.agent.message import ContentPart, ImageURLPart, TextPart
from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.provider.entities import LLMResponse, TokenUsage
from astrbot.core.provider.func_tool_manager import ToolSet
Expand Down Expand Up @@ -444,6 +445,23 @@ def _extract_usage(
output=usage_metadata.candidates_token_count or 0,
)

@staticmethod
def _ensure_usable_response(
llm_response: LLMResponse,
*,
response_id: str | None = None,
finish_reason: str | None = None,
) -> None:
has_text_output = bool((llm_response.completion_text or "").strip())
has_reasoning_output = bool(llm_response.reasoning_content.strip())
has_tool_output = bool(llm_response.tools_call_args)
if has_text_output or has_reasoning_output or has_tool_output:
return
raise EmptyModelOutputError(
"Gemini completion has no usable output. "
f"response_id={response_id}, finish_reason={finish_reason}"
)

def _process_content_parts(
self,
candidate: types.Candidate,
Expand All @@ -452,7 +470,10 @@ def _process_content_parts(
"""处理内容部分并构建消息链"""
if not candidate.content:
logger.warning(f"收到的 candidate.content 为空: {candidate}")
raise Exception("API 返回的 candidate.content 为空。")
raise EmptyModelOutputError(
"Gemini candidate content is empty. "
f"finish_reason={candidate.finish_reason}"
)

finish_reason = candidate.finish_reason
result_parts: list[types.Part] | None = candidate.content.parts
Expand All @@ -474,7 +495,10 @@ def _process_content_parts(

if not result_parts:
logger.warning(f"收到的 candidate.content.parts 为空: {candidate}")
raise Exception("API 返回的 candidate.content.parts 为空。")
raise EmptyModelOutputError(
"Gemini candidate content parts are empty. "
f"finish_reason={candidate.finish_reason}"
)

# 提取 reasoning content
reasoning = self._extract_reasoning_content(candidate)
Expand Down Expand Up @@ -525,7 +549,14 @@ def _process_content_parts(
if ts := part.thought_signature:
# only keep the last thinking signature
llm_response.reasoning_signature = base64.b64encode(ts).decode("utf-8")
return MessageChain(chain=chain)
chain_result = MessageChain(chain=chain)
llm_response.result_chain = chain_result
self._ensure_usable_response(
llm_response,
response_id=None,
finish_reason=str(finish_reason) if finish_reason is not None else None,
)
return chain_result

async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
"""非流式请求 Gemini API"""
Expand Down Expand Up @@ -727,9 +758,12 @@ async def _query_stream(
final_response.result_chain = MessageChain(
chain=[Comp.Plain(accumulated_text)],
)
elif not final_response.result_chain:
# If no text was accumulated and no final response was set, provide empty space
final_response.result_chain = MessageChain(chain=[Comp.Plain(" ")])

self._ensure_usable_response(
final_response,
response_id=getattr(final_response, "id", None),
finish_reason=None,
)

yield final_response

Expand Down
24 changes: 20 additions & 4 deletions astrbot/core/provider/sources/openai_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from astrbot.api.provider import Provider
from astrbot.core.agent.message import ContentPart, ImageURLPart, Message, TextPart
from astrbot.core.agent.tool import ToolSet
from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.provider.entities import LLMResponse, TokenUsage, ToolCallsResult
from astrbot.core.utils.io import download_image_by_url
Expand Down Expand Up @@ -696,7 +697,9 @@ async def _parse_openai_completion(
llm_response = LLMResponse("assistant")

if not completion.choices:
raise Exception("API 返回的 completion 为空。")
raise EmptyModelOutputError(
f"OpenAI completion has no choices. response_id={completion.id}"
)
choice = completion.choices[0]

# parse the text completion
Expand All @@ -714,6 +717,10 @@ async def _parse_openai_completion(
# Also clean up orphan </think> tags that may leak from some models
completion_text = re.sub(r"</think>\s*$", "", completion_text).strip()
llm_response.result_chain = MessageChain().message(completion_text)
elif refusal := getattr(choice.message, "refusal", None):
refusal_text = self._normalize_content(refusal)
if refusal_text:
llm_response.result_chain = MessageChain().message(refusal_text)
Comment on lines +720 to +723
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Refusal content is not considered in the subsequent 'usable output' check and will still trigger EmptyModelOutputError.

The refusal branch only sets result_chain and never completion_text or reasoning_content:

elif refusal := getattr(choice.message, "refusal", None):
    refusal_text = self._normalize_content(refusal)
    if refusal_text:
        llm_response.result_chain = MessageChain().message(refusal_text)

But the usability check later only considers completion_text, reasoning_content, and tools_call_args when deciding whether to raise EmptyModelOutputError:

has_text_output = bool((llm_response.completion_text or "").strip())
has_reasoning_output = bool(llm_response.reasoning_content.strip())
if (
    not has_text_output
    and not has_reasoning_output
    and not llm_response.tools_call_args
):
    raise EmptyModelOutputError(...)

So a pure refusal with valid text in result_chain still triggers EmptyModelOutputError. To align behavior, either copy the refusal text into completion_text, or update the usable-output check to also treat a non-empty result_chain as valid output.


# parse the reasoning content if any
# the priority is higher than the <think> tag extraction
Expand Down Expand Up @@ -761,9 +768,18 @@ async def _parse_openai_completion(
raise Exception(
"API 返回的 completion 由于内容安全过滤被拒绝(非 AstrBot)。",
)
if llm_response.completion_text is None and not llm_response.tools_call_args:
logger.error(f"API 返回的 completion 无法解析:{completion}。")
raise Exception(f"API 返回的 completion 无法解析:{completion}。")
has_text_output = bool((llm_response.completion_text or "").strip())
has_reasoning_output = bool(llm_response.reasoning_content.strip())
if (
not has_text_output
and not has_reasoning_output
and not llm_response.tools_call_args
):
logger.error(f"OpenAI completion has no usable output: {completion}.")
raise EmptyModelOutputError(
"OpenAI completion has no usable output. "
f"response_id={completion.id}, finish_reason={choice.finish_reason}"
)

llm_response.raw_completion = completion
llm_response.id = completion.id
Expand Down
14 changes: 14 additions & 0 deletions tests/test_anthropic_kimi_code_provider.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import httpx
import pytest

import astrbot.core.provider.sources.anthropic_source as anthropic_source
import astrbot.core.provider.sources.kimi_code_source as kimi_code_source
from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.provider.entities import LLMResponse


class _FakeAsyncAnthropic:
Expand Down Expand Up @@ -79,3 +82,14 @@ def test_kimi_code_provider_restores_required_user_agent_when_blank(monkeypatch)
assert provider.custom_headers == {
"User-Agent": kimi_code_source.KIMI_CODE_USER_AGENT,
}


def test_anthropic_empty_output_raises_empty_model_output_error():
llm_response = LLMResponse(role="assistant")

with pytest.raises(EmptyModelOutputError):
anthropic_source.ProviderAnthropic._ensure_usable_response(
llm_response,
completion_id="msg_empty",
stop_reason="end_turn",
)
29 changes: 29 additions & 0 deletions tests/test_gemini_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest

from astrbot.core.exceptions import EmptyModelOutputError
from astrbot.core.provider.entities import LLMResponse
from astrbot.core.provider.sources.gemini_source import ProviderGoogleGenAI


def test_gemini_empty_output_raises_empty_model_output_error():
llm_response = LLMResponse(role="assistant")

with pytest.raises(EmptyModelOutputError):
ProviderGoogleGenAI._ensure_usable_response(
llm_response,
response_id="resp_empty",
finish_reason="STOP",
)


def test_gemini_reasoning_only_output_is_allowed():
llm_response = LLMResponse(
role="assistant",
reasoning_content="chain of thought placeholder",
)

ProviderGoogleGenAI._ensure_usable_response(
llm_response,
response_id="resp_reasoning",
finish_reason="STOP",
)
Loading
Loading