Predicate DSL
约 1032 字大约 3 分钟
2026-03-19
声明式事件过滤——用运算符组合替代冗长的 lambda,让
wait_event()和事件流过滤更简洁。
目录
快速示例
from ncatbot.core import from_event, msg_equals, msg_in
# 传统写法——冗长的 lambda
await self.wait_event(
predicate=lambda e: (
hasattr(e.data, "user_id")
and str(e.data.user_id) == str(event.user_id)
and hasattr(e.data, "group_id")
and str(e.data.group_id) == str(event.group_id)
and hasattr(e.data, "raw_message")
and e.data.raw_message.strip() == "确认"
),
timeout=15,
)
# Predicate DSL——一行搞定
await self.wait_event(predicate=from_event(event) * msg_equals("确认"), timeout=15)P 基类
所有 Predicate 都继承自抽象基类 P。P 实例是一个可调用对象,接收 Event 返回 bool:
from ncatbot.core.dispatcher.predicate import P
class MyPredicate(P):
def __call__(self, event) -> bool:
return event.data.raw_message == "hello"P.of — 将 callable 升级
如果你已经有一个普通函数或 lambda,可以用 P.of() 将它升级为支持运算符组合的 P 实例:
from ncatbot.core import P, from_event
# lambda 不支持 * + ~ 运算符,但 P.of() 可以
is_positive = P.of(lambda e: int(e.data.raw_message) > 0)
# 现在可以组合了
pred = from_event(event) * is_positive
await self.wait_event(predicate=pred, timeout=30)组合运算符
P 支持三种运算符,返回新的 P 实例:
| 运算符 | 含义 | 返回类型 | 示例 |
|---|---|---|---|
* 或 & | AND(全部满足) | AndP | p1 * p2 |
+ 或 | | OR(任一满足) | OrP | p1 + p2 |
~ | NOT(取反) | NotP | ~p |
组合示例
from ncatbot.core import same_user, same_group, is_group, msg_equals, msg_in
# AND:所有条件都要满足
pred = same_user(uid) * same_group(gid) * is_group() * msg_equals("确认")
# OR:满足任一即可
pred = msg_equals("是") + msg_equals("yes") + msg_equals("y")
# 简写等价
pred = msg_in("是", "yes", "y")
# NOT:排除特定条件
pred = same_group(gid) * ~same_user(bot_id)
# 混合组合
pred = from_event(event) * (msg_equals("确认") + msg_equals("取消"))工厂函数
ncatbot.core 导出以下工厂函数,每个返回一个 P 实例:
| 函数 | 签名 | 说明 |
|---|---|---|
from_event(event) | (event: object) -> P | 从触发事件自动推导同 session 谓词 |
same_user(uid) | (user_id: Union[str, int]) -> P | 匹配 event.data.user_id |
same_group(gid) | (group_id: Union[str, int]) -> P | 匹配 event.data.group_id |
is_private() | () -> P | 事件为私聊消息 |
is_group() | () -> P | 事件为群消息 |
is_message() | () -> P | 事件为消息类型(群/私聊均可) |
event_type(prefix) | (prefix: str) -> P | event.type 前缀匹配 |
has_keyword(*words) | (*words: str) -> P | raw_message 包含任一关键词 |
msg_equals(text) | (text: str) -> P | raw_message.strip() 完全等于 text |
msg_in(*options) | (*options: str) -> P | raw_message.strip() 等于选项之一 |
msg_matches(pattern) | (pattern: str) -> P | raw_message 正则匹配(re.search) |
P.of(fn) | (fn: Callable[[Event], bool]) -> P | 将普通 callable 升级为可组合的 P |
导入
# 推荐:从 ncatbot.core 导入
from ncatbot.core import (
P, from_event, same_user, same_group,
is_private, is_group, is_message,
has_keyword, msg_equals, msg_in, msg_matches, event_type,
)from_event 自动推导
from_event() 是最常用的工厂函数。它从触发事件中提取 user_id、group_id、message_type,自动组合出 "同 session" 谓词:
| 触发事件类型 | 生成的 predicate |
|---|---|
| 群消息 | same_user(uid) * same_group(gid) * is_group() |
| 私聊消息 | same_user(uid) * is_private() |
| 其他(带 user_id) | same_user(uid) * is_message() |
# 不需要手动写同 session 判断
pred = from_event(event) # 同用户 + 同场景
pred = from_event(event) * msg_equals("确认") # + 精确匹配
pred = from_event(event) * msg_in("是", "否") # + 选择匹配
pred = from_event(event) * has_keyword("帮助") # + 关键词匹配实战模式
多步对话确认
@registrar.on_group_command("删除")
async def on_delete(self, event: GroupMessageEvent):
await event.reply("确定要删除吗?请回复「确认」或「取消」")
try:
reply = await self.wait_event(
predicate=from_event(event) * msg_in("确认", "取消"),
timeout=15,
)
if reply.data.raw_message.strip() == "确认":
await event.reply("已删除 ✅")
else:
await event.reply("已取消 ❌")
except asyncio.TimeoutError:
await event.reply("操作超时 ⏰")自由文本收集
@registrar.on_group_command("反馈")
async def on_feedback(self, event: GroupMessageEvent):
await event.reply("请输入你的反馈内容:")
try:
reply = await self.wait_event(
predicate=from_event(event),
timeout=60,
)
feedback = reply.data.raw_message.strip()
self.data.setdefault("feedbacks", []).append(feedback)
await event.reply(f"收到反馈:{feedback}")
except asyncio.TimeoutError:
await event.reply("反馈超时 ⏰")数值输入校验
@registrar.on_group_command("设置数量")
async def on_set_count(self, event: GroupMessageEvent):
await event.reply("请输入一个正整数:")
try:
reply = await self.wait_event(
predicate=from_event(event) * P.of(
lambda e: e.data.raw_message.strip().isdigit()
and int(e.data.raw_message.strip()) > 0
),
timeout=30,
)
count = int(reply.data.raw_message.strip())
self.set_config("count", count)
await event.reply(f"已设置为 {count}")
except asyncio.TimeoutError:
await event.reply("输入超时 ⏰")排除 Bot 自身消息
# 在事件流中排除 Bot 自身的消息
async def _monitor(self):
async with self.events("message") as stream:
async for event in stream:
pred = is_group() * ~same_user(event.self_id)
if pred(event):
LOG.info("群消息: %s", event.data.raw_message)下一步
版权所有
版权归属:huan-yp
