Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
},
},
"platform": [],
"event_bus_dedup_ttl_seconds": 0.5,
"platform_specific": {
# 平台特异配置:按平台分类,平台下按功能分组
"lark": {
Expand Down Expand Up @@ -328,6 +329,9 @@ class ChatProviderTemplate(TypedDict):
"secret": "",
"enable_group_c2c": True,
"enable_guild_direct_message": True,
"dedup_message_id_ttl_seconds": 1800.0,
"dedup_content_key_ttl_seconds": 3.0,
"dedup_cleanup_interval_seconds": 1.0,
},
"QQ 官方机器人(Webhook)": {
"id": "default",
Expand Down Expand Up @@ -774,6 +778,21 @@ class ChatProviderTemplate(TypedDict):
"type": "bool",
"hint": "启用后,机器人可以接收到频道的私聊消息。",
},
"dedup_message_id_ttl_seconds": {
"description": "消息 ID 去重窗口(秒)",
"type": "float",
"hint": "QQ 官方适配器中 message_id 去重窗口,默认 1800 秒。",
},
"dedup_content_key_ttl_seconds": {
"description": "内容键去重窗口(秒)",
"type": "float",
"hint": "QQ 官方适配器中 sender+content hash 去重窗口,默认 3 秒。",
},
"dedup_cleanup_interval_seconds": {
"description": "去重缓存清理间隔(秒)",
"type": "float",
"hint": "QQ 官方适配器去重缓存的增量清理间隔,默认 1 秒。",
},
"ws_reverse_host": {
"description": "反向 Websocket 主机",
"type": "string",
Expand Down
14 changes: 14 additions & 0 deletions astrbot/core/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from astrbot.core import logger
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
from astrbot.core.pipeline.scheduler import PipelineScheduler
from astrbot.core.utils.number_utils import safe_positive_float

from .event_dedup import EventDeduplicator
from .platform import AstrMessageEvent


Expand All @@ -33,10 +35,22 @@ def __init__(
# abconf uuid -> scheduler
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
self.astrbot_config_mgr = astrbot_config_mgr
dedup_ttl_seconds = safe_positive_float(
self.astrbot_config_mgr.g(
None,
"event_bus_dedup_ttl_seconds",
0.5,
),
default=0.5,
)
self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)

async def dispatch(self) -> None:
# event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
while True:
event: AstrMessageEvent = await self.event_queue.get()
if self._deduplicator.is_duplicate(event):
continue
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
conf_id = conf_info["id"]
conf_name = conf_info.get("name") or conf_id
Expand Down
65 changes: 65 additions & 0 deletions astrbot/core/event_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from astrbot.core import logger
from astrbot.core.message.utils import (
build_content_dedup_key,
build_message_id_dedup_key,
)
from astrbot.core.utils.ttl_registry import TTLKeyRegistry

from .platform import AstrMessageEvent


class EventDeduplicator:
    """Detect events that repeat within a short time-to-live window.

    Two kinds of fingerprint are stored in a single ``TTLKeyRegistry``:

    * a message-id key, built only when the event carries a message id;
    * a content key, built from platform id, origin, sender, text and
      attachment components.

    An event is reported as a duplicate when either fingerprint is already
    present in the registry.
    """

    def __init__(self, ttl_seconds: float = 0.5) -> None:
        """Create the backing registry.

        Args:
            ttl_seconds: Lifetime handed to the registry for every key.
                A value of zero or below turns deduplication off.
        """
        self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        """Return ``True`` when ``event`` was already seen inside the window.

        First-seen events have their fingerprints recorded as a side effect,
        so calling this twice with the same event reports the second call as
        a duplicate.

        Args:
            event: The incoming message event to check.

        Returns:
            ``True`` if the event should be skipped, ``False`` otherwise.
        """
        # Any non-positive TTL disables deduplication. Comparing with ``<= 0``
        # rather than ``== 0`` keeps a negative value (bad config, changed
        # parsing upstream) from reaching the registry lookups below.
        if self._registry.ttl_seconds <= 0:
            return False

        message_id_key = self._build_message_id_key(event)
        if message_id_key is not None:
            if self._registry.contains(message_id_key):
                logger.debug(
                    "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                return True
            self._registry.add(message_id_key)

        content_key = self._build_content_key(event)
        if self._registry.contains(content_key):
            logger.debug(
                "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            # The message-id key registered just above is removed again when
            # the event is rejected by content.
            # NOTE(review): presumably so a dropped event leaves no id trace
            # behind -- confirm the intent with the author.
            if message_id_key is not None:
                self._registry.discard(message_id_key)
            return True

        self._registry.add(content_key)
        return False

    @staticmethod
    def _build_content_key(event: AstrMessageEvent) -> str:
        """Build the content fingerprint key for ``event``."""
        return build_content_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            sender_id=str(event.get_sender_id() or ""),
            text=str(event.get_message_str() or ""),
            components=event.get_messages(),
        )

    @staticmethod
    def _build_message_id_key(event: AstrMessageEvent) -> str | None:
        """Build the message-id fingerprint key, or ``None`` without an id.

        The id is read from ``event.message_obj.message_id`` and falls back
        to ``event.message_obj.id`` when the former is missing or empty.
        """
        message_id = getattr(event.message_obj, "message_id", "") or getattr(
            event.message_obj,
            "id",
            "",
        )
        return build_message_id_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            message_id=str(message_id or ""),
        )
101 changes: 101 additions & 0 deletions astrbot/core/message/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Message utilities for deduplication and component handling."""

import hashlib
from collections.abc import Iterable

from astrbot.core.message.components import BaseMessageComponent, File, Image

# Message texts up to this many characters are embedded verbatim in content
# dedup keys; longer texts are replaced by a length-tagged SHA1 prefix
# (see build_content_dedup_key).
_MAX_RAW_TEXT_FINGERPRINT_LEN = 256


def build_component_dedup_signature(
components: Iterable[BaseMessageComponent],
) -> str:
"""Build a deduplication signature from message components.

This function extracts unique identifiers from Image and File components
and creates a hash-based signature for deduplication purposes.

Args:
components: An iterable of message components to analyze.

Returns:
A SHA1 hash (16 hex characters) representing the component signatures,
or an empty string if no valid components are found.
"""
parts: list[str] = []
for component in components:
if isinstance(component, Image):
# Image can have url, file, or file_unique
ref = component.url or component.file or component.file_unique or ""
if ref:
parts.append(f"img:{ref}")
elif isinstance(component, File):
# File can have url, file (via property), or name
ref = component.url or component.file or component.name or ""
if ref:
parts.append(f"file:{ref}")
# Future component types can be added here

if not parts:
return ""

payload = "|".join(parts)
return hashlib.sha1(payload.encode("utf-8")).hexdigest()[:16]


def build_sender_content_dedup_key(content: str, sender_id: str) -> str | None:
"""Build a sender+content hash key for short-window deduplication."""
if not (content and sender_id):
return None
content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
return f"{sender_id}:{content_hash}"


def build_content_dedup_key(
    *,
    platform_id: str,
    unified_msg_origin: str,
    sender_id: str,
    text: str,
    components: Iterable[BaseMessageComponent],
) -> str:
    """Assemble the content fingerprint key used for event deduplication.

    The stripped text is embedded verbatim while it fits within
    ``_MAX_RAW_TEXT_FINGERPRINT_LEN`` characters; longer text is replaced by
    ``h:<length>:<first 16 hex chars of SHA1>``.

    Args:
        platform_id: Identifier of the originating platform.
        unified_msg_origin: Unified origin string of the message.
        sender_id: Identifier of the message sender.
        text: Plain-text body of the message.
        components: Message components, used for the attachment signature.

    Returns:
        ``content``, platform, origin, sender, text signature and attachment
        signature joined with ``|``.
    """
    normalized = str(text or "").strip()
    if len(normalized) > _MAX_RAW_TEXT_FINGERPRINT_LEN:
        digest = hashlib.sha1(normalized.encode("utf-8")).hexdigest()[:16]
        text_part = f"h:{len(normalized)}:{digest}"
    else:
        text_part = normalized

    attachment_part = build_component_dedup_signature(components)
    fields = (
        "content",
        str(platform_id or ""),
        str(unified_msg_origin or ""),
        str(sender_id or ""),
        text_part,
        attachment_part,
    )
    return "|".join(fields)


def build_message_id_dedup_key(
*,
platform_id: str,
unified_msg_origin: str,
message_id: str,
) -> str | None:
"""Build a message_id fingerprint key for event deduplication."""
normalized_message_id = str(message_id or "")
if not normalized_message_id:
return None
return "|".join(
[
"message_id",
str(platform_id or ""),
str(unified_msg_origin or ""),
normalized_message_id,
]
)
9 changes: 9 additions & 0 deletions astrbot/core/platform/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ async def load_platform(self, platform_config: dict) -> None:
)
return

# 防御式处理:避免同一平台 ID 被重复加载导致消息重复消费。
if platform_id in self._inst_map:
logger.warning(
"平台 %s(%s) 已存在实例,先终止旧实例再重载。",
platform_config["type"],
platform_id,
)
await self.terminate_platform(platform_id)

logger.info(
f"载入 {platform_config['type']}({platform_config['id']}) 平台适配器 ...",
)
Expand Down
Loading