EventBus 事件总线
约 716 字大约 2 分钟
2025-09-26
EventBus:发布订阅与超时隔离
EventBus 为插件系统提供高性能的事件发布/处理机制,支持精确与正则订阅、优先级、超时监控与线程隔离,并可收集每个处理器的返回值与异常。
事件对象 NcatBotEvent
事件发布时使用 NcatBotEvent(type, data):
type: 字符串事件名(例如官方事件名或自定义事件)data: 任意数据对象(BotClient 发布官方事件时会传入对应的BaseEventData子类)(BaseEventData)event.results: 聚合各处理器返回值event.exceptions: 聚合过程中收集的异常- 控制传播:
event.stop_propagation()或event.intercept()(拦截并停止传播)
订阅事件 subscribe()
签名: subscribe(event_type, handler, priority=0, timeout=None, plugin=None) -> handler_id
event_type:- 精确匹配:如
"ncat.private.message" - 正则匹配:以
re:前缀,例如re:^ncat\.(message|notice)\.
- 精确匹配:如
handler:同步或异步函数,接收NcatBotEvent。priority:数值越大优先级越高,先执行;同优先级按函数名排序。timeout:超时秒数;默认使用总线的default_timeout。plugin:可选,注入插件元数据;发生异常/超时时用于记录来源。- 返回
UUID标识,可用于unsubscribe()。
取消订阅:unsubscribe(handler_id) -> bool
发布事件 publish()
签名:await publish(event) -> List[Any]
流程:
- 收集匹配
event.type的处理器(精确 + 正则),按优先级排序; - 为每个处理器在线程池中提交任务,支持同步/异步函数执行;
- 为每个任务单独设置超时并监控,超时会强制终止该工作线程并以
HandlerTimeoutError记录到event.exceptions; - 正常返回值会追加到
event._results; - 返回结果列表(
event.results的副本)。
注意:
- 若某处理器调用了
event.stop_propagation()或event.intercept(),后续处理器将不会再被调度。 - 每个处理器在独立工作线程中执行,避免单个卡死影响全局;被终止的线程会被替补一个新的工作线程。
线程与超时模型
- EventBus 维护一个任务队列和一组工作线程(默认
max_workers=1,可在构造时自定义)。 - 监控线程会定期检查工作线程是否超时,若超时:
- 使用
PyThreadState_SetAsyncExc注入TimeoutError终止该线程; - 立即补充新的工作线程以维持并发度。
- 使用
典型用法
订阅:
hid = event_bus.subscribe(
"ncat.private.message",
handler=my_handler,
priority=10,
timeout=3,
plugin=self, # 在插件内订阅时建议传入,便于异常标注来源
)发布:
from ncatbot.plugin_system.event import NcatBotEvent
results = await event_bus.publish(NcatBotEvent("ncatbot.myplugin.myevent", data))取消订阅:
event_bus.unsubscribe(hid)与 BotClient/Adapter 的协作
- Adapter 将 NapCat 的 WS 事件转换为
BaseEventData并通过 BotClient 的内部处理器分发; - BotClient 内置处理器会把这些官方事件包装为
NcatBotEvent发布到 EventBus; - 插件只需订阅相应事件类型即可参与处理,无需直接对接底层适配器。
版权所有
版权归属:huan-yp
