Description / 描述
背景
我在看 EventBus 的消息分发逻辑时,发现这里对每条消息都会直接丢一个后台任务出去:
asyncio.create_task(scheduler.execute(event))
位置在:
- astrbot/core/event_bus.py
- EventBus.dispatch()
这块本身没问题,消息进来后异步跑 Pipeline 是合理的。但现在这个 task 创建后就没人管了:没有保存引用,也没有
done_callback 去检查执行结果。
## 可能的问题
如果 Pipeline 里面某个阶段炸了,比如:
- 插件处理异常;
- Provider 调用异常;
- 文件处理异常;
- 最后发消息时平台适配器异常;
那这个异常可能只会变成 asyncio 的后台 task exception。实际排查时,日志里不一定能很清楚地看到“是哪条消息、哪个平台、哪
个会话触发的”。
对用户来说,表现可能就是:消息没回、插件没生效、控制台里也不好定位具体原因。
## 为什么我觉得这个值得处理
EventBus -> Pipeline 是所有消息都会经过的核心路径,不是某个平台或某个插件的局部问题。
现在 Pipeline 自己会做一些清理,比如 finally 里清理临时文件,这很好。但 task 本身没有被 EventBus 或生命周期管理起来,
所以还有两个小问题:
1. 异常可观测性不够好;
2. 停机时也不好明确知道还有没有正在跑的 Pipeline task。
这类问题平时不一定明显,但一旦线上有插件异常或者平台偶发异常,排查成本会比较高。
## 期望行为
希望 EventBus 能把这些后台 Pipeline task 管起来,比如:
- 创建 task 后放进一个 active tasks 集合;
- task 完成后自动从集合里移除;
- task 如果抛异常,统一打日志;
- 日志里最好带上消息上下文,比如:
- unified_msg_origin
- 配置 ID / 配置名
- 平台 ID / 平台名称
- 发送者 ID / 名称
- 消息概要
- 后续如果需要优雅停机,也可以等待或取消还没结束的 Pipeline task。
## 一个可能的实现方向
可以大概做成这样:
def _create_pipeline_task(self, event, scheduler, conf_info):
task = asyncio.create_task(scheduler.execute(event))
self._active_tasks.add(task)
task.add_done_callback(
lambda task: self._on_pipeline_task_done(task, event, conf_info)
)
return task
然后 _on_pipeline_task_done() 里做几件事:
- 如果 task 是正常取消,就忽略;
- 安全读取 task.exception();
- 如果有异常,就带上 event 上下文打日志。
## 建议补充的测试
可以在 tests/unit/test_event_bus.py 里补几条:
- Pipeline 正常执行完成后,active task 会被清理;
- Pipeline 抛异常后,会记录日志,并且 active task 也会被清理;
- 找不到 scheduler 的时候,不应该创建 Pipeline task;
- 多条消息并发进入时,最终 active tasks 能清空。
## 影响范围
这个改动应该不改变现有消息处理逻辑,只是让后台任务更可控、异常更好查。
对插件、Provider、平台适配器应该都是兼容的。
### Use Case / 使用场景
_No response_
### Willing to Submit PR? / 是否愿意提交PR?
- [x] Yes, I am willing to submit a PR. / 是的,我愿意提交 PR。
### Code of Conduct
- [x] I have read and agree to abide by the project's [Code of Conduct](https://docs.github.com/zh/site-policy/github-terms/github-community-code-of-conduct). /
Description / 描述
背景
我在看
EventBus的消息分发逻辑时,发现这里对每条消息都会直接丢一个后台任务出去: