fix(platform): prevent qq_official duplicate message consumption (#5848) #6519

Open
001VIsir wants to merge 5 commits into AstrBotDevs:master from 001VIsir:visir_fix

Conversation


@001VIsir 001VIsir commented Mar 17, 2026

Fixes #5848


With the qq_official adapter, a message can be consumed more than once, so a single input triggers multiple replies; in severe cases this risks API errors or an account ban.
The logs in the issue show duplicate events entering the processing chain in quick succession, with a ramp-up, fall-back, ramp-up-again pattern after the bot has been running for a while.
This PR aims to eliminate duplicate consumption at three layers at once (platform entry point, event bus, and platform instance management) while avoiding over-aggressive deduplication that would drop legitimate messages.

Modifications / 改动点

The core files changed in this PR:

  • astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
  • astrbot/core/event_bus.py
  • astrbot/core/platform/manager.py
  • astrbot/core/config/default.py
  • astrbot/core/utils/ttl_registry.py
  • astrbot/core/utils/number_utils.py
  • astrbot/core/message/utils.py

Summary of the main changes:

  1. QQ Official entry-point deduplication (platform layer)
    • Added MessageDeduplicator, which splits deduplication into two kinds:
      • long-window dedup keyed by message_id (default 1800s)
      • short-window dedup keyed by sender_id + content_hash (default 3s)
    • The four entry callbacks (group/channel/direct/c2c) now run the same duplicate check up front and drop any message that matches.
    • Unified the sender_id extraction priority (user_openid > member_openid > id) to reduce dedup drift between callback paths.
  2. Global EventBus deduplication fallback (before dispatch)
    • Added EventDeduplicator ahead of dispatch:
      • first a fast short-circuit check by message_id;
      • then a check by content fingerprint (platform, origin, sender, text, attachment signature).
    • Added a configurable option event_bus_dedup_ttl_seconds (default 0.5s; set to 0 to disable).
  3. Guard against duplicate platform instances (instance-management layer)
    • Added defensive logic to PlatformManager.load_platform:
      • if an instance with the same platform_id already exists, terminate the old instance before loading the new one.
    • This prevents concurrent consumption by multiple instances sharing one ID, i.e. "one message handled multiple times".
  4. Extracted reusable building blocks
    • Added TTLKeyRegistry (a TTL dedup registry based on time.monotonic(), with throttled cleanup).
    • Added safe_positive_float (safe config parsing; 0 is allowed as a "disabled" value).
    • Added a message-component signature helper so attachment features become part of the dedup fingerprint, reducing false positives.
  • This is NOT a breaking change. / 这不是一个破坏性变更。
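
The TTL registry and deduplication windows described above can be sketched as follows. This is a minimal illustration based only on the description in this PR text; the real TTLKeyRegistry's constructor signature and cleanup details may differ.

```python
import time


class TTLKeyRegistry:
    """Minimal sketch of a TTL-based dedup registry.

    time.monotonic() is used so wall-clock adjustments cannot
    expire or revive keys; expired entries are swept only every
    cleanup_interval_seconds to keep the hot path cheap.
    """

    def __init__(self, ttl_seconds: float, cleanup_interval_seconds: float = 60.0):
        self.ttl_seconds = ttl_seconds
        self._cleanup_interval = cleanup_interval_seconds
        self._expiry: dict[str, float] = {}
        self._last_cleanup = time.monotonic()

    def contains(self, key: str) -> bool:
        deadline = self._expiry.get(key)
        return deadline is not None and deadline > time.monotonic()

    def add(self, key: str) -> None:
        now = time.monotonic()
        self._expiry[key] = now + self.ttl_seconds
        # Throttled cleanup: sweep expired keys only occasionally.
        if now - self._last_cleanup >= self._cleanup_interval:
            self._expiry = {k: d for k, d in self._expiry.items() if d > now}
            self._last_cleanup = now

    def discard(self, key: str) -> None:
        self._expiry.pop(key, None)
```

With ttl_seconds=0 a key expires immediately, which matches the "0 disables deduplication" convention used throughout this PR.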

Screenshots or Test Results / 运行截图或测试结果

Verification Steps / 验证步骤:

Before the change

  1. Start with the qq_official adapter and the default deduplication config:
    • enable the "message list one-on-one chat" and "guild direct message" features
    (screenshot: bug3)
  2. Repeatedly send the same messages within a short time:
    • multiple replies are triggered
    • as the screenshots show, after sending just the two messages "1" and "2", the log receives two copies of "1" and two copies of "2", four messages in total
    (screenshots: bug, bug2)

After the change

  1. Start with the qq_official adapter and the default deduplication config:
    • enable the "message list one-on-one chat" and "guild direct message" features
  2. Repeatedly send the same messages within a short time:
    • the messages are no longer consumed twice; everything stayed normal over a 3-minute test
    (screenshots: debug2, debug4)

Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
    (本 PR 为 bugfix,无新增功能)
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txt 和 pyproject.toml 文件相应位置。
  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

Summary by Sourcery

Prevent duplicate message processing for the qq_official platform by adding multi-layer deduplication and safer configuration handling.

Bug Fixes:

  • Avoid repeated qq_official message consumption by introducing per-adapter and global event bus deduplication keyed by message ID and content fingerprints.
  • Prevent duplicate processing caused by multiple platform instances sharing the same platform ID by terminating old instances before reloading.

Enhancements:

  • Unify sender ID extraction for qq_official messages to ensure consistent identification across message types.
  • Introduce a reusable TTL-based key registry and message deduplication utilities for IDs, content, and attachments.
  • Add a safe_positive_float helper to robustly parse numeric configuration values and support disabling features via zero values.
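
A plausible shape for the safe_positive_float helper, sketched from the description above (the exact fallback rules in the PR may differ):

```python
import math


def safe_positive_float(value, default: float) -> float:
    """Parse a config value into a non-negative float.

    Anything unparsable, negative, NaN, or infinite falls back to
    `default`; 0 is accepted so it can mean "feature disabled".
    """
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(parsed) or math.isinf(parsed) or parsed < 0:
        return default
    return parsed
```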

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 17, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where messages from the qq_official platform could be consumed multiple times, leading to repeated responses and potential API rate limit issues. The changes introduce a robust, multi-tiered deduplication system that filters out duplicate messages at the platform entry point, within the event bus, and during platform instance loading. This significantly enhances the system's stability and reliability by ensuring each message is processed only once, preventing unintended side effects and improving overall performance.

Highlights

  • Multi-layered Deduplication for QQ Official: Implemented a new MessageDeduplicator within the qq_official platform adapter to prevent duplicate message consumption. This includes long-window deduplication based on message_id (1800s) and short-window deduplication based on sender_id and content_hash (3s). Deduplication is applied uniformly across group, channel, direct, and C2C message callbacks.
  • Global Event Bus Deduplication: Introduced an EventDeduplicator into the EventBus to provide a global fallback deduplication mechanism before event dispatch. It prioritizes message_id checks for efficiency and then uses a comprehensive content fingerprint (platform, source, sender, text, attachment signature) for more robust deduplication. A new configurable setting, event_bus_dedup_ttl_seconds, controls its time-to-live.
  • Platform Instance Management Enhancement: Added defensive logic to PlatformManager.load_platform to prevent duplicate loading of platform instances. If an instance with the same platform_id already exists, the old instance is terminated before a new one is loaded, ensuring only one active instance per platform ID and preventing redundant message processing.
  • New Utility Modules: Extracted common functionalities into new utility modules: TTLKeyRegistry for generic time-to-live key management with throttled cleanup, safe_positive_float for robust parsing of positive float configuration values (allowing 0 to disable features), and a message component signature tool to include attachment features in deduplication fingerprints.
Changelog
  • astrbot/core/config/default.py
    • Added event_bus_dedup_ttl_seconds configuration option with a default value of 0.5 seconds.
    • Introduced dedup_message_id_ttl_seconds, dedup_content_key_ttl_seconds, and dedup_cleanup_interval_seconds to the QQ Official platform configuration, along with their descriptions and hints.
  • astrbot/core/event_bus.py
    • Imported hashlib, build_component_dedup_signature, safe_positive_float, and TTLKeyRegistry.
    • Implemented the EventDeduplicator class to manage event deduplication using TTL-based registries.
    • Modified the EventBus.dispatch method to utilize the new EventDeduplicator to skip duplicate events, initialized with a configurable TTL.
  • astrbot/core/message/utils.py
    • Added a new file containing the build_component_dedup_signature function, which generates a hash-based signature from message components (images and files) for deduplication purposes.
  • astrbot/core/platform/manager.py
    • Enhanced the load_platform method to prevent duplicate platform instance loading by terminating any existing instance with the same platform_id before loading a new one.
  • astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
    • Imported hashlib, safe_positive_float, and TTLKeyRegistry, and removed SimpleNamespace import.
    • Created _extract_sender_id to standardize sender ID extraction logic across different message types.
    • Implemented the MessageDeduplicator class for platform-specific message deduplication, using separate TTL registries for message IDs and content keys.
    • Integrated deduplication checks (_should_drop_message) into all incoming message handlers (on_group_at_message_create, on_at_message_create, on_direct_message_create, on_c2c_message_create) to filter out duplicates early.
    • Initialized the platform's _deduplicator with configurable TTLs and cleanup intervals.
    • Updated _send_by_session_common to directly use self.client instead of SimpleNamespace.
    • Modified _parse_from_qqofficial to use the standardized _extract_sender_id for MessageMember creation and added a type cast for the message object.
  • astrbot/core/utils/number_utils.py
    • Added a new file containing the safe_positive_float function, which safely parses a value into a positive float, treating 0 as a valid positive value.
  • astrbot/core/utils/ttl_registry.py
    • Added a new file containing the TTLKeyRegistry class, a generic utility for managing keys with a time-to-live, supporting configurable cleanup intervals.
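
Based on the changelog entry for astrbot/core/message/utils.py, build_component_dedup_signature presumably hashes stable component features; a simplified stand-in (the real component attributes may differ) might be:

```python
import hashlib


def build_component_dedup_signature(components) -> str:
    """Sketch only: each component is assumed to expose a `type`
    plus an optional `file`/`url` attribute identifying the
    attachment. Returns a short, stable hex signature."""
    parts = []
    for comp in components:
        ctype = getattr(comp, "type", comp.__class__.__name__)
        ref = getattr(comp, "file", None) or getattr(comp, "url", None) or ""
        parts.append(f"{ctype}:{ref}")
    if not parts:
        return ""
    joined = "|".join(parts)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()[:16]
```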

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="90-98" />
<code_context>
+        content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
+        return f"{sender_id}:{content_hash}"
+
+    async def is_duplicate(
+        self,
+        message_id: str,
+        content: str = "",
+        sender_id: str = "",
+    ) -> bool:
+        async with self._lock:
+            # Bypass deduplication if TTL is 0 (disabled)
+            if self._message_ids.ttl_seconds == 0:
+                return False
+
</code_context>
<issue_to_address>
**question (bug_risk):** Disabling message-id dedup via TTL=0 also disables content-based dedup, which may be stricter than intended.

In `MessageDeduplicator.is_duplicate`, this check:
```python
if self._message_ids.ttl_seconds == 0:
    return False
```
causes `dedup_message_id_ttl_seconds = 0` to disable *all* deduplication, even when `dedup_content_key_ttl_seconds` is > 0. With separate config knobs, callers may expect to disable ID-based dedup while keeping content-based dedup.

To keep the knobs independent, consider moving the `ttl_seconds == 0` check into the message-ID branch only, and still executing the content-key logic when `self._content_keys.ttl_seconds > 0`.
</issue_to_address>
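
A self-contained sketch of the restructure this comment suggests, with simplified stand-in registries (not the PR's actual classes): each TTL knob gates only its own branch, so setting one to 0 no longer disables the other.

```python
import asyncio
import time


class _Registry:
    """Simplified stand-in for the TTLKeyRegistry quoted above."""

    def __init__(self, ttl_seconds: float):
        self.ttl_seconds = ttl_seconds
        self._expiry: dict[str, float] = {}

    def contains(self, key: str) -> bool:
        d = self._expiry.get(key)
        return d is not None and d > time.monotonic()

    def add(self, key: str) -> None:
        self._expiry[key] = time.monotonic() + self.ttl_seconds


class MessageDeduplicator:
    """Each knob is checked in its own branch: disabling ID-based
    dedup (TTL=0) leaves content-based dedup active, and vice versa."""

    def __init__(self, message_id_ttl: float, content_key_ttl: float):
        self._message_ids = _Registry(message_id_ttl)
        self._content_keys = _Registry(content_key_ttl)
        self._lock = asyncio.Lock()

    async def is_duplicate(
        self, message_id: str, content: str = "", sender_id: str = ""
    ) -> bool:
        async with self._lock:
            # ID-based dedup only when its own TTL is enabled.
            if self._message_ids.ttl_seconds > 0 and message_id:
                if self._message_ids.contains(message_id):
                    return True
                self._message_ids.add(message_id)
            # Content-based dedup is governed by its own knob.
            # (The real code hashes the content; plain keys here.)
            if self._content_keys.ttl_seconds > 0 and content and sender_id:
                key = f"{sender_id}:{content}"
                if self._content_keys.contains(key):
                    return True
                self._content_keys.add(key)
            return False
```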

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="135" />
<code_context>
     def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
         self.platform = platform

+    def _get_sender_id(self, message) -> str:
+        """Extract sender ID from different message types.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the deduplication and sender-id logic by collapsing redundant helper methods and centralizing shared content-key construction into a single utility.

You can trim a good amount of indirection and duplicated logic without changing behavior.

### 1. Collapse sender-id helpers

`_extract_sender_id` and `botClient._get_sender_id` just call each other. You can remove `_get_sender_id` and use the module-level helper directly:

```python
# keep this as the single source of truth
def _extract_sender_id(message) -> str:
    ...
```

```python
class botClient(Client):
    def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
        self.platform = platform

    # remove _get_sender_id

    async def _should_drop_message(self, message) -> bool:
        sender_id = _extract_sender_id(message)
        content = getattr(message, "content", "") or ""
        return await self.platform._is_duplicate_message(message.id, content, sender_id)
```

This removes one layer without changing semantics.

### 2. Flatten dedup wrapper chain

Right now the path is:

`handler → _should_drop_message → _extract_dedup_key → platform._is_duplicate_message → MessageDeduplicator.is_duplicate`.

`_extract_dedup_key` is small and only used in one place; you can inline it into `_should_drop_message` as above and keep `_is_duplicate_message` as the single platform-facing hook:

```python
class botClient(Client):
    ...

    async def _should_drop_message(self, message) -> bool:
        sender_id = _extract_sender_id(message)
        content = getattr(message, "content", "") or ""
        return await self.platform._is_duplicate_message(message.id, content, sender_id)
```

Then remove `_extract_dedup_key` entirely.

This makes each handler’s flow easier to follow without altering functionality.

### 3. Extract a shared content-key builder

To avoid duplicating content-key semantics between `MessageDeduplicator` and `EventDeduplicator`, you can move `_build_content_key` into a shared utility function and call it from both:

```python
# e.g. in a shared module, or at module level if both can import it
def build_sender_content_key(content: str, sender_id: str) -> str | None:
    if not (content and sender_id):
        return None
    content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
    return f"{sender_id}:{content_hash}"
```

```python
class MessageDeduplicator:
    ...

    def _build_content_key(self, content: str, sender_id: str) -> str | None:
        return build_sender_content_key(content, sender_id)
```

And in `EventDeduplicator`:

```python
class EventDeduplicator:
    ...

    def _build_content_key(self, content: str, sender_id: str) -> str | None:
        return build_sender_content_key(content, sender_id)
```

This keeps all content-dedup semantics in a single place while preserving your current behavior and TTL-registry usage.
</issue_to_address>

### Comment 3
<location path="astrbot/core/event_bus.py" line_range="27" />
<code_context>
 from .platform import AstrMessageEvent


+class EventDeduplicator:
+    """Event deduplicator using TTL-based registry.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the deduplication key-building logic into shared utilities and optionally using a generic TTL-based deduplicator to simplify and reuse the EventDeduplicator logic.

You can reduce the added complexity without changing behavior by:

1. **Extracting key-building into reusable pure functions**, and  
2. **Making the ID-first/content-second policy explicit and generic** so `EventDeduplicator` is just configuration.

### 1. Extract key-building into shared utilities

Move the key-building logic out of `EventDeduplicator` so it’s testable in isolation and reusable (e.g. by `MessageDeduplicator`).

```python
# astrbot/core/message/utils.py
import hashlib
from astrbot.core.message.utils import build_component_dedup_signature  # existing
from astrbot.core.platform import AstrMessageEvent  # adjust import path as needed

_MAX_RAW_TEXT_FINGERPRINT_LEN = 256

def build_event_content_dedup_key(event: AstrMessageEvent) -> str:
    msg_text = (event.get_message_str() or "").strip()
    if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
        msg_sig = msg_text
    else:
        msg_hash = hashlib.sha1(msg_text.encode("utf-8")).hexdigest()[:16]
        msg_sig = f"h:{len(msg_text)}:{msg_hash}"

    attach_sig = build_component_dedup_signature(event.get_messages())
    return "|".join([
        "content",
        event.get_platform_id() or "",
        event.unified_msg_origin or "",
        event.get_sender_id() or "",
        msg_sig,
        attach_sig,
    ])


def build_event_message_id_dedup_key(event: AstrMessageEvent) -> str | None:
    message_id = str(getattr(event.message_obj, "message_id", "") or "")
    if not message_id:
        message_id = str(getattr(event.message_obj, "id", "") or "")
    if not message_id:
        return None
    return "|".join([
        "message_id",
        event.get_platform_id() or "",
        event.unified_msg_origin or "",
        message_id,
    ])
```

Then `EventDeduplicator` becomes much thinner and no longer tightly couples the algorithm to the class:

```python
# event_bus.py
from astrbot.core.message.utils import (
    build_event_content_dedup_key,
    build_event_message_id_dedup_key,
)

class EventDeduplicator:
    def __init__(self, ttl_seconds: float = 0.5) -> None:
        self._registry = TTLKeyRegistry(ttl_seconds)

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        if self._registry.ttl_seconds == 0:
            return False

        message_id_key = build_event_message_id_dedup_key(event)
        if message_id_key is not None:
            if self._registry.contains(message_id_key):
                logger.debug(
                    "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                return True
            self._registry.add(message_id_key)

        content_key = build_event_content_dedup_key(event)
        if self._registry.contains(content_key):
            logger.debug(
                "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            if message_id_key is not None:
                self._registry.discard(message_id_key)
            return True

        self._registry.add(content_key)
        return False
```

This keeps all semantics (including the “discard message-id on content duplicate” behavior) but makes the algorithm reusable and easier to test and reason about.

### 2. Introduce a generic TTL-based deduplicator (optional next step)

To unify with other dedup implementations (`MessageDeduplicator`), you can introduce a small generic helper that encapsulates the ID-first / content-second policy and accepts pluggable key extractors:

```python
# astrbot/core/utils/dedup.py
from collections.abc import Callable
from astrbot.core.utils.ttl_registry import TTLKeyRegistry

class TTLBasedDeduplicator:
    def __init__(
        self,
        ttl_seconds: float,
        build_id_key: Callable[[object], str | None],
        build_content_key: Callable[[object], str],
    ) -> None:
        self._registry = TTLKeyRegistry(ttl_seconds)
        self._build_id_key = build_id_key
        self._build_content_key = build_content_key

    def is_duplicate(self, obj: object) -> bool:
        if self._registry.ttl_seconds == 0:
            return False

        id_key = self._build_id_key(obj)
        if id_key is not None:
            if self._registry.contains(id_key):
                return True
            self._registry.add(id_key)

        content_key = self._build_content_key(obj)
        if self._registry.contains(content_key):
            if id_key is not None:
                self._registry.discard(id_key)
            return True

        self._registry.add(content_key)
        return False
```

Then `EventDeduplicator` becomes a thin adapter:

```python
from astrbot.core.utils.dedup import TTLBasedDeduplicator
from astrbot.core.message.utils import (
    build_event_content_dedup_key,
    build_event_message_id_dedup_key,
)

class EventDeduplicator(TTLBasedDeduplicator):
    def __init__(self, ttl_seconds: float = 0.5) -> None:
        super().__init__(
            ttl_seconds=ttl_seconds,
            build_id_key=build_event_message_id_dedup_key,
            build_content_key=build_event_content_dedup_key,
        )
```

`QQOfficialPlatformAdapter`’s `MessageDeduplicator` can be migrated to the same generic helper by supplying its own `build_id_key` and `build_content_key`, removing duplicated control-flow while preserving existing behavior.
</issue_to_address>


@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Mar 17, 2026
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This PR solves the duplicate message consumption problem in the qq_official adapter nicely by introducing a multi-layer deduplication mechanism. Handling is added at three levels (the platform adapter entry point, the global event bus, and the platform manager), with a clear structure and thorough coverage. Extracting TTLKeyRegistry and safe_positive_float into reusable utilities is good practice. The overall approach looks solid and attacks the problem from multiple angles. I found one minor issue in the sender ID extraction logic and have provided a suggested fix.

@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Mar 17, 2026
@001VIsir
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • TTLKeyRegistry is documented as a general-purpose utility but is not concurrency-safe; if there is any chance it will be reused outside the single-consumer EventBus/QQ adapter loops, consider adding a lightweight lock or documenting the single-threaded assumption explicitly in the class docstring.
  • EventBus deduplication currently logs duplicate events both inside EventDeduplicator.is_duplicate and again in dispatch; consider consolidating the logging in one place to avoid noisy duplicate log lines for the same event.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- TTLKeyRegistry is documented as a general-purpose utility but is not concurrency-safe; if there is any chance it will be reused outside the single-consumer EventBus/QQ adapter loops, consider adding a lightweight lock or documenting the single-threaded assumption explicitly in the class docstring.
- EventBus deduplication currently logs duplicate events both inside EventDeduplicator.is_duplicate and again in dispatch; consider consolidating the logging in one place to avoid noisy duplicate log lines for the same event.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="153" />
<code_context>
     def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
         self.platform = platform

+    async def _should_drop_message(self, message) -> bool:
+        sender_id = _extract_sender_id(message)
+        content = getattr(message, "content", "") or ""
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the deduplication decision into a single platform-level method to avoid an extra indirection layer between the client and the MessageDeduplicator.

You can reduce complexity without changing behavior by collapsing the extra indirection around the deduplicator.

Right now, the call chain is:

- `botClient._should_drop_message(message)`
- `QQOfficialPlatformAdapter._is_duplicate_message(message_id, content, sender_id)`
- `MessageDeduplicator.is_duplicate(...)`

For this adapter, that middle layer in `botClient` plus the thin wrapper in the platform adds cognitive overhead without giving much in return.

### Suggested refactor: single platform-level entrypoint

Move the message-field extraction and dedup decision into the platform, and let the client call a single method. This removes one internal layer and makes the flow easier to follow in the handlers.

**In `QQOfficialPlatformAdapter`:**

```python
class QQOfficialPlatformAdapter(Platform):
    # ...

    def __init__(self, platform_config: dict[str, Any]) -> None:
        # ...
        self._deduplicator = MessageDeduplicator(
            message_id_ttl_seconds=message_id_ttl_seconds,
            content_key_ttl_seconds=content_key_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )

    async def should_handle_raw_message(self, message: Any) -> bool:
        """Return False if the message should be dropped as a duplicate."""
        sender_id = _extract_sender_id(message)
        content = getattr(message, "content", "") or ""
        return not await self._deduplicator.is_duplicate(
            message.id,
            content,
            sender_id,
        )
```

**In `botClient`:**

```python
class botClient(Client):
    def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
        self.platform = platform

    # Remove _should_drop_message entirely

    async def on_group_at_message_create(
        self,
        message: botpy.message.GroupMessage,
    ) -> None:
        if not await self.platform.should_handle_raw_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.GROUP_MESSAGE,
        )
        # ...

    async def on_at_message_create(self, message: botpy.message.Message) -> None:
        if not await self.platform.should_handle_raw_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.GROUP_MESSAGE,
        )
        # ...

    async def on_direct_message_create(
        self,
        message: botpy.message.DirectMessage,
    ) -> None:
        if not await self.platform.should_handle_raw_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.FRIEND_MESSAGE,
        )
        # ...

    async def on_c2c_message_create(
        self,
        message: botpy.message.C2CMessage,
    ) -> None:
        if not await self.platform.should_handle_raw_message(message):
            return
        abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
            message,
            MessageType.FRIEND_MESSAGE,
        )
        # ...
```

**And remove** the now-redundant platform wrapper:

```python
# Delete this; the logic has moved into should_handle_raw_message
async def _is_duplicate_message(
    self,
    message_id: str,
    content: str = "",
    sender_id: str = "",
) -> bool:
    return await self._deduplicator.is_duplicate(message_id, content, sender_id)
```

This keeps all functionality intact (same fields go into `MessageDeduplicator`, same timing of the check) while:

- Reducing the number of internal layers for “should I drop this message?”
- Centralizing the “raw incoming message → dedup decision” logic on the platform, which already owns `_deduplicator`.
</issue_to_address>

### Comment 2
<location path="astrbot/core/event_bus.py" line_range="29" />
<code_context>
 from .platform import AstrMessageEvent


+class EventDeduplicator:
+    """Event deduplicator using TTL-based registry.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the deduplication logic into `EventBus` as a private method using a `TTLKeyRegistry` field instead of a separate `EventDeduplicator` wrapper class and duplicated logging.

You can reduce indirection and logging noise without changing behavior by inlining `EventDeduplicator` into `EventBus` and centralizing the dedup log.

### 1. Store `TTLKeyRegistry` directly on `EventBus`

Keep all config behavior, but drop the wrapper class:

```python
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
from astrbot.core.utils.number_utils import safe_positive_float
from astrbot.core.message.utils import (
    build_event_content_dedup_key,
    build_event_message_id_dedup_key,
)

class EventBus:
    def __init__(
        self,
        event_queue: Queue,
        pipeline_scheduler_mapping: dict[str, PipelineScheduler],
        astrbot_config_mgr: AstrBotConfigManager,
    ) -> None:
        self.event_queue = event_queue
        self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
        self.astrbot_config_mgr = astrbot_config_mgr

        dedup_ttl_seconds = safe_positive_float(
            self.astrbot_config_mgr.g(
                None,
                "event_bus_dedup_ttl_seconds",
                0.5,
            ),
            default=0.5,
        )
        self._dedup_registry = TTLKeyRegistry(dedup_ttl_seconds)
```

### 2. Inline `is_duplicate` as a private method

Preserve the exact logic (TTL 0 special case, message_id short‑circuit, content dedup that discards message_id):

```python
class EventBus:
    ...

    def _is_duplicate(self, event: AstrMessageEvent) -> bool:
        # TTL of 0 means deduplication is disabled
        if self._dedup_registry.ttl_seconds == 0:
            return False

        message_id_key = build_event_message_id_dedup_key(event)
        if message_id_key is not None:
            if self._dedup_registry.contains(message_id_key):
                # Only log here to avoid double-logging
                logger.debug(
                    "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                return True
            self._dedup_registry.add(message_id_key)

        content_key = build_event_content_dedup_key(event)
        if self._dedup_registry.contains(content_key):
            logger.debug(
                "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            # Preserve existing behavior: discard message_id key on content duplicate
            if message_id_key is not None:
                self._dedup_registry.discard(message_id_key)
            return True

        self._dedup_registry.add(content_key)
        return False
```

### 3. Simplify `dispatch` and remove double logging

Let `_is_duplicate` be the single source of dedup logs:

```python
class EventBus:
    ...

    async def dispatch(self) -> None:
        # event_queue 由单一消费者处理;去重结构不是线程安全的,按设计仅在此循环中使用。
        while True:
            event: AstrMessageEvent = await self.event_queue.get()

            if self._is_duplicate(event):
                continue

            conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
            ...
```

This keeps all dedup behavior and config intact, while:

- Removing a thin wrapper class only used in one place.
- Collapsing the policy into a single, local `_is_duplicate` method.
- Ensuring deduplication is logged once in a predictable location.
</issue_to_address>

### Comment 3
<location path="astrbot/core/message/utils.py" line_range="60" />
<code_context>
+    return f"{sender_id}:{content_hash}"
+
+
+def build_event_content_dedup_key(event: "AstrMessageEvent") -> str:
+    """Build a content fingerprint key for EventBus deduplication."""
+    msg_text = str(event.get_message_str() or "").strip()
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring these dedup helpers to operate on primitive message fields instead of AstrMessageEvent instances and moving the event-specific extraction into the EventBus layer.

You can reduce coupling/complexity by making the utils work on primitives instead of `AstrMessageEvent` directly, and let EventBus handle the event introspection.

### 1. Decouple from `AstrMessageEvent`

Right now `message/utils.py` knows a lot about `AstrMessageEvent` internals (`get_message_str`, `get_messages`, `get_platform_id`, `unified_msg_origin`, `get_sender_id`, `message_obj`). This makes the “generic” utils module depend on platform/event semantics.

You can keep the same behavior but change the API to accept plain values, and move the event-specific extraction into EventBus:

```python
# message/utils.py

def build_content_dedup_key(
    *,
    platform_id: str,
    unified_msg_origin: str,
    sender_id: str,
    text: str,
    components: Iterable[BaseMessageComponent],
) -> str:
    msg_text = (text or "").strip()
    if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
        msg_sig = msg_text
    else:
        msg_hash = hashlib.sha1(msg_text.encode("utf-8")).hexdigest()[:16]
        msg_sig = f"h:{len(msg_text)}:{msg_hash}"

    attach_sig = build_component_dedup_signature(components)
    return "|".join(
        ["content", platform_id, unified_msg_origin, sender_id, msg_sig, attach_sig]
    )


def build_message_id_dedup_key(
    *,
    platform_id: str,
    unified_msg_origin: str,
    message_id: str,
) -> str | None:
    if not message_id:
        return None
    return "|".join(["message_id", platform_id, unified_msg_origin, message_id])
```

Then in EventBus (or a dedicated dedup module):

```python
# event_bus_dedup.py (or wherever the dedup logic lives)

from astrbot.core.message import utils as msg_utils

def make_event_content_dedup_key(event: AstrMessageEvent) -> str:
    return msg_utils.build_content_dedup_key(
        platform_id=str(event.get_platform_id() or ""),
        unified_msg_origin=str(event.unified_msg_origin or ""),
        sender_id=str(event.get_sender_id() or ""),
        text=str(event.get_message_str() or ""),
        components=event.get_messages(),
    )

def make_event_message_id_dedup_key(event: AstrMessageEvent) -> str | None:
    raw_id = getattr(event.message_obj, "message_id", "") or getattr(
        event.message_obj, "id", ""
    )
    return msg_utils.build_message_id_dedup_key(
        platform_id=str(event.get_platform_id() or ""),
        unified_msg_origin=str(event.unified_msg_origin or ""),
        message_id=str(raw_id or ""),
    )
```

This keeps all current functionality but:
- Removes `AstrMessageEvent` knowledge from `message/utils.py`.
- Makes the utils module genuinely “message/attachment oriented”.
- Localizes “what counts as a duplicate event” to the EventBus/dedup layer, where platform semantics already live.

### 2. Collapse event-specific key builders

With the above, `build_event_content_dedup_key` and `build_event_message_id_dedup_key` disappear from `message/utils.py` and are replaced by the two generic helpers shown. This reduces the number of “key builder” functions the reader must understand in the utils module without changing the actual dedup behavior.
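`build_component_dedup_signature` is referenced above but not shown in this thread. Under the assumption that message components expose a type name and an optional `file`/`url` attribute (attribute names chosen here for illustration, not confirmed by the PR), a minimal sketch consistent with "fold attachment features into the dedup fingerprint" could look like:

```python
import hashlib
from typing import Iterable


def build_component_dedup_signature(components: Iterable) -> str:
    """Fold attachment features into a compact signature so that messages
    with identical text but different attachments are not treated as
    duplicates of each other.
    """
    parts = []
    for comp in components:
        type_name = type(comp).__name__
        # `file` / `url` are assumed attribute names for illustration only
        ref = getattr(comp, "file", None) or getattr(comp, "url", None) or ""
        parts.append(f"{type_name}:{ref}")
    if not parts:
        return "none"
    joined = "|".join(parts)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()[:16]
```

Text-only messages then share the stable `"none"` signature, while any attachment difference changes the fingerprint.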
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@001VIsir
Author

@sourcery-ai review

Author

@001VIsir 001VIsir left a comment


Changes implemented:

  1. Simplified the qqofficial dedup chain — removed the botClient._should_drop_message and platform._is_duplicate_message intermediate layers in favor of a single platform.should_handle_raw_message entry point
  2. Inlined the EventBus dedup logic — removed the EventDeduplicator wrapper class; dedup is now a private EventBus._is_duplicate() method, and dispatch no longer emits duplicate logs
  3. Decoupled message/utils — the AstrMessageEvent-based dedup key builders now take primitive parameters (build_content_dedup_key / build_message_id_dedup_key), and event field extraction moved into local static methods on EventBus
  4. TTLKeyRegistry documentation — now explicitly states "not thread-safe, designed for single-consumer/single-thread usage"
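TTLKeyRegistry itself is not shown in this thread. A minimal sketch consistent with the behavior the reviews rely on (a readable `ttl_seconds` attribute, `contains`/`add`/`discard`/`seen`, `time.monotonic()` timing, throttled cleanup, not thread-safe) might look like:

```python
import time


class TTLKeyRegistry:
    """TTL-based dedup key registry.

    Not thread-safe: designed for single-consumer / single-thread usage.
    A ttl_seconds of 0 (or below) effectively disables expiry tracking.
    """

    def __init__(self, ttl_seconds: float, cleanup_interval_seconds: float = 1.0) -> None:
        self.ttl_seconds = ttl_seconds
        self._cleanup_interval = cleanup_interval_seconds
        self._expiry: dict[str, float] = {}  # key -> monotonic expiry time
        self._next_cleanup = time.monotonic()

    def _clean_expired(self) -> None:
        if self.ttl_seconds <= 0:
            return
        now = time.monotonic()
        if now < self._next_cleanup:  # throttle full cleanup passes
            return
        self._next_cleanup = now + self._cleanup_interval
        for key in [k for k, exp in self._expiry.items() if exp <= now]:
            del self._expiry[key]

    def contains(self, key: str) -> bool:
        self._clean_expired()
        exp = self._expiry.get(key)
        return exp is not None and exp > time.monotonic()

    def add(self, key: str) -> None:
        self._expiry[key] = time.monotonic() + self.ttl_seconds

    def discard(self, key: str) -> None:
        self._expiry.pop(key, None)

    def seen(self, key: str) -> bool:
        """Return True if key was already present; otherwise register it."""
        if self.contains(key):
            return True
        self.add(key)
        return False
```

This is a sketch of the documented contract, not the PR's actual implementation; the real module may differ in cleanup strategy and bookkeeping details.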

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • In QQOfficialPlatformAdapter._send_by_session_common the send_helper variable was changed from a SimpleNamespace(bot=self.client) wrapper to self.client directly; if downstream helpers still expect a .bot attribute this will break, so consider either preserving the wrapper or updating all call sites to match the new shape explicitly.
  • QQOfficialPlatformAdapter.should_handle_raw_message accesses message.id directly; for defensive robustness against malformed or test messages you may want to use getattr(message, 'id', '') (mirroring the guarded access patterns used elsewhere) before passing it into the deduplicator.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="281-285" />
<code_context>
+    async def should_handle_raw_message(self, message: Any) -> bool:
+        """Return False if the raw incoming message should be dropped."""
+        sender_id = _extract_sender_id(message)
+        content = getattr(message, "content", "") or ""
+        is_duplicate = await self._deduplicator.is_duplicate(
+            message.id,
+            content,
+            sender_id,
+        )
+        return not is_duplicate
</code_context>
<issue_to_address>
**issue (bug_risk):** Content used for deduplication may not be a string, causing encode errors in downstream hashing.

`content` here comes straight from `message.content` into `MessageDeduplicator.is_duplicate`, which calls `build_sender_content_dedup_key` and then `content.encode("utf-8")`. If `message.content` is a non-string but truthy (e.g., a dict), `getattr(..., "content", "") or ""` will pass that through and `.encode` will fail at runtime.

To avoid this, either normalize to a string before passing it in:

```python
raw_content = getattr(message, "content", "")
content = str(raw_content or "")
```

or add an `isinstance(content, str)` check inside `build_sender_content_dedup_key` and handle non-string values appropriately.
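A quick illustration of the failure mode, using a hypothetical non-string payload (not taken from the actual QQ SDK):

```python
message_content = {"text": "hi"}  # truthy non-string payload
content = message_content or ""   # the dict passes straight through the `or ""` guard

try:
    content.encode("utf-8")  # dict has no .encode
    crashed = False
except AttributeError:
    crashed = True

normalized = str(message_content or "")  # the suggested normalization
normalized.encode("utf-8")               # now safe to hash downstream
```

The `or ""` guard only catches falsy values; normalizing with `str(...)` covers truthy non-strings as well.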
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="76" />
<code_context>
+    return sender_id_str
+
+
+class MessageDeduplicator:
+    def __init__(
+        self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the deduplication logic into smaller, focused helpers and simplifying content-key handling to make the control flow clearer and easier to maintain.

You can reduce the complexity of the new dedup logic without changing behavior by:

1. Flattening `MessageDeduplicator.is_duplicate` via small helpers instead of local flags and nested branches.
2. Inlining `_build_content_key` (or making it the “one true” key builder if you expect more callers).

### 1. Flatten `is_duplicate` into clear phases

Right now `is_duplicate` mixes “compute flags”, “check id”, “check content”, and the special-case “discard message_id” in one method. Extracting them into small helpers makes the control flow easier to read and test in isolation, while preserving all semantics.

For example:

```python
class MessageDeduplicator:
    ...

    def _id_dedup_enabled(self, message_id: str) -> bool:
        return self._message_ids.ttl_seconds > 0 and bool(message_id)

    def _content_dedup_enabled(self) -> bool:
        return self._content_keys.ttl_seconds > 0

    def _register_message_id(self, message_id: str) -> bool:
        """Return True if duplicate by ID, False otherwise (and register)."""
        if self._message_ids.contains(message_id):
            logger.debug(
                "[QQOfficial] Duplicate message detected (by ID): %s...",
                message_id[:50],
            )
            return True

        self._message_ids.add(message_id)
        return False

    def _register_content(
        self,
        message_id: str,
        content: str,
        sender_id: str,
        id_dedup_enabled: bool,
    ) -> bool:
        """Return True if duplicate by content, False otherwise (and register).

        Note: If the content is duplicate we discard message_id to preserve
        the existing behavior of not keeping message_id on content duplicates.
        """
        content_key = build_sender_content_dedup_key(content, sender_id)
        if content_key is None:
            logger.debug(
                "[QQOfficial] New message registered: %s...", message_id[:50]
            )
            return False

        if self._content_keys.contains(content_key):
            logger.debug(
                "[QQOfficial] Duplicate message detected (by content): %s",
                content_key,
            )
            if id_dedup_enabled:
                self._message_ids.discard(message_id)
            return True

        self._content_keys.add(content_key)
        logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
        return False

    async def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        async with self._lock:
            id_dedup_enabled = self._id_dedup_enabled(message_id)
            content_dedup_enabled = self._content_dedup_enabled()

            if not id_dedup_enabled and not content_dedup_enabled:
                return False

            if id_dedup_enabled and self._register_message_id(message_id):
                return True

            if not content_dedup_enabled:
                if id_dedup_enabled:
                    logger.debug(
                        "[QQOfficial] New message registered: %s...",
                        message_id[:50],
                    )
                return False

            return self._register_content(
                message_id=message_id,
                content=content,
                sender_id=sender_id,
                id_dedup_enabled=id_dedup_enabled,
            )
```

This keeps the same behavior (including discarding `message_id` when content is duplicate) but makes each concern local and the top-level control flow linear.

### 2. Remove `_build_content_key` indirection

Currently `_build_content_key` is just a thin wrapper over `build_sender_content_dedup_key` and only used once. Unless you plan to reuse or swap implementations, it’s simpler to inline it or make it the central builder.

If you go with inlining (as above), you can delete `_build_content_key` entirely.

If you prefer to keep a central builder, use it everywhere that constructs a content key (in QQ adapter and any other dedup consumers):

```python
class MessageDeduplicator:
    ...

    @staticmethod
    def build_content_key(content: str, sender_id: str) -> str | None:
        return build_sender_content_dedup_key(content, sender_id)
```

Then call `MessageDeduplicator.build_content_key(...)` from both `MessageDeduplicator` and e.g. `event_bus` to unify the concept.
</issue_to_address>

### Comment 3
<location path="astrbot/core/event_bus.py" line_range="42" />
<code_context>
         # abconf uuid -> scheduler
         self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
         self.astrbot_config_mgr = astrbot_config_mgr
+        dedup_ttl_seconds = safe_positive_float(
+            self.astrbot_config_mgr.g(
+                None,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new event-level deduplication logic into a separate `EventDeduplicator` helper so `EventBus` only configures and calls it.

You can keep the current behavior but reduce the dispersion of dedup logic by extracting a small, reusable `EventDeduplicator` and letting `EventBus` depend on it.

### 1. Extract event-level dedup into a helper

Move the key-building and policy into a dedicated helper that knows how to inspect `AstrMessageEvent`:

```python
# event_dedup.py
from astrbot.core.message.utils import (
    build_content_dedup_key,
    build_message_id_dedup_key,
)
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
from .platform import AstrMessageEvent
from astrbot.core import logger


class EventDeduplicator:
    def __init__(self, ttl_seconds: float) -> None:
        self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        if self._registry.ttl_seconds == 0:
            return False

        message_id_key = self._build_message_id_key(event)
        if message_id_key is not None:
            if self._registry.contains(message_id_key):
                logger.debug(
                    "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                return True
            self._registry.add(message_id_key)

        content_key = self._build_content_key(event)
        if self._registry.contains(content_key):
            logger.debug(
                "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            if message_id_key is not None:
                self._registry.discard(message_id_key)
            return True

        self._registry.add(content_key)
        return False

    @staticmethod
    def _build_content_key(event: AstrMessageEvent) -> str:
        return build_content_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            sender_id=str(event.get_sender_id() or ""),
            text=str(event.get_message_str() or ""),
            components=event.get_messages(),
        )

    @staticmethod
    def _build_message_id_key(event: AstrMessageEvent) -> str | None:
        message_id = getattr(event.message_obj, "message_id", "") or getattr(
            event.message_obj,
            "id",
            "",
        )
        return build_message_id_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            message_id=str(message_id or ""),
        )
```

This centralizes:

- which `AstrMessageEvent` fields participate in dedup
- the policy (ID-first, then content, discard ID key on content dup)
- the TTL registry ownership

It can later be reused/aligned with `MessageDeduplicator` to avoid policy drift.

### 2. Slim down `EventBus` to focus on dispatch

`EventBus` then just configures and uses this helper:

```python
# event_bus.py
from astrbot.core.utils.number_utils import safe_positive_float
from .event_dedup import EventDeduplicator

class EventBus:
    def __init__(self, event_queue, pipeline_scheduler_mapping, astrbot_config_mgr):
        self.event_queue = event_queue
        self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
        self.astrbot_config_mgr = astrbot_config_mgr

        dedup_ttl_seconds = safe_positive_float(
            self.astrbot_config_mgr.g(
                None,
                "event_bus_dedup_ttl_seconds",
                0.5,
            ),
            default=0.5,
        )
        self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)

    async def dispatch(self) -> None:
        # event_queue is consumed by a single consumer; the dedup structures are not
        # thread-safe and are, by design, only used inside this loop.
        while True:
            event: AstrMessageEvent = await self.event_queue.get()
            if self._deduplicator.is_duplicate(event):
                continue

            conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
            conf_id = conf_info["id"]
            conf_name = conf_info.get("name") or conf_id
            self._print_event(event, conf_name)
            scheduler = self.pipeline_scheduler_mapping.get(conf_id)
            ...
```

Benefits:

- `EventBus` no longer encodes dedup policy or key construction, just calls `is_duplicate`.
- Dedup behavior is centralized and can be reused or later unified with `MessageDeduplicator` to keep one mental model for dedup across adapters.
</issue_to_address>


Author

@001VIsir 001VIsir left a comment


Summary of changes

In response to the latest review feedback, this PR hardens the deduplication chain and streamlines its structure, improving maintainability without changing existing business semantics.

  • Hardened QQOfficialPlatformAdapter.should_handle_raw_message:
    • Read the message ID defensively via getattr(message, "id", "")
    • Normalize message content to a string (str(raw_content or "")) so non-string content can no longer trigger a runtime .encode() error in the hashing path
  • Restored the compatible shape of send_helper:
    • Use SimpleNamespace(bot=self.client) on the outbound send path
    • Keep downstream QQ upload/send helpers' dependency on the .bot attribute intact
  • Refactored the MessageDeduplicator flow into small methods with clear responsibilities:
    • _id_dedup_enabled / _content_dedup_enabled
    • _register_message_id / _register_content
    • Original semantics preserved: a content-dedup hit discards the already-registered message_id key
  • Extracted the EventBus event-level dedup logic into a standalone helper:
    • Added EventDeduplicator in astrbot/core/event_dedup.py
    • EventBus is now only responsible for configuration and dispatch; the dedup policy lives in the helper
    • The "message_id first, then content" check order and behavior are unchanged

Purpose

  • Improve stability under abnormal input (especially malformed/test messages)
  • Avoid implicit regressions from changing the send helper's shape
  • Reduce the cognitive complexity of the EventBus and QQ dedup logic, making future maintenance and testing easier

@001VIsir
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue


## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="77" />
<code_context>
+    return sender_id_str
+
+
+class MessageDeduplicator:
+    def __init__(
+        self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `MessageDeduplicator` by collapsing the helper methods into a streamlined `is_duplicate` that uses `TTLKeyRegistry.seen()` and optionally adding a raw-message view helper to centralize normalization.

You can simplify `MessageDeduplicator` quite a bit by leaning on `TTLKeyRegistry.seen()` and centralizing the policy & logging in `is_duplicate`. This keeps the same behavior (including the “undo ID registration if content duplicate” rule) while reducing helper methods and branching.

### 1) Use `seen()` and inline small helpers

`_id_dedup_enabled` / `_content_dedup_enabled` and the `_register_*` helpers mostly re‑implement `TTLKeyRegistry.seen()`. You can collapse this into a single `is_duplicate` method that:

* Normalizes / checks enablement
* Uses `seen()` with clear, linear control flow
* Explicitly handles the “discard ID on content duplicate” policy

Example refactor:

```python
class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
        cleanup_interval_seconds: float = 1.0,
    ) -> None:
        self._message_ids = TTLKeyRegistry(
            ttl_seconds=message_id_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._content_keys = TTLKeyRegistry(
            ttl_seconds=content_key_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._lock = asyncio.Lock()

    async def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        async with self._lock:
            id_dedup_enabled = self._message_ids.ttl_seconds > 0 and bool(message_id)
            content_dedup_enabled = self._content_keys.ttl_seconds > 0

            if not id_dedup_enabled and not content_dedup_enabled:
                return False

            # 1) ID-based dedup
            if id_dedup_enabled and self._message_ids.seen(message_id):
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by ID): %s...",
                    message_id[:50],
                )
                return True

            # 2) Content-based dedup
            if not content_dedup_enabled:
                logger.debug(
                    "[QQOfficial] New message registered: %s...", message_id[:50]
                )
                return False

            content_key = build_sender_content_dedup_key(content, sender_id)
            if content_key is None:
                logger.debug(
                    "[QQOfficial] New message registered: %s...", message_id[:50]
                )
                return False

            if self._content_keys.seen(content_key):
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by content): %s",
                    content_key,
                )
                # Preserve existing behavior: do not keep message_id on content duplicates
                if id_dedup_enabled:
                    self._message_ids.discard(message_id)
                return True

            logger.debug(
                "[QQOfficial] New message registered: %s...", message_id[:50]
            )
            return False
```

This:

* Removes `_id_dedup_enabled`, `_content_dedup_enabled`, `_register_message_id`, `_register_content`.
* Uses `seen()` instead of `contains` + `add`.
* Keeps the “discard message_id when content duplicate” behavior, but makes it explicit.

### 2) Optional: move normalization into the deduplicator input

You’re already reusing `_extract_sender_id` in both the adapter and `_parse_from_qqofficial`. To thin the adapter further, you can also pass a small “raw message view” into the deduplicator instead of multiple normalized strings.

Minimal example, keeping most of your current surface:

```python
@dataclass
class RawMessageView:
    id: str
    content: str
    sender_id: str

class MessageDeduplicator:
    # ctor unchanged...

    async def is_duplicate_from_raw(self, message: Any) -> bool:
        view = RawMessageView(
            id=str(getattr(message, "id", "") or ""),
            content=str(getattr(message, "content", "") or ""),
            sender_id=_extract_sender_id(message),
        )
        return await self.is_duplicate(view.id, view.content, view.sender_id)
```

Then in `QQOfficialPlatformAdapter`:

```python
async def should_handle_raw_message(self, message: Any) -> bool:
    is_duplicate = await self._deduplicator.is_duplicate_from_raw(message)
    return not is_duplicate
```

This keeps all functionality but moves the normalization logic next to the dedup policy, reducing the adapter’s responsibility surface.
</issue_to_address>



@001VIsir 001VIsir marked this pull request as draft March 18, 2026 08:37
@001VIsir 001VIsir marked this pull request as ready for review March 18, 2026 08:41
@auto-assign auto-assign bot requested review from Raven95676 and Soulter March 18, 2026 08:41
# Conflicts:
#	astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
@001VIsir
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues


## Individual Comments

### Comment 1
<location path="astrbot/core/event_dedup.py" line_range="15-16" />
<code_context>
+    def __init__(self, ttl_seconds: float = 0.5) -> None:
+        self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)
+
+    def is_duplicate(self, event: AstrMessageEvent) -> bool:
+        if self._registry.ttl_seconds == 0:
+            return False
+
</code_context>
<issue_to_address>
**suggestion:** Event deduplication is disabled only when TTL is exactly 0; it might be safer to treat all non-positive TTLs as disabled.

Here `is_duplicate` short-circuits only when `self._registry.ttl_seconds == 0`. Even though `safe_positive_float` should prevent negative values, using `<= 0` would be more robust against unexpected config or future parsing changes, and would align with `_clean_expired`, which already treats `ttl_seconds <= 0` as disabling cleanup.
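`safe_positive_float` is not shown in the thread either; a sketch consistent with the behavior described here and in the PR summary (unparseable or negative values fall back to the default, 0 is allowed as an explicit "disabled" value) might be:

```python
def safe_positive_float(value, default: float) -> float:
    """Parse a config value into a non-negative float.

    0 is allowed (it means "disabled"); negative or unparseable values
    fall back to the provided default.
    """
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        return default
    if parsed < 0:
        return default
    return parsed
```

With this shape, `safe_positive_float(config_value, default=0.5)` matches the call sites quoted above, though the real helper may validate differently (e.g. rejecting NaN).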
</issue_to_address>

### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="81" />
<code_context>
+    return sender_id_str
+
+
+class MessageDeduplicator:
+    def __init__(
+        self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `MessageDeduplicator` into a synchronous, linear implementation that uses `TTLKeyRegistry.seen()` directly while preserving existing deduplication behavior.

You can simplify `MessageDeduplicator` significantly without changing behavior by:

* dropping the async lock and making it synchronous (callers are already single-threaded async handlers),
* collapsing the helper methods into a single flow,
* using `TTLKeyRegistry.seen(...)` instead of `contains` + `add`.

That keeps all configuration knobs and existing semantics (including the “discard ID entry on content-dup” behavior) but removes a lot of local complexity and indirection.

### Suggested refactor

```python
class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
        cleanup_interval_seconds: float = 1.0,
    ) -> None:
        self._message_ids = TTLKeyRegistry(
            ttl_seconds=message_id_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._content_keys = TTLKeyRegistry(
            ttl_seconds=content_key_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )

    def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        id_dedup_enabled = self._message_ids.ttl_seconds > 0 and bool(message_id)
        content_dedup_enabled = self._content_keys.ttl_seconds > 0

        if not id_dedup_enabled and not content_dedup_enabled:
            return False

        # 1) ID-based dedup
        if id_dedup_enabled:
            seen_before = self._message_ids.seen(message_id)
            if seen_before:
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by ID): %s...",
                    message_id[:50],
                )
                return True

        # 2) Content-based dedup
        if not content_dedup_enabled:
            logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
            return False

        content_key = build_sender_content_dedup_key(content, sender_id)
        if content_key is None:
            logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
            return False

        content_seen_before = self._content_keys.seen(content_key)
        if content_seen_before:
            logger.debug(
                "[QQOfficial] Duplicate message detected (by content): %s",
                content_key,
            )
            # Preserve existing behavior: do not keep message_id on content duplicates
            if id_dedup_enabled:
                self._message_ids.discard(message_id)
            return True

        logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
        return False
```

Then `QQOfficialPlatformAdapter.should_handle_raw_message` can become:

```python
async def should_handle_raw_message(self, message: Any) -> bool:
    sender_id = _extract_sender_id(message)
    message_id = str(getattr(message, "id", "") or "")
    raw_content = getattr(message, "content", "")
    content = str(raw_content or "")
    is_duplicate = self._deduplicator.is_duplicate(
        message_id,
        content,
        sender_id,
    )
    return not is_duplicate
```

And the call sites remain unchanged:

```python
async def on_group_at_message_create(self, message: botpy.message.GroupMessage) -> None:
    if not await self.platform.should_handle_raw_message(message):
        return
    ...
```

This keeps all current behavior (ID TTL, content TTL, cleanup interval, and the “drop ID entry on content duplicate” rule) but removes:

* the async lock and related concurrency complexity,
* four small helper methods with scattered control flow,
* the manual `contains` + `add` usage in favor of the existing `seen()` abstraction.
</issue_to_address>
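For reference, here is a minimal, self-contained sketch of the contract the refactor relies on: `TTLKeyRegistry.seen()` registers a key and reports whether it was already live, `discard()` drops it, and `build_sender_content_dedup_key` builds the content fingerprint. This is an illustration of the expected semantics, not the PR's actual implementation (which lives in `astrbot/core/utils/ttl_registry.py`):

```python
import hashlib
import time
from typing import Optional


class TTLKeyRegistry:
    """Sketch of a TTL dedup registry using time.monotonic()."""

    def __init__(self, ttl_seconds: float, cleanup_interval_seconds: float = 1.0) -> None:
        self.ttl_seconds = ttl_seconds
        self._cleanup_interval = cleanup_interval_seconds
        self._expiry: dict = {}  # key -> monotonic expiry timestamp
        self._last_cleanup = 0.0

    def seen(self, key: str) -> bool:
        """Register `key`; return True if it was already present and unexpired."""
        now = time.monotonic()
        self._maybe_cleanup(now)
        expires_at = self._expiry.get(key)
        if expires_at is not None and expires_at > now:
            return True
        self._expiry[key] = now + self.ttl_seconds
        return False

    def discard(self, key: str) -> None:
        self._expiry.pop(key, None)

    def _maybe_cleanup(self, now: float) -> None:
        # Throttled sweep: drop expired keys at most once per interval.
        if self.ttl_seconds <= 0 or now - self._last_cleanup < self._cleanup_interval:
            return
        self._last_cleanup = now
        self._expiry = {k: t for k, t in self._expiry.items() if t > now}


def build_sender_content_dedup_key(content: str, sender_id: str) -> Optional[str]:
    """Hypothetical shape of the content key: None for empty content
    (nothing to dedup on), else sender id plus a content hash."""
    if not content:
        return None
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
    return f"{sender_id}:{digest}"
```

Under this contract, `seen()` both checks and registers in one call, which is what lets the refactor drop the separate `contains` + `add` pair.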



@001VIsir
Copy link
Copy Markdown
Author

#6320 appears to hit a problem similar to this PR. However, as of version 4.22.1, testing against the master branch, the bug is still not resolved and duplicate message consumption still occurs.
12345
1234


Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:XL This PR changes 500-999 lines, ignoring generated files.


Development

Successfully merging this pull request may close these issues.

[Bug] Duplicate message reads

1 participant