权限、定时任务与事件 Mixin
约 1548 字大约 5 分钟
2026-03-19
RBACMixin 提供基于角色的权限管理,TimeTaskMixin 提供定时任务调度,EventMixin 提供直接事件流操作。
目录
RBACMixin — 权限管理
RBACMixin 提供基于角色的访问控制(RBAC),底层使用 RBACService 管理权限数据。
基本用法
class MyPlugin(NcatBotPlugin):
name = "rbac_demo"
version = "1.0.0"
async def on_load(self):
# 注册权限路径
self.add_permission("rbac.admin")
self.add_permission("rbac.user")
# 创建角色
self.add_role("rbac_admin", exist_ok=True)
self.add_role("rbac_user", exist_ok=True)
# 给角色分配权限(通过底层 RBAC 服务)
if self.rbac:
self.rbac.grant("role", "rbac_admin", "rbac.admin")
self.rbac.grant("role", "rbac_admin", "rbac.user")
self.rbac.grant("role", "rbac_user", "rbac.user")
@registrar.on_group_command("授权")
async def on_grant(self, event: GroupMessageEvent, target: At = None):
"""授予用户 admin 角色"""
if target is None:
await event.reply("请 @一个用户")
return
target_uid = str(target.user_id)
if self.rbac:
self.rbac.assign_role("user", target_uid, "rbac_admin")
await event.reply(f"已授予 {target_uid} 管理员权限 ✅")
@registrar.on_group_command("管理命令")
async def on_admin_cmd(self, event: GroupMessageEvent):
"""受权限保护的命令"""
uid = str(event.user_id)
if self.check_permission(uid, "rbac.admin"):
await event.reply("🔑 管理命令执行成功!")
else:
await event.reply("🚫 你没有执行此命令的权限")
@registrar.on_group_command("查权限")
async def on_check_perm(self, event: GroupMessageEvent):
"""查看自己的权限"""
uid = str(event.user_id)
has_admin = self.check_permission(uid, "rbac.admin")
is_admin_role = self.user_has_role(uid, "rbac_admin")
await event.reply(
f"👤 权限状态:\n"
f" 角色 rbac_admin: {'✅' if is_admin_role else '❌'}\n"
f" 权限 rbac.admin: {'✅' if has_admin else '❌'}"
)API
| 方法 | 说明 |
|---|---|
check_permission(user, permission) -> bool | 检查用户是否拥有权限 |
add_permission(path) | 注册权限路径 |
remove_permission(path) | 移除权限路径 |
add_role(role, exist_ok=True) | 创建角色 |
user_has_role(user, role) -> bool | 检查用户是否拥有角色 |
self.rbac | 底层 RBACService 实例(可为 None) |
底层 RBAC 服务
通过 self.rbac 访问更细粒度的操作:
| 方法 | 说明 |
|---|---|
rbac.grant("role", role_name, permission_path) | 给角色授予权限 |
rbac.assign_role("user", user_id, role_name) | 给用户分配角色 |
rbac.unassign_role("user", user_id, role_name) | 移除用户的角色 |
rbac.revoke("role", role_name, permission_path) | 撤销角色权限 |
权限路径规范
权限路径使用点分格式,支持层级结构:
plugin_name.feature # 如 "group_manager.admin"
plugin_name.feature.sub # 如 "group_manager.admin.kick"框架内部使用 Trie 树进行高效的路径匹配。
实战:群管理权限控制
class GroupManagerPlugin(NcatBotPlugin):
name = "group_manager"
version = "1.0.0"
async def on_load(self):
self.add_permission("group_manager.admin")
self.add_role("gm_admin", exist_ok=True)
if self.rbac:
self.rbac.grant("role", "gm_admin", "group_manager.admin")
def _is_admin(self, user_id) -> bool:
return self.check_permission(str(user_id), "group_manager.admin")
@registrar.on_group_command("踢")
async def on_kick(self, event: GroupMessageEvent, target: At = None):
if not self._is_admin(event.user_id):
await event.reply("🚫 你没有管理权限")
return
if target is None:
await event.reply("请 @一个用户")
return
await self.api.qq.manage.set_group_kick(event.group_id, target.user_id)
await event.reply(f"已踢出用户 {target.user_id}")TimeTaskMixin — 定时任务
TimeTaskMixin 提供定时任务管理,支持多种时间格式。
基本用法
class MyPlugin(NcatBotPlugin):
name = "scheduled_tasks"
version = "1.0.0"
async def on_load(self):
self._enabled = True
self._notify_group = None
# 带条件的定时任务:每 60 秒执行,仅在 enabled 时
self.add_scheduled_task(
"conditional_tick",
"60s",
conditions=[lambda: self._enabled],
)
@registrar.on_group_command("启动心跳")
async def on_start_heartbeat(self, event: GroupMessageEvent):
self._notify_group = str(event.group_id)
success = self.add_scheduled_task("heartbeat", "30s")
if success:
await event.reply("💓 心跳任务已启动(每 30 秒)")
@registrar.on_group_command("停止心跳")
async def on_stop_heartbeat(self, event: GroupMessageEvent):
self.remove_scheduled_task("heartbeat")
await event.reply("💔 心跳任务已停止")
@registrar.on_group_command("添加提醒")
async def on_add_reminder(self, event: GroupMessageEvent, seconds: int = 0):
"""一次性任务:'添加提醒 10' → 10 秒后执行"""
if seconds <= 0:
await event.reply("请输入秒数")
return
self._notify_group = str(event.group_id)
task_name = f"reminder_{seconds}s"
self.add_scheduled_task(task_name, seconds, max_runs=1)
await event.reply(f"⏰ 将在 {seconds} 秒后提醒你")
@registrar.on_group_command("任务列表")
async def on_list_tasks(self, event: GroupMessageEvent):
tasks = self.list_scheduled_tasks()
if not tasks:
await event.reply("当前没有活跃的定时任务")
return
lines = ["📋 定时任务列表:"]
for name in tasks:
status = self.get_task_status(name)
if status:
lines.append(f" {name}: 运行 {status.get('run_count', 0)} 次")
await event.reply("\n".join(lines))
# ---- 任务回调(框架自动调用同名方法) ----
async def heartbeat(self):
"""心跳回调"""
LOG.info("💓 心跳")
if self._notify_group:
await self.api.qq.post_group_msg(
self._notify_group, text="💓 心跳 - 我还活着!"
)
async def conditional_tick(self):
"""条件任务回调"""
LOG.info("⏱️ 条件定时任务执行了")时间格式
| 格式 | 示例 | 说明 |
|---|---|---|
| 秒数字符串 | "30s" / "2h30m" / "0.5d" | 周期执行 |
| 每日时间 | "22:00" / "07:30" | 每天定时执行 |
| 秒数(数字) | 120 / 0.5 | 周期执行 |
| 一次性时间 | "2024-12-31 23:59:59" | 一次性执行 |
API
| 方法 | 说明 |
|---|---|
add_scheduled_task(name, interval, conditions=None, max_runs=None) -> bool | 添加定时任务 |
remove_scheduled_task(name) -> bool | 移除定时任务 |
list_scheduled_tasks() -> List[str] | 列出本插件的所有任务 |
get_task_status(name) -> Optional[Dict] | 获取任务状态 |
cleanup_scheduled_tasks() | 清理所有任务(卸载时自动调用) |
回调约定
任务的回调方法必须与任务名同名——框架自动查找插件实例上的同名异步方法:
# 添加名为 "daily_report" 的任务
self.add_scheduled_task("daily_report", "22:00")
# 框架自动调用 self.daily_report()
async def daily_report(self):
for gid in self.data.get("enabled_groups", []):
await self._send_report(gid)条件执行
通过 conditions 参数传入条件函数列表,只有所有条件都返回 True 时才执行:
self.add_scheduled_task(
"conditional_tick",
"60s",
conditions=[lambda: self._enabled],
)一次性任务
设置 max_runs=1 创建一次性任务,执行后自动移除:
self.add_scheduled_task(f"reminder_{seconds}s", seconds, max_runs=1)EventMixin — 事件流
EventMixin 提供直接操作底层事件流的能力,详细使用方式已在 事件高级用法 和 事件高级用法 中介绍。
API
| 方法 | 说明 |
|---|---|
events(event_type=None) -> EventStream | 创建事件流(支持 async with + async for) |
wait_event(predicate=None, timeout=None) -> Event | 等待满足条件的下一个事件 |
API 速查表
ConfigMixin
| 方法 | 签名 | 说明 |
|---|---|---|
get_config | (key: str, default=None) -> Any | 读取配置 |
set_config | (key: str, value: Any) -> None | 设置配置(立即保存) |
update_config | (updates: Dict[str, Any]) -> None | 批量更新 |
remove_config | (key: str) -> bool | 删除配置项 |
DataMixin
| 属性 | 类型 | 说明 |
|---|---|---|
self.data | Dict[str, Any] | 数据字典(自动持久化) |
RBACMixin
| 方法 | 签名 | 说明 |
|---|---|---|
check_permission | (user: str, permission: str) -> bool | 检查权限 |
add_permission | (path: str) -> None | 注册权限路径 |
remove_permission | (path: str) -> None | 移除权限路径 |
add_role | (role: str, exist_ok=True) -> None | 创建角色 |
user_has_role | (user: str, role: str) -> bool | 检查角色 |
TimeTaskMixin
| 方法 | 签名 | 说明 |
|---|---|---|
add_scheduled_task | (name, interval, conditions=None, max_runs=None) -> bool | 添加任务 |
remove_scheduled_task | (name: str) -> bool | 移除任务 |
list_scheduled_tasks | () -> List[str] | 列出任务 |
get_task_status | (name: str) -> Optional[Dict] | 获取状态 |
EventMixin
| 方法 | 签名 | 说明 |
|---|---|---|
events | (event_type=None) -> EventStream | 创建事件流 |
wait_event | (predicate=None, timeout=None) -> Event | 等待事件 |
下一步
版权所有
版权归属:huan-yp
