Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@
"platform_specific": {
# 平台特异配置:按平台分类,平台下按功能分组
"lark": {
"pre_ack_emoji": {"enable": False, "emojis": ["Typing"]},
"pre_ack_emoji": {"enable": False, "emojis": ["Typing"], "auto_remove": True},
},
"telegram": {
"pre_ack_emoji": {"enable": False, "emojis": ["✍️"]},
"pre_ack_emoji": {"enable": False, "emojis": ["✍️"], "auto_remove": True},
},
"discord": {
"pre_ack_emoji": {"enable": False, "emojis": ["✍️"], "auto_remove": True},
},
},
"wake_prefix": ["/"],
Expand Down Expand Up @@ -3555,6 +3558,13 @@ class ChatProviderTemplate(TypedDict):
"platform_specific.lark.pre_ack_emoji.enable": True,
},
},
"platform_specific.lark.pre_ack_emoji.auto_remove": {
"description": "[飞书] 处理完毕后自动撤回表情",
"type": "bool",
"condition": {
"platform_specific.lark.pre_ack_emoji.enable": True,
},
},
"platform_specific.telegram.pre_ack_emoji.enable": {
"description": "[Telegram] 启用预回应表情",
"type": "bool",
Expand All @@ -3568,6 +3578,13 @@ class ChatProviderTemplate(TypedDict):
"platform_specific.telegram.pre_ack_emoji.enable": True,
},
},
"platform_specific.telegram.pre_ack_emoji.auto_remove": {
"description": "[Telegram] 处理完毕后自动撤回表情",
"type": "bool",
"condition": {
"platform_specific.telegram.pre_ack_emoji.enable": True,
},
},
"platform_specific.discord.pre_ack_emoji.enable": {
"description": "[Discord] 启用预回应表情",
"type": "bool",
Expand All @@ -3581,6 +3598,13 @@ class ChatProviderTemplate(TypedDict):
"platform_specific.discord.pre_ack_emoji.enable": True,
},
},
"platform_specific.discord.pre_ack_emoji.auto_remove": {
"description": "[Discord] 处理完毕后自动撤回表情",
"type": "bool",
"condition": {
"platform_specific.discord.pre_ack_emoji.enable": True,
},
},
},
},
},
Expand Down
69 changes: 69 additions & 0 deletions astrbot/core/pipeline/pre_ack_emoji.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import random
from dataclasses import dataclass

from astrbot.core import logger
from astrbot.core.platform import AstrMessageEvent


@dataclass
class EmojiRef:
"""贴出的表情引用,包含撤回所需的全部信息。"""

emoji: str
reaction_id: str | None = None # 飞书需要 reaction_id 来撤回


class PreAckEmojiManager:
"""预回应表情管理器。

在 pipeline 执行前贴表情,执行后根据配置撤回。
运行在洋葱模型外层,不参与 stage 调度。
"""

SUPPORTED_PLATFORMS = ("telegram", "lark", "discord")

def __init__(self, config: dict) -> None:
self.config = config

def _get_cfg(self, platform: str) -> dict:
return (
self.config.get("platform_specific", {})
.get(platform, {})
.get("pre_ack_emoji", {})
) or {}

async def add_emoji(self, event: AstrMessageEvent) -> EmojiRef | None:
"""贴表情。返回 EmojiRef,或 None(未贴)。"""
platform = event.get_platform_name()
if platform not in self.SUPPORTED_PLATFORMS:
return None

cfg = self._get_cfg(platform)
emojis = cfg.get("emojis") or []

if not cfg.get("enable", False) or not emojis:
return None

emoji = random.choice(emojis)
try:
reaction_id = await event.react(emoji)
return EmojiRef(emoji=emoji, reaction_id=reaction_id)
except Exception as e:
logger.warning(f"{platform} 预回应表情发送失败: {e}")
return None

async def remove_emoji(self, event: AstrMessageEvent, ref: EmojiRef | None) -> None:
"""根据配置撤回表情。"""
if ref is None:
return

platform = event.get_platform_name()
cfg = self._get_cfg(platform)

if not cfg.get("auto_remove", True):
return

try:
await event.remove_react(ref.emoji, reaction_id=ref.reaction_id)
except Exception as e:
logger.warning(f"{platform} 预回应表情撤回失败: {e}")
21 changes: 0 additions & 21 deletions astrbot/core/pipeline/preprocess_stage/stage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import random
import traceback
from collections.abc import AsyncGenerator

Expand All @@ -26,26 +25,6 @@ async def process(
event: AstrMessageEvent,
) -> None | AsyncGenerator[None, None]:
"""在处理事件之前的预处理"""
# 平台特异配置:platform_specific.<platform>.pre_ack_emoji
supported = {"telegram", "lark", "discord"}
platform = event.get_platform_name()
cfg = (
self.config.get("platform_specific", {})
.get(platform, {})
.get("pre_ack_emoji", {})
) or {}
emojis = cfg.get("emojis") or []
if (
cfg.get("enable", False)
and platform in supported
and emojis
and event.is_at_or_wake_command
):
try:
await event.react(random.choice(emojis))
except Exception as e:
logger.warning(f"{platform} 预回应表情发送失败: {e}")

# 路径映射
if mappings := self.platform_settings.get("path_mapping", []):
# 支持 Record,Image 消息段的路径映射。
Expand Down
10 changes: 10 additions & 0 deletions astrbot/core/pipeline/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from astrbot.core.utils.active_event_registry import active_event_registry

from .bootstrap import ensure_builtin_stages_registered
from .pre_ack_emoji import PreAckEmojiManager
from .context import PipelineContext
from .stage import registered_stages
from .stage_order import STAGES_ORDER
Expand All @@ -24,6 +25,7 @@ def __init__(self, context: PipelineContext) -> None:
) # 按照顺序排序
self.ctx = context # 上下文对象
self.stages = [] # 存储阶段实例
self.pre_ack_emoji_mgr = PreAckEmojiManager(context.astrbot_config)

async def initialize(self) -> None:
"""初始化管道调度器时, 初始化所有阶段"""
Expand All @@ -49,7 +51,9 @@ async def _process_stages(self, event: AstrMessageEvent, from_stage=0) -> None:

if isinstance(coroutine, AsyncGenerator):
# 如果返回的是异步生成器, 实现洋葱模型的核心
did_yield = False
async for _ in coroutine:
did_yield = True
# 此处是前置处理完成后的暂停点(yield), 下面开始执行后续阶段
if event.is_stopped():
logger.debug(
Expand All @@ -66,6 +70,10 @@ async def _process_stages(self, event: AstrMessageEvent, from_stage=0) -> None:
f"阶段 {stage.__class__.__name__} 已终止事件传播。",
)
break

# 洋葱阶段已通过递归处理了后续所有阶段,跳出外层循环避免重复执行
if did_yield:
break
else:
# 如果返回的是普通协程(不含yield的async函数), 则不进入下一层(基线条件)
# 简单地等待它执行完成, 然后继续执行下一个阶段
Expand All @@ -83,6 +91,7 @@ async def execute(self, event: AstrMessageEvent) -> None:

"""
active_event_registry.register(event)
emoji = await self.pre_ack_emoji_mgr.add_emoji(event)
try:
await self._process_stages(event)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

PR 描述中提到修复了洋葱模型重复执行后续阶段的 bug,并且添加了对应的单元测试 (test_pipeline_execution.py)。但我在 _process_stages 方法中没有看到对应的修复。

目前的实现中,当一个 generator stage (i) 执行完毕后(async for 循环结束),外层 for 循环会继续执行 i+1,导致 i+1 以及之后的所有阶段被再次执行。这会导致 test_pipeline_execution.py 中的测试失败。

为了修复这个问题,需要在 async for 循环之后判断是否真的发生了 yield。如果发生了,就应该 break 外层循环,因为后续阶段已经在递归调用 _process_stages(event, i + 1) 中处理过了。

建议将 _process_stages 方法修改如下:

    async def _process_stages(self, event: AstrMessageEvent, from_stage=0) -> None:
        """依次执行各个阶段

        Args:
            event (AstrMessageEvent): 事件对象
            from_stage (int): 从第几个阶段开始执行, 默认从0开始

        """
        for i in range(from_stage, len(self.stages)):
            stage = self.stages[i]  # 获取当前要执行的阶段
            coroutine = stage.process(event)

            if isinstance(coroutine, AsyncGenerator):
                did_yield = False
                async for _ in coroutine:
                    did_yield = True
                    # 此处是前置处理完成后的暂停点(yield), 下面开始执行后续阶段
                    if event.is_stopped():
                        logger.debug(
                            f"阶段 {stage.__class__.__name__} 已终止事件传播。",
                        )
                        break

                    # 递归调用, 处理所有后续阶段
                    await self._process_stages(event, i + 1)

                    # 此处是后续所有阶段处理完毕后返回的点, 执行后置处理
                    if event.is_stopped():
                        logger.debug(
                            f"阶段 {stage.__class__.__name__} 已终止事件传播。",
                        )
                        break
                
                if did_yield:
                    break
            else:
                # 如果返回的是普通协程(不含yield的async函数), 则不进入下一层(基线条件)
                # 简单地等待它执行完成, 然后继续执行下一个阶段
                await coroutine

                if event.is_stopped():
                    logger.debug(f"阶段 {stage.__class__.__name__} 已终止事件传播。")
                    break

这个修改引入了 did_yield 标志位,确保只有在 generator stage 真正 yield 并进入内层逻辑后,才会中断外层循环,从而避免了重复执行的问题。对于没有 yield 的 generator stage,其行为保持不变,会继续执行后续 stage.

    async def _process_stages(self, event: AstrMessageEvent, from_stage=0) -> None:
        """依次执行各个阶段

        Args:
            event (AstrMessageEvent): 事件对象
            from_stage (int): 从第几个阶段开始执行, 默认从0开始

        """
        for i in range(from_stage, len(self.stages)):
            stage = self.stages[i]  # 获取当前要执行的阶段
            coroutine = stage.process(event)

            if isinstance(coroutine, AsyncGenerator):
                did_yield = False
                async for _ in coroutine:
                    did_yield = True
                    # 此处是前置处理完成后的暂停点(yield), 下面开始执行后续阶段
                    if event.is_stopped():
                        logger.debug(
                            f"阶段 {stage.__class__.__name__} 已终止事件传播。",
                        )
                        break

                    # 递归调用, 处理所有后续阶段
                    await self._process_stages(event, i + 1)

                    # 此处是后续所有阶段处理完毕后返回的点, 执行后置处理
                    if event.is_stopped():
                        logger.debug(
                            f"阶段 {stage.__class__.__name__} 已终止事件传播。",
                        )
                        break
                
                if did_yield:
                    break
            else:
                # 如果返回的是普通协程(不含yield的async函数), 则不进入下一层(基线条件)
                # 简单地等待它执行完成, 然后继续执行下一个阶段
                await coroutine

                if event.is_stopped():
                    logger.debug(f"阶段 {stage.__class__.__name__} 已终止事件传播。")
                    break


Expand All @@ -92,4 +101,5 @@ async def execute(self, event: AstrMessageEvent) -> None:

logger.debug("pipeline 执行完毕。")
finally:
await self.pre_ack_emoji_mgr.remove_emoji(event, emoji)
active_event_registry.unregister(event)
20 changes: 18 additions & 2 deletions astrbot/core/platform/astr_message_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,31 @@ async def send(self, message: MessageChain) -> None:
)
self._has_send_oper = True

async def react(self, emoji: str) -> None:
async def react(self, emoji: str) -> str | None:
"""对消息添加表情回应。

默认实现为发送一条包含该表情的消息。
注意:此实现并不一定符合所有平台的原生表情回应行为。
注意:此实现并不一定符合所有平台的原生"表情回应"行为。
如需支持平台原生的消息反应功能,请在对应平台的子类中重写本方法。

Returns:
平台特定的 reaction 标识符(如飞书的 reaction_id),用于后续撤回。
大多数平台返回 None。
"""
await self.send(MessageChain([Plain(emoji)]))

async def remove_react(self, emoji: str, reaction_id: str | None = None) -> None:
"""移除消息上的表情回应。

默认实现为空操作。
如需支持平台原生的撤回表情功能,请在对应平台的子类中重写本方法。

Args:
emoji: 要移除的表情
reaction_id: 平台特定的 reaction 标识符(如飞书的 reaction_id)
"""
pass

async def get_group(self, group_id: str | None = None, **kwargs) -> Group | None:
"""获取一个群聊的数据, 如果不填写 group_id: 如果是私聊消息,返回 None。如果是群聊消息,返回当前群聊的数据。

Expand Down
11 changes: 11 additions & 0 deletions astrbot/core/platform/sources/discord/discord_platform_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ async def react(self, emoji: str) -> None:
except Exception as e:
logger.error(f"[Discord] 添加反应失败: {e}")

async def remove_react(self, emoji: str, reaction_id: str | None = None) -> None:
"""移除 bot 在原消息上的表情回应"""
try:
if not hasattr(self.message_obj, "raw_message"):
return
raw = self.message_obj.raw_message
if hasattr(raw, "remove_reaction") and self.client.user:
await cast(discord.Message, raw).remove_reaction(emoji, self.client.user)
except Exception as e:
logger.warning(f"[Discord] 移除反应失败: {e}")

def is_slash_command(self) -> bool:
"""判断是否为斜杠命令"""
return (
Expand Down
27 changes: 26 additions & 1 deletion astrbot/core/platform/sources/lark/lark_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CreateImageRequestBody,
CreateMessageReactionRequest,
CreateMessageReactionRequestBody,
DeleteMessageReactionRequest,
Emoji,
ReplyMessageRequest,
ReplyMessageRequestBody,
Expand Down Expand Up @@ -534,7 +535,7 @@ async def _send_media_message(
receive_id_type=receive_id_type,
)

async def react(self, emoji: str) -> None:
async def react(self, emoji: str) -> str | None:
if self.bot.im is None:
logger.error("[Lark] API Client im 模块未初始化,无法发送表情")
return
Expand All @@ -553,8 +554,32 @@ async def react(self, emoji: str) -> None:
response = await self.bot.im.v1.message_reaction.acreate(request)
if not response.success():
logger.error(f"发送飞书表情回应失败({response.code}): {response.msg}")
return None

# 返回 reaction_id 供调用方保存(如 PreAckEmojiManager 用于后续撤回)
if response.data and response.data.reaction_id:
return response.data.reaction_id
return None

async def remove_react(self, emoji: str, reaction_id: str | None = None) -> None:
if not reaction_id:
return

if self.bot.im is None:
logger.warning("[Lark] API Client im 模块未初始化,无法撤回表情")
return

request = (
DeleteMessageReactionRequest.builder()
.message_id(self.message_obj.message_id)
.reaction_id(reaction_id)
.build()
)

response = await self.bot.im.v1.message_reaction.adelete(request)
if not response.success():
logger.warning(f"撤回飞书表情回应失败({response.code}): {response.msg}")

async def send_streaming(self, generator, use_fallback: bool = False):
buffer = None
async for chain in generator:
Expand Down
4 changes: 4 additions & 0 deletions astrbot/core/platform/sources/telegram/tg_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ async def react(self, emoji: str | None, big: bool = False) -> None:
except Exception as e:
logger.error(f"[Telegram] 添加反应失败: {e}")

async def remove_react(self, emoji: str, reaction_id: str | None = None) -> None:
"""移除 bot 在原消息上的表情回应(Telegram 通过设置空 reaction 列表实现)"""
await self.react(None)

async def _send_message_draft(
self,
chat_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,9 @@
"emojis": {
"description": "Emoji List (Lark Emoji Enum Names)",
"hint": "Emoji enum names reference: [https://open.feishu.cn/document/server-docs/im-v1/message-reaction/emojis-introduce](https://open.feishu.cn/document/server-docs/im-v1/message-reaction/emojis-introduce)"
},
"auto_remove": {
"description": "[Lark] Auto-remove emoji after processing"
}
}
},
Expand All @@ -809,6 +812,9 @@
"emojis": {
"description": "Emoji List (Unicode)",
"hint": "Telegram only supports a fixed reaction set, reference: [https://gist.github.com/Soulter/3f22c8e5f9c7e152e967e8bc28c97fc9](https://gist.github.com/Soulter/3f22c8e5f9c7e152e967e8bc28c97fc9)"
},
"auto_remove": {
"description": "[Telegram] Auto-remove emoji after processing"
}
}
},
Expand All @@ -820,6 +826,9 @@
"emojis": {
"description": "Emoji List (Unicode or Custom Emoji Name)",
"hint": "Enter Unicode emoji symbols, e.g., 👍, 🤔, ⏳"
},
"auto_remove": {
"description": "[Discord] Auto-remove emoji after processing"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,9 @@
"emojis": {
"description": "表情列表(飞书表情枚举名)",
"hint": "表情枚举名参考:[https://open.feishu.cn/document/server-docs/im-v1/message-reaction/emojis-introduce](https://open.feishu.cn/document/server-docs/im-v1/message-reaction/emojis-introduce)"
},
"auto_remove": {
"description": "[飞书] 处理完毕后自动撤回表情"
}
}
},
Expand All @@ -812,6 +815,9 @@
"emojis": {
"description": "表情列表(Unicode)",
"hint": "Telegram 仅支持固定反应集合,参考:[https://gist.github.com/Soulter/3f22c8e5f9c7e152e967e8bc28c97fc9](https://gist.github.com/Soulter/3f22c8e5f9c7e152e967e8bc28c97fc9)"
},
"auto_remove": {
"description": "[Telegram] 处理完毕后自动撤回表情"
}
}
},
Expand All @@ -823,6 +829,9 @@
"emojis": {
"description": "表情列表(Unicode 或自定义表情名)",
"hint": "填写 Unicode 表情符号,例如:👍、🤔、⏳"
},
"auto_remove": {
"description": "[Discord] 处理完毕后自动撤回表情"
}
}
}
Expand Down
Loading