事件高级用法
约 2335 字大约 8 分钟
2026-03-19
事件流消费、一次性等待、三种模式对比、事件实体详解与实战组合。
目录
模式 B:事件流消费
使用 EventMixin 的 events() 方法创建持续的事件流,适合后台监控场景:
import asyncio
class MyPlugin(NcatBotPlugin):
name = "my_plugin"
version = "1.0.0"
async def on_load(self):
self._stream_task = asyncio.create_task(self._stream_listener())
async def on_close(self):
if hasattr(self, "_stream_task"):
self._stream_task.cancel()
async def _stream_listener(self):
"""后台事件流:监听所有私聊消息"""
try:
async with self.events("message") as stream:
async for event in stream:
if (
getattr(event.data, "message_type", None)
and event.data.message_type.value == "private"
):
LOG.info(
"[事件流] 私聊消息: %s (来自 %s)",
event.data.raw_message,
event.data.user_id,
)
except asyncio.CancelledError:
pass关键点
self.events(event_type)—event_type可选,传入则按前缀过滤- 返回
EventStream:支持async with+async for - 必须在后台任务中运行(
asyncio.create_task),否则会阻塞on_load() - 卸载时自动关闭(
EventMixin._mixin_unload()清理所有活跃流)
模式 C:一次性等待
使用 EventMixin 的 wait_event() 等待满足条件的下一个事件,适合交互确认和多步对话:
@registrar.on_group_command("确认测试")
async def on_confirm_test(self, event: GroupMessageEvent):
"""等待用户在 15 秒内回复「确认」"""
await event.reply("请在 15 秒内回复「确认」来完成操作...")
try:
await self.wait_event(
predicate=lambda e: (
hasattr(e.data, "user_id")
and str(e.data.user_id) == str(event.user_id)
and hasattr(e.data, "raw_message")
and e.data.raw_message.strip() == "确认"
),
timeout=15.0,
)
await self.api.qq.post_group_msg(event.group_id, text="操作已确认 ✅")
except asyncio.TimeoutError:
await self.api.qq.post_group_msg(event.group_id, text="操作超时已取消 ⏰")wait_event() 签名
async def wait_event(
predicate: Optional[Callable[[Event], bool]] = None,
timeout: Optional[float] = None,
) -> Event| 参数 | 说明 |
|---|---|
predicate | 过滤函数,返回 True 时匹配 |
timeout | 超时秒数,超时抛出 asyncio.TimeoutError |
封装辅助方法
实际开发中建议使用 Predicate 语法糖(见下文)或封装 _wait_user_reply() 减少重复代码:
async def _wait_user_reply(self, group_id, user_id):
"""等待指定用户在指定群的下一条消息"""
event = await self.wait_event(
predicate=lambda e: (
hasattr(e.data, "user_id")
and str(e.data.user_id) == str(user_id)
and hasattr(e.data, "group_id")
and str(e.data.group_id) == str(group_id)
and hasattr(e.data, "raw_message")
),
timeout=30,
)
return event.data.raw_message.strip()Predicate 语法糖
ncatbot.core 模块提供了一套声明式的 Predicate DSL,可以用运算符组合替代冗长的 lambda。
运算符
| 运算符 | 含义 | 示例 |
|---|---|---|
* 或 & | AND(全部满足) | from_event(event) * msg_equals("确认") |
+ 或 | | OR(任一满足) | msg_equals("是") + msg_equals("yes") |
~ | NOT(取反) | ~same_user(bot_id) |
核心函数:from_event
从触发事件 自动推导同 session 条件(同用户 + 群消息同群 / 私聊同私聊),是最常用的语法糖:
from ncatbot.core import from_event, msg_equals
# 等待同 session 用户的下一条消息
evt = await self.wait_event(predicate=from_event(event), timeout=30)
# 同 session + 精确匹配
evt = await self.wait_event(
predicate=from_event(event) * msg_equals("确认"),
timeout=15,
)推导规则:
| 触发事件类型 | 生成的 predicate |
|---|---|
| 群消息 | same_user(uid) * same_group(gid) * is_group() |
| 私聊消息 | same_user(uid) * is_private() |
| 其他 | same_user(uid) * is_message() |
全部工厂函数
| 函数 | 说明 |
|---|---|
from_event(event) | 从触发事件推导同 session 谓词 |
same_user(uid) | 匹配 user_id |
same_group(gid) | 匹配 group_id |
is_private() | 事件为私聊消息 |
is_group() | 事件为群消息 |
is_message() | 事件为消息类型 |
has_keyword(*words) | raw_message 含任一关键词 |
msg_equals(text) | raw_message.strip() 完全匹配 |
msg_in(*options) | raw_message.strip() 匹配选项之一 |
msg_matches(pattern) | raw_message 正则匹配 |
event_type(prefix) | event.type 前缀匹配 |
P.of(lambda) | 将普通 callable 升级为可组合的 P |
组合示例
from ncatbot.core import from_event, msg_equals, msg_in, has_keyword, same_user, P
# 等"确认"或"取消"
pred = from_event(event) * (msg_equals("确认") + msg_equals("取消"))
# 等价简写
pred = from_event(event) * msg_in("确认", "取消")
# 含关键词
pred = from_event(event) * has_keyword("帮助", "help")
# 排除某用户
pred = from_event(event) * ~same_user(bot_id)
# 混用 lambda
pred = from_event(event) * P.of(lambda e: int(e.data.raw_message) > 0)三种模式对比
| 维度 | 模式 A:装饰器 | 模式 B:事件流 | 模式 C:wait_event |
|---|---|---|---|
| 适用场景 | 命令响应、通知处理 | 后台监控、日志记录 | 交互确认、多步对话 |
| 代码风格 | 声明式(装饰器) | 响应式(async for) | 命令式(await) |
| 并发模型 | 框架自动路由 | 需手动 create_task | Handler 内顺序执行 |
| 生命周期 | 随插件自动管理 | 需手动启动/取消 | 单次调用 |
| 优先级 | ✅ 支持 priority | ❌ 无优先级概念 | ❌ 无优先级概念 |
| 参数绑定 | ✅ CommandHook 自动 | ❌ 需手动解析 | ❌ 需手动解析 |
| 代表示例 | 01_hello_world | 02_event_handling | 10_multi_step_dialog |
建议:优先使用模式 A(装饰器),需要多步交互时用模式 C(wait_event),后台监控用模式 B(事件流)。
事件实体
事件实体是对原始事件数据的包装,提供便捷的属性访问和操作方法。
BaseEvent
所有事件实体的基类,通过 __getattr__ 代理底层数据字段——你可以直接访问 event.user_id、event.group_id 等字段。
MessageEvent
消息事件基类,新增便捷方法:
| 方法/属性 | 说明 |
|---|---|
event.message | MessageArray 实例——消息段数组 |
event.raw_message | 原始消息文本 |
event.user_id | 发送者 QQ 号 |
event.message_id | 消息 ID |
event.sender | 发送者信息 |
await event.reply(text=, image=, ...) | 便捷回复(自动引用 + @发送者) |
await event.delete() | 撤回该消息 |
GroupMessageEvent
继承 MessageEvent,增加群相关操作:
| 方法/属性 | 说明 |
|---|---|
event.group_id | 群号 |
await event.kick() | 踢出发送者 |
await event.ban(duration=60) | 禁言发送者 |
其他事件实体
| 类 | 用于 |
|---|---|
PrivateMessageEvent | 私聊消息 |
GroupIncreaseEvent | 群成员增加 |
NoticeEvent | 通用通知事件 |
FriendRequestEvent | 好友请求(有 approve() 方法) |
GroupRequestEvent | 群请求 |
消息内容处理
消息内容通过 event.message(MessageArray 实例)访问,可用于提取特定类型的消息段:
from ncatbot.types import Reply, Image
# 提取回复段
replies = event.message.filter(Reply)
if replies:
quoted_msg_id = replies[0].id
# 提取图片段
images = event.message.filter(Image)
# 获取纯文本内容
text = event.message.text消息段的完整 API 参见 消息类型详解。
实战组合
装饰器 + wait_event:问答机器人
结合模式 A(命令触发)和模式 C(多步输入),实现一个问答添加流程:
@registrar.on_group_command("注册")
async def on_register(self, event: GroupMessageEvent):
"""多步注册流程"""
gid, uid = event.group_id, event.user_id
await event.reply("📝 请输入你的名字(30秒内回复,输入「取消」退出):")
try:
name = await self._wait_user_reply(gid, uid)
except asyncio.TimeoutError:
await self.api.qq.post_group_msg(gid, text="⏰ 注册超时,已取消")
return
if name == "取消":
await self.api.qq.post_group_msg(gid, text="❌ 注册已取消")
return
await self.api.qq.post_group_msg(gid, text=f"好的,{name}!请输入你的年龄:")
try:
age_str = await self._wait_user_reply(gid, uid)
except asyncio.TimeoutError:
await self.api.qq.post_group_msg(gid, text="⏰ 注册超时,已取消")
return
if not age_str.isdigit():
await self.api.qq.post_group_msg(gid, text="❌ 年龄必须是数字,注册已取消")
return
# 保存数据
self.data.setdefault("users", {})[str(uid)] = {
"name": name, "age": int(age_str)
}
await self.api.qq.post_group_msg(gid, text=f"✅ 注册成功!欢迎你,{name}")装饰器 + 高优先级:消息统计
用高优先级的通用 Handler 统计消息,不影响命令匹配:
@registrar.on_group_message(priority=200)
async def on_count(self, event: GroupMessageEvent):
"""高优先级:每条群消息都统计"""
gid = str(event.group_id)
if gid not in self.data.get("enabled_groups", []):
return
self.data["daily_stats"]["total"] += 1复杂工作流模式
当需求超越线性多步对话——需要并发等待、分支路由、或脱离插件体系直接编排事件流——可以组合 wait_event()、events() 和 run_async() 构建更复杂的工作流。
非阻塞启动 + 事件驱动主循环
在非插件模式下,使用 run_async() 启动 Bot 后,直接通过 bot.dispatcher 消费事件:
import asyncio
from ncatbot.app import BotClient
from ncatbot.core import same_group, has_keyword
bot = BotClient()
async def keyword_monitor():
"""后台监控:检测到关键词时自动提醒"""
async with bot.dispatcher.events("message.group") as stream:
async for event in stream:
if "紧急" in event.data.raw_message:
await bot.api.qq.post_group_msg(
event.data.group_id,
text=f"⚠️ 检测到紧急消息 (来自 {event.data.user_id})",
)
async def main():
await bot.run_async() # Bot 就绪,后台监听
# 启动后台监控任务
monitor_task = asyncio.create_task(keyword_monitor())
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
finally:
monitor_task.cancel()
await bot.shutdown()
if __name__ == "__main__":
asyncio.run(main())并发 wait_event
使用 asyncio.gather 同时等待多个条件,先到先得:
@registrar.on_group_command("投票")
async def on_vote(self, event: GroupMessageEvent):
gid = event.group_id
await event.reply("投票开始!请回复「赞成」或「反对」(30秒)")
votes = {"赞成": 0, "反对": 0}
async def collect_one():
"""收集一票"""
return await self.wait_event(
predicate=same_group(gid) * msg_in("赞成", "反对"),
timeout=30.0,
)
# 并发收集最多 5 票
tasks = [asyncio.create_task(collect_one()) for _ in range(5)]
done, pending = await asyncio.wait(tasks, timeout=30.0)
for task in pending:
task.cancel()
for task in done:
try:
evt = task.result()
text = evt.data.raw_message.strip()
if text in votes:
votes[text] += 1
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
await self.api.qq.post_group_msg(
gid, text=f"投票结果:赞成 {votes['赞成']},反对 {votes['反对']}"
)分支工作流
使用 wait_event + msg_in 做路由分支,实现菜单式交互:
from ncatbot.core import from_event, msg_in
@registrar.on_group_command("服务")
async def on_service(self, event: GroupMessageEvent):
await event.reply("请选择服务:\n1. 查询余额\n2. 充值\n3. 帮助")
try:
choice_evt = await self.wait_event(
predicate=from_event(event) * msg_in("1", "2", "3"),
timeout=30.0,
)
except asyncio.TimeoutError:
await event.reply("超时,已退出")
return
choice = choice_evt.data.raw_message.strip()
if choice == "1":
await self._handle_balance(event)
elif choice == "2":
await self._handle_recharge(event) # 可继续嵌套 wait_event
else:
await event.reply("帮助文档:...")要点总结
| 模式 | 适用场景 | 关键 API |
|---|---|---|
| 非阻塞主循环 | 脱离插件、自定义编排 | run_async() + bot.dispatcher |
| 并发等待 | 收集多人输入、竞争条件 | asyncio.wait() + 多个 wait_event |
| 分支路由 | 菜单式交互、状态机 | msg_in() + 条件分支 |
更完整的编排模式(生命周期事件等待、并发任务协调、清理策略)参见 事件驱动工作流编排。
下一步
- Predicate DSL — 所有工厂函数和组合运算符的完整参考
- 配置与数据 Mixin — 配置、数据持久化
- Hook 基础 — 深入了解过滤器和中间件的工作原理
- 消息类型详解 — 消息段构造、MessageArray、合并转发
- 高级模式 — 多步对话设计模式深入讲解
版权所有
版权归属:huan-yp
