EventBus 事件总线
约 716 字大约 2 分钟
2025-09-26
EventBus:发布订阅与超时隔离
EventBus 为插件系统提供高性能的事件发布/处理机制,支持精确与正则订阅、优先级、超时监控与线程隔离,并可收集每个处理器的返回值与异常。
事件对象 NcatBotEvent
事件发布时使用 NcatBotEvent(type, data)
:
type
: 字符串事件名(例如官方事件名或自定义事件)data
: 任意数据对象(BotClient 发布官方事件时会传入对应的BaseEventData
子类)(BaseEventData)event.results
: 聚合各处理器返回值event.exceptions
: 聚合过程中收集的异常- 控制传播:
event.stop_propagation()
或event.intercept()
(拦截并停止传播)
订阅事件 subscribe()
签名: subscribe(event_type, handler, priority=0, timeout=None, plugin=None) -> handler_id
event_type
:- 精确匹配:如
"ncat.private.message"
- 正则匹配:以
re:
前缀,例如re:^ncat\.(message|notice)\.
- 精确匹配:如
handler
:同步或异步函数,接收NcatBotEvent
。priority
:数值越大优先级越高,先执行;同优先级按函数名排序。timeout
:超时秒数;默认使用总线的default_timeout
。plugin
:可选,注入插件元数据;发生异常/超时时用于记录来源。- 返回
UUID
标识,可用于unsubscribe()
。
取消订阅:unsubscribe(handler_id) -> bool
发布事件 publish()
签名:await publish(event) -> List[Any]
流程:
- 收集匹配
event.type
的处理器(精确 + 正则),按优先级排序; - 为每个处理器在线程池中提交任务,支持同步/异步函数执行;
- 为每个任务单独设置超时并监控,超时会强制终止该工作线程并以
HandlerTimeoutError
记录到event.exceptions
; - 正常返回值会追加到
event._results
; - 返回结果列表(
event.results
的副本)。
注意:
- 若某处理器调用了
event.stop_propagation()
或event.intercept()
,后续处理器将不会再被调度。 - 每个处理器在独立工作线程中执行,避免单个卡死影响全局;被终止的线程会被替补一个新的工作线程。
线程与超时模型
- EventBus 维护一个任务队列和一组工作线程(默认
max_workers=1
,可在构造时自定义)。 - 监控线程会定期检查工作线程是否超时,若超时:
- 使用
PyThreadState_SetAsyncExc
注入TimeoutError
终止该线程; - 立即补充新的工作线程以维持并发度。
- 使用
典型用法
订阅:
hid = event_bus.subscribe(
"ncat.private.message",
handler=my_handler,
priority=10,
timeout=3,
plugin=self, # 在插件内订阅时建议传入,便于异常标注来源
)
发布:
from ncatbot.plugin_system.event import NcatBotEvent
results = await event_bus.publish(NcatBotEvent("ncatbot.myplugin.myevent", data))
取消订阅:
event_bus.unsubscribe(hid)
与 BotClient/Adapter 的协作
- Adapter 将 NapCat 的 WS 事件转换为
BaseEventData
并通过 BotClient 的内部处理器分发; - BotClient 内置处理器会把这些官方事件包装为
NcatBotEvent
发布到 EventBus; - 插件只需订阅相应事件类型即可参与处理,无需直接对接底层适配器。
版权所有
版权归属:huan-yp