fix(platform): prevent qq_official duplicate message consumption (#5848) #6519
001VIsir wants to merge 5 commits into AstrBotDevs:master from
Conversation
Summary of Changes: Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical issue where messages from the
Hey - I've found 3 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="90-98" />
<code_context>
+ content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
+ return f"{sender_id}:{content_hash}"
+
+ async def is_duplicate(
+ self,
+ message_id: str,
+ content: str = "",
+ sender_id: str = "",
+ ) -> bool:
+ async with self._lock:
+ # Bypass deduplication if TTL is 0 (disabled)
+ if self._message_ids.ttl_seconds == 0:
+ return False
+
</code_context>
<issue_to_address>
**question (bug_risk):** Disabling message-id dedup via TTL=0 also disables content-based dedup, which may be stricter than intended.
In `MessageDeduplicator.is_duplicate`, this check:
```python
if self._message_ids.ttl_seconds == 0:
return False
```
causes `dedup_message_id_ttl_seconds = 0` to disable *all* deduplication, even when `dedup_content_key_ttl_seconds` is > 0. With separate config knobs, callers may expect to disable ID-based dedup while keeping content-based dedup.
To keep the knobs independent, consider moving the `ttl_seconds == 0` check into the message-ID branch only, and still executing the content-key logic when `self._content_keys.ttl_seconds > 0`.
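As a concrete sketch, the independent-knob version could look like this (the `Registry` class here is a simplified stand-in for the PR's TTL registry, not its actual implementation):

```python
import hashlib
import time


class Registry:
    """Minimal TTL key registry stand-in (illustration only)."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._expiry: dict[str, float] = {}

    def contains(self, key: str) -> bool:
        deadline = self._expiry.get(key)
        return deadline is not None and deadline > time.monotonic()

    def add(self, key: str) -> None:
        self._expiry[key] = time.monotonic() + self.ttl_seconds


def is_duplicate(ids: Registry, contents: Registry, message_id: str,
                 content: str = "", sender_id: str = "") -> bool:
    # ID-based dedup runs only when its own TTL knob is enabled.
    if ids.ttl_seconds > 0 and message_id:
        if ids.contains(message_id):
            return True
        ids.add(message_id)
    # Content-based dedup stays active independently of the ID knob.
    if contents.ttl_seconds > 0 and content and sender_id:
        key = f"{sender_id}:{hashlib.sha1(content.encode('utf-8')).hexdigest()[:16]}"
        if contents.contains(key):
            return True
        contents.add(key)
    return False
```

With `ids.ttl_seconds == 0` and `contents.ttl_seconds > 0`, a repeated (sender, content) pair is still detected as a duplicate, which is the behavior the separate config knobs imply.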
</issue_to_address>
### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="135" />
<code_context>
def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
self.platform = platform
+ def _get_sender_id(self, message) -> str:
+ """Extract sender ID from different message types.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the deduplication and sender-id logic by collapsing redundant helper methods and centralizing shared content-key construction into a single utility.
You can trim a good amount of indirection and duplicated logic without changing behavior.
### 1. Collapse sender-id helpers
`_extract_sender_id` and `botClient._get_sender_id` just call each other. You can remove `_get_sender_id` and use the module-level helper directly:
```python
# keep this as the single source of truth
def _extract_sender_id(message) -> str:
...
```
```python
class botClient(Client):
def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
self.platform = platform
# remove _get_sender_id
async def _should_drop_message(self, message) -> bool:
sender_id = _extract_sender_id(message)
content = getattr(message, "content", "") or ""
return await self.platform._is_duplicate_message(message.id, content, sender_id)
```
This removes one layer without changing semantics.
### 2. Flatten dedup wrapper chain
Right now the path is:
`handler → _should_drop_message → _extract_dedup_key → platform._is_duplicate_message → MessageDeduplicator.is_duplicate`.
`_extract_dedup_key` is small and only used in one place; you can inline it into `_should_drop_message` as above and keep `_is_duplicate_message` as the single platform-facing hook:
```python
class botClient(Client):
...
async def _should_drop_message(self, message) -> bool:
sender_id = _extract_sender_id(message)
content = getattr(message, "content", "") or ""
return await self.platform._is_duplicate_message(message.id, content, sender_id)
```
Then remove `_extract_dedup_key` entirely.
This makes each handler’s flow easier to follow without altering functionality.
### 3. Extract a shared content-key builder
To avoid duplicating content-key semantics between `MessageDeduplicator` and `EventDeduplicator`, you can move `_build_content_key` into a shared utility function and call it from both:
```python
# e.g. in a shared module, or at module level if both can import it
def build_sender_content_key(content: str, sender_id: str) -> str | None:
if not (content and sender_id):
return None
content_hash = hashlib.sha1(content.encode("utf-8")).hexdigest()[:16]
return f"{sender_id}:{content_hash}"
```
```python
class MessageDeduplicator:
...
def _build_content_key(self, content: str, sender_id: str) -> str | None:
return build_sender_content_key(content, sender_id)
```
And in `EventDeduplicator`:
```python
class EventDeduplicator:
...
def _build_content_key(self, content: str, sender_id: str) -> str | None:
return build_sender_content_key(content, sender_id)
```
This keeps all content-dedup semantics in a single place while preserving your current behavior and TTL-registry usage.
</issue_to_address>
### Comment 3
<location path="astrbot/core/event_bus.py" line_range="27" />
<code_context>
from .platform import AstrMessageEvent
+class EventDeduplicator:
+ """Event deduplicator using TTL-based registry.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the deduplication key-building logic into shared utilities and optionally using a generic TTL-based deduplicator to simplify and reuse the EventDeduplicator logic.
You can reduce the added complexity without changing behavior by:
1. **Extracting key-building into reusable pure functions**, and
2. **Making the ID-first/content-second policy explicit and generic** so `EventDeduplicator` is just configuration.
### 1. Extract key-building into shared utilities
Move the key-building logic out of `EventDeduplicator` so it’s testable in isolation and reusable (e.g. by `MessageDeduplicator`).
```python
# astrbot/core/message/utils.py
import hashlib
# build_component_dedup_signature already exists in this module, so no self-import is needed
from astrbot.core.platform import AstrMessageEvent # adjust import path as needed
_MAX_RAW_TEXT_FINGERPRINT_LEN = 256
def build_event_content_dedup_key(event: AstrMessageEvent) -> str:
msg_text = (event.get_message_str() or "").strip()
if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
msg_sig = msg_text
else:
msg_hash = hashlib.sha1(msg_text.encode("utf-8")).hexdigest()[:16]
msg_sig = f"h:{len(msg_text)}:{msg_hash}"
attach_sig = build_component_dedup_signature(event.get_messages())
return "|".join([
"content",
event.get_platform_id() or "",
event.unified_msg_origin or "",
event.get_sender_id() or "",
msg_sig,
attach_sig,
])
def build_event_message_id_dedup_key(event: AstrMessageEvent) -> str | None:
message_id = str(getattr(event.message_obj, "message_id", "") or "")
if not message_id:
message_id = str(getattr(event.message_obj, "id", "") or "")
if not message_id:
return None
return "|".join([
"message_id",
event.get_platform_id() or "",
event.unified_msg_origin or "",
message_id,
])
```
Then `EventDeduplicator` becomes much thinner and no longer tightly couples the algorithm to the class:
```python
# event_bus.py
from astrbot.core.message.utils import (
build_event_content_dedup_key,
build_event_message_id_dedup_key,
)
class EventDeduplicator:
def __init__(self, ttl_seconds: float = 0.5) -> None:
self._registry = TTLKeyRegistry(ttl_seconds)
def is_duplicate(self, event: AstrMessageEvent) -> bool:
if self._registry.ttl_seconds == 0:
return False
message_id_key = build_event_message_id_dedup_key(event)
if message_id_key is not None:
if self._registry.contains(message_id_key):
logger.debug(
"Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
event.unified_msg_origin,
event.get_sender_id(),
)
return True
self._registry.add(message_id_key)
content_key = build_event_content_dedup_key(event)
if self._registry.contains(content_key):
logger.debug(
"Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
event.unified_msg_origin,
event.get_sender_id(),
)
if message_id_key is not None:
self._registry.discard(message_id_key)
return True
self._registry.add(content_key)
return False
```
This keeps all semantics (including the “discard message-id on content duplicate” behavior) but makes the algorithm reusable and easier to test and reason about.
### 2. Introduce a generic TTL-based deduplicator (optional next step)
To unify with other dedup implementations (`MessageDeduplicator`), you can introduce a small generic helper that encapsulates the ID-first / content-second policy and accepts pluggable key extractors:
```python
# astrbot/core/utils/dedup.py
from collections.abc import Callable
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
class TTLBasedDeduplicator:
def __init__(
self,
ttl_seconds: float,
build_id_key: Callable[[object], str | None],
build_content_key: Callable[[object], str],
) -> None:
self._registry = TTLKeyRegistry(ttl_seconds)
self._build_id_key = build_id_key
self._build_content_key = build_content_key
def is_duplicate(self, obj: object) -> bool:
if self._registry.ttl_seconds == 0:
return False
id_key = self._build_id_key(obj)
if id_key is not None:
if self._registry.contains(id_key):
return True
self._registry.add(id_key)
content_key = self._build_content_key(obj)
if self._registry.contains(content_key):
if id_key is not None:
self._registry.discard(id_key)
return True
self._registry.add(content_key)
return False
```
Then `EventDeduplicator` becomes a thin adapter:
```python
from astrbot.core.utils.dedup import TTLBasedDeduplicator
from astrbot.core.message.utils import (
build_event_content_dedup_key,
build_event_message_id_dedup_key,
)
class EventDeduplicator(TTLBasedDeduplicator):
def __init__(self, ttl_seconds: float = 0.5) -> None:
super().__init__(
ttl_seconds=ttl_seconds,
build_id_key=build_event_message_id_dedup_key,
build_content_key=build_event_content_dedup_key,
)
```
`QQOfficialPlatformAdapter`’s `MessageDeduplicator` can be migrated to the same generic helper by supplying its own `build_id_key` and `build_content_key`, removing duplicated control-flow while preserving existing behavior.
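The suggestions above assume a `TTLKeyRegistry` exposing a `ttl_seconds` attribute plus `contains`/`add`/`discard` methods. A minimal single-threaded sketch of that assumed interface (not the PR's actual implementation):

```python
import time


class TTLKeyRegistry:
    """Minimal sketch of the assumed interface. Not thread-safe; intended
    for single-consumer loops, matching how this PR uses it."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._expiry: dict[str, float] = {}

    def contains(self, key: str) -> bool:
        deadline = self._expiry.get(key)
        if deadline is None:
            return False
        if deadline <= time.monotonic():
            # Lazily evict expired keys on lookup.
            del self._expiry[key]
            return False
        return True

    def add(self, key: str) -> None:
        self._expiry[key] = time.monotonic() + self.ttl_seconds

    def discard(self, key: str) -> None:
        self._expiry.pop(key, None)
```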
</issue_to_address>
@sourcery-ai review
Hey - I've found 3 issues, and left some high level feedback:
- TTLKeyRegistry is documented as a general-purpose utility but is not concurrency-safe; if there is any chance it will be reused outside the single-consumer EventBus/QQ adapter loops, consider adding a lightweight lock or documenting the single-threaded assumption explicitly in the class docstring.
- EventBus deduplication currently logs duplicate events both inside EventDeduplicator.is_duplicate and again in dispatch; consider consolidating the logging in one place to avoid noisy duplicate log lines for the same event.
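If reuse outside a single-consumer loop is plausible, one hedged option is a thin lock-guarded wrapper over the same TTL-registry idea (a sketch with hypothetical names, not the PR's code):

```python
import threading
import time


class LockedTTLKeyRegistry:
    """Hypothetical thread-safe variant: a lightweight lock guards the
    shared dict so the registry can be used outside a single-consumer loop."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._expiry: dict[str, float] = {}
        self._lock = threading.Lock()

    def check_and_add(self, key: str) -> bool:
        """Atomically return whether key was already present, registering it if not."""
        now = time.monotonic()
        with self._lock:
            deadline = self._expiry.get(key)
            if deadline is not None and deadline > now:
                return True
            self._expiry[key] = now + self.ttl_seconds
            return False
```

Combining the check and the add under one lock also avoids the check-then-act race that separate `contains`/`add` calls would have under concurrency.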
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="153" />
<code_context>
def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
self.platform = platform
+ async def _should_drop_message(self, message) -> bool:
+ sender_id = _extract_sender_id(message)
+ content = getattr(message, "content", "") or ""
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the deduplication decision into a single platform-level method to avoid an extra indirection layer between the client and the MessageDeduplicator.
You can reduce complexity without changing behavior by collapsing the extra indirection around the deduplicator.
Right now, the call chain is:
- `botClient._should_drop_message(message)` →
- `QQOfficialPlatformAdapter._is_duplicate_message(message_id, content, sender_id)` →
- `MessageDeduplicator.is_duplicate(...)`
For this adapter, that middle layer in `botClient` plus the thin wrapper in the platform adds cognitive overhead without giving much in return.
### Suggested refactor: single platform-level entrypoint
Move the message-field extraction and dedup decision into the platform, and let the client call a single method. This removes one internal layer and makes the flow easier to follow in the handlers.
**In `QQOfficialPlatformAdapter`:**
```python
class QQOfficialPlatformAdapter(Platform):
# ...
def __init__(self, platform_config: dict[str, Any]) -> None:
# ...
self._deduplicator = MessageDeduplicator(
message_id_ttl_seconds=message_id_ttl_seconds,
content_key_ttl_seconds=content_key_ttl_seconds,
cleanup_interval_seconds=cleanup_interval_seconds,
)
async def should_handle_raw_message(self, message: Any) -> bool:
"""Return False if the message should be dropped as a duplicate."""
sender_id = _extract_sender_id(message)
content = getattr(message, "content", "") or ""
return not await self._deduplicator.is_duplicate(
message.id,
content,
sender_id,
)
```
**In `botClient`:**
```python
class botClient(Client):
def set_platform(self, platform: QQOfficialPlatformAdapter) -> None:
self.platform = platform
# Remove _should_drop_message entirely
async def on_group_at_message_create(
self,
message: botpy.message.GroupMessage,
) -> None:
if not await self.platform.should_handle_raw_message(message):
return
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
message,
MessageType.GROUP_MESSAGE,
)
# ...
async def on_at_message_create(self, message: botpy.message.Message) -> None:
if not await self.platform.should_handle_raw_message(message):
return
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
message,
MessageType.GROUP_MESSAGE,
)
# ...
async def on_direct_message_create(
self,
message: botpy.message.DirectMessage,
) -> None:
if not await self.platform.should_handle_raw_message(message):
return
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
message,
MessageType.FRIEND_MESSAGE,
)
# ...
async def on_c2c_message_create(
self,
message: botpy.message.C2CMessage,
) -> None:
if not await self.platform.should_handle_raw_message(message):
return
abm = QQOfficialPlatformAdapter._parse_from_qqofficial(
message,
MessageType.FRIEND_MESSAGE,
)
# ...
```
**And remove** the now-redundant platform wrapper:
```python
# Delete this; the logic has moved into should_handle_raw_message
async def _is_duplicate_message(
self,
message_id: str,
content: str = "",
sender_id: str = "",
) -> bool:
return await self._deduplicator.is_duplicate(message_id, content, sender_id)
```
This keeps all functionality intact (same fields go into `MessageDeduplicator`, same timing of the check) while:
- Reducing the number of internal layers for “should I drop this message?”
- Centralizing the “raw incoming message → dedup decision” logic on the platform, which already owns `_deduplicator`.
</issue_to_address>
### Comment 2
<location path="astrbot/core/event_bus.py" line_range="29" />
<code_context>
from .platform import AstrMessageEvent
+class EventDeduplicator:
+ """Event deduplicator using TTL-based registry.
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the deduplication logic into `EventBus` as a private method using a `TTLKeyRegistry` field instead of a separate `EventDeduplicator` wrapper class and duplicated logging.
You can reduce indirection and logging noise without changing behavior by inlining `EventDeduplicator` into `EventBus` and centralizing the dedup log.
### 1. Store `TTLKeyRegistry` directly on `EventBus`
Keep all config behavior, but drop the wrapper class:
```python
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
from astrbot.core.utils.number_utils import safe_positive_float
from astrbot.core.message.utils import (
build_event_content_dedup_key,
build_event_message_id_dedup_key,
)
class EventBus:
def __init__(
self,
event_queue: Queue,
pipeline_scheduler_mapping: dict[str, PipelineScheduler],
astrbot_config_mgr: AstrBotConfigManager,
) -> None:
self.event_queue = event_queue
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
self.astrbot_config_mgr = astrbot_config_mgr
dedup_ttl_seconds = safe_positive_float(
self.astrbot_config_mgr.g(
None,
"event_bus_dedup_ttl_seconds",
0.5,
),
default=0.5,
)
self._dedup_registry = TTLKeyRegistry(dedup_ttl_seconds)
```
### 2. Inline `is_duplicate` as a private method
Preserve the exact logic (TTL 0 special case, message_id short‑circuit, content dedup that discards message_id):
```python
class EventBus:
...
def _is_duplicate(self, event: AstrMessageEvent) -> bool:
# TTL of 0 means deduplication is disabled
if self._dedup_registry.ttl_seconds == 0:
return False
message_id_key = build_event_message_id_dedup_key(event)
if message_id_key is not None:
if self._dedup_registry.contains(message_id_key):
# Only log here to avoid double-logging
logger.debug(
"Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
event.unified_msg_origin,
event.get_sender_id(),
)
return True
self._dedup_registry.add(message_id_key)
content_key = build_event_content_dedup_key(event)
if self._dedup_registry.contains(content_key):
logger.debug(
"Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
event.unified_msg_origin,
event.get_sender_id(),
)
# Preserve existing behavior: discard message_id key on content duplicate
if message_id_key is not None:
self._dedup_registry.discard(message_id_key)
return True
self._dedup_registry.add(content_key)
return False
```
### 3. Simplify `dispatch` and remove double logging
Let `_is_duplicate` be the single source of dedup logs:
```python
class EventBus:
...
async def dispatch(self) -> None:
        # event_queue is consumed by a single consumer; the dedup structures are not thread-safe and, by design, are used only in this loop.
while True:
event: AstrMessageEvent = await self.event_queue.get()
if self._is_duplicate(event):
continue
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
...
```
This keeps all dedup behavior and config intact, while:
- Removing a thin wrapper class only used in one place.
- Collapsing the policy into a single, local `_is_duplicate` method.
- Ensuring deduplication is logged once in a predictable location.
</issue_to_address>
### Comment 3
<location path="astrbot/core/message/utils.py" line_range="60" />
<code_context>
+ return f"{sender_id}:{content_hash}"
+
+
+def build_event_content_dedup_key(event: "AstrMessageEvent") -> str:
+ """Build a content fingerprint key for EventBus deduplication."""
+ msg_text = str(event.get_message_str() or "").strip()
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring these dedup helpers to operate on primitive message fields instead of AstrMessageEvent instances and moving the event-specific extraction into the EventBus layer.
You can reduce coupling/complexity by making the utils work on primitives instead of `AstrMessageEvent` directly, and let EventBus handle the event introspection.
### 1. Decouple from `AstrMessageEvent`
Right now `message/utils.py` knows a lot about `AstrMessageEvent` internals (`get_message_str`, `get_messages`, `get_platform_id`, `unified_msg_origin`, `get_sender_id`, `message_obj`). This makes the “generic” utils module depend on platform/event semantics.
You can keep the same behavior but change the API to accept plain values, and move the event-specific extraction into EventBus:
```python
# message/utils.py
def build_content_dedup_key(
*,
platform_id: str,
unified_msg_origin: str,
sender_id: str,
text: str,
components: Iterable[BaseMessageComponent],
) -> str:
msg_text = (text or "").strip()
if len(msg_text) <= _MAX_RAW_TEXT_FINGERPRINT_LEN:
msg_sig = msg_text
else:
msg_hash = hashlib.sha1(msg_text.encode("utf-8")).hexdigest()[:16]
msg_sig = f"h:{len(msg_text)}:{msg_hash}"
attach_sig = build_component_dedup_signature(components)
return "|".join(
["content", platform_id, unified_msg_origin, sender_id, msg_sig, attach_sig]
)
def build_message_id_dedup_key(
*,
platform_id: str,
unified_msg_origin: str,
message_id: str,
) -> str | None:
if not message_id:
return None
return "|".join(["message_id", platform_id, unified_msg_origin, message_id])
```
Then in EventBus (or a dedicated dedup module):
```python
# event_bus_dedup.py (or wherever the dedup logic lives)
from astrbot.core.message import utils as msg_utils
def make_event_content_dedup_key(event: AstrMessageEvent) -> str:
return msg_utils.build_content_dedup_key(
platform_id=str(event.get_platform_id() or ""),
unified_msg_origin=str(event.unified_msg_origin or ""),
sender_id=str(event.get_sender_id() or ""),
text=str(event.get_message_str() or ""),
components=event.get_messages(),
)
def make_event_message_id_dedup_key(event: AstrMessageEvent) -> str | None:
raw_id = getattr(event.message_obj, "message_id", "") or getattr(
event.message_obj, "id", ""
)
return msg_utils.build_message_id_dedup_key(
platform_id=str(event.get_platform_id() or ""),
unified_msg_origin=str(event.unified_msg_origin or ""),
message_id=str(raw_id or ""),
)
```
This keeps all current functionality but:
- Removes `AstrMessageEvent` knowledge from `message/utils.py`.
- Makes the utils module genuinely “message/attachment oriented”.
- Localizes “what counts as a duplicate event” to the EventBus/dedup layer, where platform semantics already live.
### 2. Collapse event-specific key builders
With the above, `build_event_content_dedup_key` and `build_event_message_id_dedup_key` disappear from `message/utils.py` and are replaced by the two generic helpers shown. This reduces the number of “key builder” functions the reader must understand in the utils module without changing the actual dedup behavior.
</issue_to_address>
@sourcery-ai review
001VIsir left a comment
Implemented changes:
- qqofficial dedup chain simplified — removed the botClient._should_drop_message and platform._is_duplicate_message intermediate layers in favor of a single platform.should_handle_raw_message entry point
- EventBus dedup logic inlined — removed the EventDeduplicator wrapper class; the dedup logic is now a private EventBus._is_duplicate() method, and dispatch no longer emits duplicate log lines
- message/utils decoupled — dedup key building now takes primitive parameters (build_content_dedup_key / build_message_id_dedup_key) instead of AstrMessageEvent; event field extraction moved into local static methods on EventBus
- TTLKeyRegistry docs — explicitly documented as "not thread-safe, designed for single-consumer/single-thread usage"
Hey - I've found 3 issues, and left some high level feedback:
- In `QQOfficialPlatformAdapter._send_by_session_common` the `send_helper` variable was changed from a `SimpleNamespace(bot=self.client)` wrapper to `self.client` directly; if downstream helpers still expect a `.bot` attribute this will break, so consider either preserving the wrapper or updating all call sites to match the new shape explicitly.
- `QQOfficialPlatformAdapter.should_handle_raw_message` accesses `message.id` directly; for defensive robustness against malformed or test messages you may want to use `getattr(message, 'id', '')` (mirroring the guarded access patterns used elsewhere) before passing it into the deduplicator.
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="281-285" />
<code_context>
+ async def should_handle_raw_message(self, message: Any) -> bool:
+ """Return False if the raw incoming message should be dropped."""
+ sender_id = _extract_sender_id(message)
+ content = getattr(message, "content", "") or ""
+ is_duplicate = await self._deduplicator.is_duplicate(
+ message.id,
+ content,
+ sender_id,
+ )
+ return not is_duplicate
</code_context>
<issue_to_address>
**issue (bug_risk):** Content used for deduplication may not be a string, causing encode errors in downstream hashing.
`content` here comes straight from `message.content` into `MessageDeduplicator.is_duplicate`, which calls `build_sender_content_dedup_key` and then `content.encode("utf-8")`. If `message.content` is a non-string but truthy (e.g., a dict), `getattr(..., "content", "") or ""` will pass that through and `.encode` will fail at runtime.
To avoid this, either normalize to a string before passing it in:
```python
raw_content = getattr(message, "content", "")
content = str(raw_content or "")
```
or add an `isinstance(content, str)` check inside `build_sender_content_dedup_key` and handle non-string values appropriately.
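A minimal reproduction of the failure mode and the suggested normalization (`FakeMessage` is a hypothetical stand-in for a botpy message object):

```python
import hashlib


def build_key(content, sender_id):
    # Mirrors the hashing step of build_sender_content_dedup_key.
    if not (content and sender_id):
        return None
    return f"{sender_id}:{hashlib.sha1(content.encode('utf-8')).hexdigest()[:16]}"


class FakeMessage:
    content = {"type": "rich"}  # truthy, but not a str


raw_content = getattr(FakeMessage(), "content", "")
try:
    build_key(raw_content or "", "u1")
    crashed = False
except AttributeError:  # dict has no .encode, so hashing blows up
    crashed = True

normalized = str(raw_content or "")  # the suggested fix
safe_key = build_key(normalized, "u1")
```

The `str(raw_content or "")` normalization keeps the key builder total over whatever `message.content` happens to be.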
</issue_to_address>
### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="76" />
<code_context>
+ return sender_id_str
+
+
+class MessageDeduplicator:
+ def __init__(
+ self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the deduplication logic into smaller, focused helpers and simplifying content-key handling to make the control flow clearer and easier to maintain.
You can reduce the complexity of the new dedup logic without changing behavior by:
1. Flattening `MessageDeduplicator.is_duplicate` via small helpers instead of local flags and nested branches.
2. Inlining `_build_content_key` (or making it the “one true” key builder if you expect more callers).
### 1. Flatten `is_duplicate` into clear phases
Right now `is_duplicate` mixes “compute flags”, “check id”, “check content”, and the special-case “discard message_id” in one method. Extracting them into small helpers makes the control flow easier to read and test in isolation, while preserving all semantics.
For example:
```python
class MessageDeduplicator:
...
def _id_dedup_enabled(self, message_id: str) -> bool:
return self._message_ids.ttl_seconds > 0 and bool(message_id)
def _content_dedup_enabled(self) -> bool:
return self._content_keys.ttl_seconds > 0
def _register_message_id(self, message_id: str) -> bool:
"""Return True if duplicate by ID, False otherwise (and register)."""
if self._message_ids.contains(message_id):
logger.debug(
"[QQOfficial] Duplicate message detected (by ID): %s...",
message_id[:50],
)
return True
self._message_ids.add(message_id)
return False
def _register_content(
self,
message_id: str,
content: str,
sender_id: str,
id_dedup_enabled: bool,
) -> bool:
"""Return True if duplicate by content, False otherwise (and register).
Note: If the content is duplicate we discard message_id to preserve
the existing behavior of not keeping message_id on content duplicates.
"""
content_key = build_sender_content_dedup_key(content, sender_id)
if content_key is None:
logger.debug(
"[QQOfficial] New message registered: %s...", message_id[:50]
)
return False
if self._content_keys.contains(content_key):
logger.debug(
"[QQOfficial] Duplicate message detected (by content): %s",
content_key,
)
if id_dedup_enabled:
self._message_ids.discard(message_id)
return True
self._content_keys.add(content_key)
logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
return False
async def is_duplicate(
self,
message_id: str,
content: str = "",
sender_id: str = "",
) -> bool:
async with self._lock:
id_dedup_enabled = self._id_dedup_enabled(message_id)
content_dedup_enabled = self._content_dedup_enabled()
if not id_dedup_enabled and not content_dedup_enabled:
return False
if id_dedup_enabled and self._register_message_id(message_id):
return True
if not content_dedup_enabled:
if id_dedup_enabled:
logger.debug(
"[QQOfficial] New message registered: %s...",
message_id[:50],
)
return False
return self._register_content(
message_id=message_id,
content=content,
sender_id=sender_id,
id_dedup_enabled=id_dedup_enabled,
)
```
This keeps the same behavior (including discarding `message_id` when content is duplicate) but makes each concern local and the top-level control flow linear.
### 2. Remove `_build_content_key` indirection
Currently `_build_content_key` is just a thin wrapper over `build_sender_content_dedup_key` and only used once. Unless you plan to reuse or swap implementations, it’s simpler to inline it or make it the central builder.
If you go with inlining (as above), you can delete `_build_content_key` entirely.
If you prefer to keep a central builder, use it everywhere that constructs a content key (in QQ adapter and any other dedup consumers):
```python
class MessageDeduplicator:
...
@staticmethod
def build_content_key(content: str, sender_id: str) -> str | None:
return build_sender_content_dedup_key(content, sender_id)
```
Then call `MessageDeduplicator.build_content_key(...)` from both `MessageDeduplicator` and e.g. `event_bus` to unify the concept.
</issue_to_address>
### Comment 3
<location path="astrbot/core/event_bus.py" line_range="42" />
<code_context>
# abconf uuid -> scheduler
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
self.astrbot_config_mgr = astrbot_config_mgr
+ dedup_ttl_seconds = safe_positive_float(
+ self.astrbot_config_mgr.g(
+ None,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new event-level deduplication logic into a separate `EventDeduplicator` helper so `EventBus` only configures and calls it.
You can keep the current behavior but reduce the dispersion of dedup logic by extracting a small, reusable `EventDeduplicator` and letting `EventBus` depend on it.
### 1. Extract event-level dedup into a helper
Move the key-building and policy into a dedicated helper that knows how to inspect `AstrMessageEvent`:
```python
# event_dedup.py
from astrbot.core.message.utils import (
    build_content_dedup_key,
    build_message_id_dedup_key,
)
from astrbot.core.utils.ttl_registry import TTLKeyRegistry
from .platform import AstrMessageEvent
from astrbot.core import logger


class EventDeduplicator:
    def __init__(self, ttl_seconds: float) -> None:
        self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)

    def is_duplicate(self, event: AstrMessageEvent) -> bool:
        if self._registry.ttl_seconds == 0:
            return False
        message_id_key = self._build_message_id_key(event)
        if message_id_key is not None:
            if self._registry.contains(message_id_key):
                logger.debug(
                    "Skip duplicate event in event_bus (by message_id): umo=%s, sender=%s",
                    event.unified_msg_origin,
                    event.get_sender_id(),
                )
                return True
            self._registry.add(message_id_key)
        content_key = self._build_content_key(event)
        if self._registry.contains(content_key):
            logger.debug(
                "Skip duplicate event in event_bus (by content): umo=%s, sender=%s",
                event.unified_msg_origin,
                event.get_sender_id(),
            )
            if message_id_key is not None:
                self._registry.discard(message_id_key)
            return True
        self._registry.add(content_key)
        return False

    @staticmethod
    def _build_content_key(event: AstrMessageEvent) -> str:
        return build_content_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            sender_id=str(event.get_sender_id() or ""),
            text=str(event.get_message_str() or ""),
            components=event.get_messages(),
        )

    @staticmethod
    def _build_message_id_key(event: AstrMessageEvent) -> str | None:
        message_id = getattr(event.message_obj, "message_id", "") or getattr(
            event.message_obj,
            "id",
            "",
        )
        return build_message_id_dedup_key(
            platform_id=str(event.get_platform_id() or ""),
            unified_msg_origin=str(event.unified_msg_origin or ""),
            message_id=str(message_id or ""),
        )
```
This centralizes:
- which `AstrMessageEvent` fields participate in dedup
- the policy (ID-first, then content, discard ID key on content dup)
- the TTL registry ownership
It can later be reused/aligned with `MessageDeduplicator` to avoid policy drift.
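The helper above leans on `TTLKeyRegistry` (`ttl_seconds`, `contains`, `add`, `discard`), which these comments do not show. A minimal monotonic-clock sketch of the assumed interface, including the `seen()` convenience used in later suggestions; the real implementation in this PR may differ in detail:

```python
import time


class TTLKeyRegistry:
    """Sketch: keys expire ttl_seconds after registration; cleanup is throttled."""

    def __init__(self, ttl_seconds: float, cleanup_interval_seconds: float = 1.0) -> None:
        self.ttl_seconds = ttl_seconds
        self._cleanup_interval = cleanup_interval_seconds
        self._expiry: dict[str, float] = {}
        self._last_cleanup = time.monotonic()

    def _clean_expired(self) -> None:
        if self.ttl_seconds <= 0:
            return
        now = time.monotonic()
        if now - self._last_cleanup < self._cleanup_interval:
            return  # throttle full-scan cleanup passes
        self._last_cleanup = now
        self._expiry = {k: t for k, t in self._expiry.items() if t > now}

    def contains(self, key: str) -> bool:
        self._clean_expired()
        deadline = self._expiry.get(key)
        return deadline is not None and deadline > time.monotonic()

    def add(self, key: str) -> None:
        self._expiry[key] = time.monotonic() + self.ttl_seconds

    def discard(self, key: str) -> None:
        self._expiry.pop(key, None)

    def seen(self, key: str) -> bool:
        # True if the key was already registered; otherwise registers it now.
        if self.contains(key):
            return True
        self.add(key)
        return False
```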
### 2. Slim down `EventBus` to focus on dispatch
`EventBus` then just configures and uses this helper:
```python
# event_bus.py
from astrbot.core.utils.number_utils import safe_positive_float

from .event_dedup import EventDeduplicator


class EventBus:
    def __init__(self, event_queue, pipeline_scheduler_mapping, astrbot_config_mgr):
        self.event_queue = event_queue
        self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
        self.astrbot_config_mgr = astrbot_config_mgr
        dedup_ttl_seconds = safe_positive_float(
            self.astrbot_config_mgr.g(
                None,
                "event_bus_dedup_ttl_seconds",
                0.5,
            ),
            default=0.5,
        )
        self._deduplicator = EventDeduplicator(ttl_seconds=dedup_ttl_seconds)

    async def dispatch(self) -> None:
        # event_queue has a single consumer; the dedup structures are not
        # thread-safe and by design are used only inside this loop.
        while True:
            event: AstrMessageEvent = await self.event_queue.get()
            if self._deduplicator.is_duplicate(event):
                continue
            conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
            conf_id = conf_info["id"]
            conf_name = conf_info.get("name") or conf_id
            self._print_event(event, conf_name)
            scheduler = self.pipeline_scheduler_mapping.get(conf_id)
            ...
```
Benefits:
- `EventBus` no longer encodes dedup policy or key construction, just calls `is_duplicate`.
- Dedup behavior is centralized and can be reused or later unified with `MessageDeduplicator` to keep one mental model for dedup across adapters.
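`safe_positive_float` lives in `astrbot/core/utils/number_utils.py` and is described in this PR as safe config parsing that allows `0` as a "disabled" value. A sketch under that assumption (the exact signature may differ):

```python
def safe_positive_float(value, default: float) -> float:
    """Parse a config value into a non-negative float.

    Returns `default` on unparsable input; 0 is allowed and means "disabled";
    negative values fall back to the default.  Sketch only; the real helper
    in this PR may differ.
    """
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        return default
    if parsed < 0:
        return default
    return parsed
```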
</issue_to_address>
001VIsir left a comment
Change overview

This round addresses the latest review feedback, focusing on hardening the dedup pipeline and simplifying its structure, improving maintainability without changing existing business semantics.

- Harden `QQOfficialPlatformAdapter.should_handle_raw_message`:
  - Read the message ID defensively via `getattr(message, "id", "")`
  - Normalize message content to a string (`str(raw_content or "")`) so non-string content never reaches the hashing path and triggers a runtime `.encode()` error
- Restore the compatible shape of `send_helper`:
  - Use `SimpleNamespace(bot=self.client)` on the proactive send path
  - Keep downstream QQ upload/send helpers' dependence on the `.bot` attribute intact
- Refactor `MessageDeduplicator` into small, clearly scoped methods:
  - `_id_dedup_enabled`, `_content_dedup_enabled`, `_register_message_id`, `_register_content`
  - Preserve the original semantics: a content-dedup hit discards the already-registered message_id key
- Extract the EventBus event-level dedup into a standalone helper:
  - New `astrbot/core/event_dedup.py` (`EventDeduplicator`)
  - `EventBus` now only handles configuration and dispatch; the dedup policy lives in the helper
  - Keep the "message_id first, then content" check order and behavior unchanged

Purpose

- Improve stability on abnormal input (especially malformed/test messages)
- Avoid implicit regressions caused by changes to the send helper's shape
- Reduce the cognitive complexity of EventBus and the QQ dedup logic, easing future maintenance and testing
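The defensive reads described above boil down to a small normalization step; a sketch (the helper name is illustrative, not from the diff):

```python
from types import SimpleNamespace
from typing import Any


def normalize_raw_message(message: Any) -> tuple[str, str]:
    # getattr with a default tolerates malformed/test messages lacking `id`;
    # str(raw or "") keeps None or non-string content away from `.encode()`.
    message_id = str(getattr(message, "id", "") or "")
    content = str(getattr(message, "content", "") or "")
    return message_id, content
```

For example, `normalize_raw_message(SimpleNamespace(content=None))` yields `("", "")` instead of raising.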
@sourcery-ai review
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="77" />
<code_context>
+ return sender_id_str
+
+
+class MessageDeduplicator:
+ def __init__(
+ self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `MessageDeduplicator` by collapsing the helper methods into a streamlined `is_duplicate` that uses `TTLKeyRegistry.seen()` and optionally adding a raw-message view helper to centralize normalization.
You can simplify `MessageDeduplicator` quite a bit by leaning on `TTLKeyRegistry.seen()` and centralizing the policy & logging in `is_duplicate`. This keeps the same behavior (including the “undo ID registration if content duplicate” rule) while reducing helper methods and branching.
### 1) Use `seen()` and inline small helpers
`_id_dedup_enabled` / `_content_dedup_enabled` and the `_register_*` helpers mostly re‑implement `TTLKeyRegistry.seen()`. You can collapse this into a single `is_duplicate` method that:
* Normalizes / checks enablement
* Uses `seen()` with clear, linear control flow
* Explicitly handles the “discard ID on content duplicate” policy
Example refactor:
```python
class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
        cleanup_interval_seconds: float = 1.0,
    ) -> None:
        self._message_ids = TTLKeyRegistry(
            ttl_seconds=message_id_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._content_keys = TTLKeyRegistry(
            ttl_seconds=content_key_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._lock = asyncio.Lock()

    async def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        async with self._lock:
            id_dedup_enabled = self._message_ids.ttl_seconds > 0 and bool(message_id)
            content_dedup_enabled = self._content_keys.ttl_seconds > 0
            if not id_dedup_enabled and not content_dedup_enabled:
                return False
            # 1) ID-based dedup
            if id_dedup_enabled and self._message_ids.seen(message_id):
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by ID): %s...",
                    message_id[:50],
                )
                return True
            # 2) Content-based dedup
            if not content_dedup_enabled:
                logger.debug(
                    "[QQOfficial] New message registered: %s...", message_id[:50]
                )
                return False
            content_key = build_sender_content_dedup_key(content, sender_id)
            if content_key is None:
                logger.debug(
                    "[QQOfficial] New message registered: %s...", message_id[:50]
                )
                return False
            if self._content_keys.seen(content_key):
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by content): %s",
                    content_key,
                )
                # Preserve existing behavior: do not keep message_id on content duplicates
                if id_dedup_enabled:
                    self._message_ids.discard(message_id)
                return True
            logger.debug(
                "[QQOfficial] New message registered: %s...", message_id[:50]
            )
            return False
```
This:
* Removes `_id_dedup_enabled`, `_content_dedup_enabled`, `_register_message_id`, `_register_content`.
* Uses `seen()` instead of `contains` + `add`.
* Keeps the “discard message_id when content duplicate” behavior, but makes it explicit.
### 2) Optional: move normalization into the deduplicator input
You’re already reusing `_extract_sender_id` in both the adapter and `_parse_from_qqofficial`. To thin the adapter further, you can also pass a small “raw message view” into the deduplicator instead of multiple normalized strings.
Minimal example, keeping most of your current surface:
```python
@dataclass
class RawMessageView:
    id: str
    content: str
    sender_id: str


class MessageDeduplicator:
    # ctor unchanged...
    async def is_duplicate_from_raw(self, message: Any) -> bool:
        view = RawMessageView(
            id=str(getattr(message, "id", "") or ""),
            content=str(getattr(message, "content", "") or ""),
            sender_id=_extract_sender_id(message),
        )
        return await self.is_duplicate(view.id, view.content, view.sender_id)
```
Then in `QQOfficialPlatformAdapter`:
```python
async def should_handle_raw_message(self, message: Any) -> bool:
    is_duplicate = await self._deduplicator.is_duplicate_from_raw(message)
    return not is_duplicate
```
This keeps all functionality but moves the normalization logic next to the dedup policy, reducing the adapter’s responsibility surface.
</issue_to_address>
# Conflicts:
#	astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py
@sourcery-ai review

Hey - I've found 2 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/event_dedup.py" line_range="15-16" />
<code_context>
+ def __init__(self, ttl_seconds: float = 0.5) -> None:
+ self._registry = TTLKeyRegistry(ttl_seconds=ttl_seconds)
+
+ def is_duplicate(self, event: AstrMessageEvent) -> bool:
+ if self._registry.ttl_seconds == 0:
+ return False
+
</code_context>
<issue_to_address>
**suggestion:** Event deduplication is disabled only when TTL is exactly 0; it might be safer to treat all non-positive TTLs as disabled.
Here `is_duplicate` short-circuits only when `self._registry.ttl_seconds == 0`. Even though `safe_positive_float` should prevent negative values, using `<= 0` would be more robust against unexpected config or future parsing changes, and would align with `_clean_expired`, which already treats `ttl_seconds <= 0` as disabling cleanup.
</issue_to_address>
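A minimal sketch of the suggested change, isolating just the short-circuit (key building and TTL expiry are omitted, so this is not the real class):

```python
class EventDeduplicator:
    """Sketch of the suggested guard only; real key building and TTL omitted."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._seen: set[str] = set()

    def is_duplicate(self, key: str) -> bool:
        # `<= 0` rather than `== 0`: a negative TTL from unexpected config
        # also disables dedup, matching the `_clean_expired` convention.
        if self.ttl_seconds <= 0:
            return False
        if key in self._seen:
            return True
        self._seen.add(key)
        return False
```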
### Comment 2
<location path="astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py" line_range="81" />
<code_context>
+ return sender_id_str
+
+
+class MessageDeduplicator:
+ def __init__(
+ self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying `MessageDeduplicator` into a synchronous, linear implementation that uses `TTLKeyRegistry.seen()` directly while preserving existing deduplication behavior.
You can simplify `MessageDeduplicator` significantly without changing behavior by:
* dropping the async lock and making it synchronous (callers are already single-threaded async handlers),
* collapsing the helper methods into a single flow,
* using `TTLKeyRegistry.seen(...)` instead of `contains` + `add`.
That keeps all configuration knobs and existing semantics (including the “discard ID entry on content-dup” behavior) but removes a lot of local complexity and indirection.
### Suggested refactor
```python
class MessageDeduplicator:
    def __init__(
        self,
        message_id_ttl_seconds: float = 30 * 60,
        content_key_ttl_seconds: float = 3.0,
        cleanup_interval_seconds: float = 1.0,
    ) -> None:
        self._message_ids = TTLKeyRegistry(
            ttl_seconds=message_id_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )
        self._content_keys = TTLKeyRegistry(
            ttl_seconds=content_key_ttl_seconds,
            cleanup_interval_seconds=cleanup_interval_seconds,
        )

    def is_duplicate(
        self,
        message_id: str,
        content: str = "",
        sender_id: str = "",
    ) -> bool:
        id_dedup_enabled = self._message_ids.ttl_seconds > 0 and bool(message_id)
        content_dedup_enabled = self._content_keys.ttl_seconds > 0
        if not id_dedup_enabled and not content_dedup_enabled:
            return False

        # 1) ID-based dedup
        if id_dedup_enabled:
            seen_before = self._message_ids.seen(message_id)
            if seen_before:
                logger.debug(
                    "[QQOfficial] Duplicate message detected (by ID): %s...",
                    message_id[:50],
                )
                return True

        # 2) Content-based dedup
        if not content_dedup_enabled:
            logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
            return False

        content_key = build_sender_content_dedup_key(content, sender_id)
        if content_key is None:
            logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
            return False

        content_seen_before = self._content_keys.seen(content_key)
        if content_seen_before:
            logger.debug(
                "[QQOfficial] Duplicate message detected (by content): %s",
                content_key,
            )
            # Preserve existing behavior: do not keep message_id on content duplicates
            if id_dedup_enabled:
                self._message_ids.discard(message_id)
            return True

        logger.debug("[QQOfficial] New message registered: %s...", message_id[:50])
        return False
```
Then `QQOfficialPlatformAdapter.should_handle_raw_message` can become:
```python
async def should_handle_raw_message(self, message: Any) -> bool:
    sender_id = _extract_sender_id(message)
    message_id = str(getattr(message, "id", "") or "")
    raw_content = getattr(message, "content", "")
    content = str(raw_content or "")
    is_duplicate = self._deduplicator.is_duplicate(
        message_id,
        content,
        sender_id,
    )
    return not is_duplicate
```
And the call sites remain unchanged:
```python
async def on_group_at_message_create(self, message: botpy.message.GroupMessage) -> None:
    if not await self.platform.should_handle_raw_message(message):
        return
    ...
```
This keeps all current behavior (ID TTL, content TTL, cleanup interval, and the “drop ID entry on content duplicate” rule) but removes:
* the async lock and related concurrency complexity,
* four small helper methods with scattered control flow,
* the manual `contains` + `add` usage in favor of the existing `seen()` abstraction.
</issue_to_address>
Issue #6320 seems to hit a problem similar to this PR's. However, on version 4.22.1, testing with the master branch, the bug is still not fixed for me: duplicate message consumption persists.


Fixes #5848
In the `qq_official` scenario, messages can be consumed more than once, so a single input triggers multiple replies; in severe cases this risks API errors or bans. The issue logs show duplicate events entering the processing chain in quick succession, with a rising, falling, then rising-again trend after running for a while.

This PR aims to eliminate duplicate consumption at three levels at once (platform entry, event bus, and platform instance management) while avoiding over-deduplication that would block legitimate messages.
Modifications / 改动点
The core files modified in this PR:

- `astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py`
- `astrbot/core/event_bus.py`
- `astrbot/core/platform/manager.py`
- `astrbot/core/config/default.py`
- `astrbot/core/utils/ttl_registry.py`
- `astrbot/core/utils/number_utils.py`
- `astrbot/core/message/utils.py`

Main changes:

- Added `MessageDeduplicator` with two deduplication windows:
  - long-window dedup by `message_id` (default 1800 s)
  - short-window dedup by `sender_id + content_hash` (default 3 s)
- All four entry callbacks (`group`/`channel`/`direct`/`c2c`) now run the dedup check up front and drop any message that hits it.
- Sender-ID extraction follows a unified priority (`user_openid > member_openid > id`), reducing dedup drift across callback paths.
- Added an `EventDeduplicator` that runs before `dispatch`:
  - fast short-circuit dedup by `message_id`;
  - configurable via `event_bus_dedup_ttl_seconds` (default 0.5 s; `0` disables it).
- Added defensive logic in `PlatformManager.load_platform`: if an instance with the same `platform_id` already exists, the old instance is terminated before the new one is loaded.
- Added `TTLKeyRegistry` (a TTL dedup registry based on `time.monotonic()`, with throttled cleanup).
- Added `safe_positive_float` (safe config parsing that allows `0` as a disable value).

Screenshots or Test Results / 运行截图或测试结果
Verification Steps / 验证步骤:
1. Before the change

- Start the `qq_official` adapter with the default dedup configuration
- Turn on "enable message-list direct chat" and "enable channel private chat"
- After sending the two messages `1` and `2`, the log received two copies of `1` and two copies of `2`, four messages in total

2. After the change

- Start the `qq_official` adapter with the default dedup configuration
- Turn on "enable message-list direct chat" and "enable channel private chat"
- **Messages are no longer consumed twice; everything stayed normal over a 3-minute test**
Checklist / 检查清单

- If this PR adds new features, they have been discussed with the maintainers via Issue / email beforehand. (This PR is a bugfix; no new features.)
- My changes are well tested, and "Verification Steps" and screenshots are provided above.
- I have not introduced new dependencies, or any new dependency has been added to `requirements.txt` and `pyproject.toml` in the appropriate places.
- My changes do not introduce malicious code.
Summary by Sourcery
Prevent duplicate message processing for the qq_official platform by adding multi-layer deduplication and safer configuration handling.
Bug Fixes:
Enhancements: