DESIGN · EVENT LAYER

一条统一的事件流
给整个框架用。

框架里"某件事发生了"的信号,现在散在六套互不相通的机制里。这层把它们统一成一条总线:源往里 emit,消费者从里 subscribe。proactive 只是它的第一个消费者。

✅ 已落地(迁移步 1·2):总线 · A 类 taps · file.changed · tool gate  | B 类桥接 = 步 3 待做 | 观察:OPENPROGRAM_EVENT_LOG=1

AgentEvent 流auth._emiton_event 回调 WS 广播定时 poll纯日志
现状:六套机制,各用各的,想接一个时机得先搞清它归哪套
EventBus · emit / subscribe
统一后:一条总线,一种接法
01 · ARCHITECTURE

架构总览

所有组件都在同一个 worker 进程里(各是 daemon 线程),所以总线就是一个进程级单例。看事件怎么流:

WORKER 进程 · 单进程 · 多 DAEMON 线程 A 类 · AGENT 活动 A agent loop 模型回复 · 工具前后 · 文件改 A task runner 子任务起止 B 类 · 系统状态 B auth 凭据限流 / 轮换 B context 压缩阈值 70% / 80% B channels 外部消息进来 B memory 空闲处理起止 emit(Event) EventBus 进程级单例 get_event_bus() emit(Event) subscribe(types=…) 源和消费者互不认识 只认总线 观察 · 异步 · 不挡 agent webui server 订阅 → 转发前端 WebSocket proactive 第一个消费者 · 规则层(另文设计) 拦截 · 同步 · 仅 tool.before tool.before 问询点 复用 _approval · 拦下 / 确认 / 放行 对 subagent 同样生效 WS 前端 / TUI 浏览器 · 命令行 (worker 进程之外)
A 类事件 · 观察(异步) B 类事件 · 观察(异步) 拦截(同步,仅 tool.before)
02 · EVENT

Event 模型:核心三样 + 开放口袋

核心(是什么事 + 内容 + 时间)固定;关联信息放进开放的 metadata 口袋,不写死成字段

Event · dataclassfrozen=True · 产生后不可改
核心 · 固定
id唯一编号 ts发生时间 type是什么事 originuser / agent / tool / system / proactive
内容
payload : dict命令、文件路径、哪个账号被限流……
开放口袋 · 需要才塞
metadata : dict{"session":…, "turn":…, "lane":…}
为什么 turn / session 进口袋、不做字段?
它们不是事件的内在属性,是外加的关联——对一半事件(auth、channel)根本没意义,做成字段就得靠"可空"打补丁。成熟事件系统(DOM / 日志 / MQ / 追踪)都这个形状:核心固定,关联进 labels / headers。
turn 不是这层要建模的东西。
框架里没有 Turn 对象,它就是 assistant 消息的 id,靠 ContextVar(_current_turn_id)传着。agent 事件 emit 时它有值就顺手塞进口袋;auth / channel 事件 emit 时它是空的,口袋里自然没有 turn。
03 · SOURCES

两类事件源

容易只盯着 A 类,但 agent loop 之外的 B 类对主动性常常更重要。两类进同一条总线,metadata 口袋天然装得下。

CLASS A

agent 活动事件

agent 干活的过程中发生
用户消息模型回复工具前后文件改动一轮结束子任务起止
对 proactive:基础。metadata 通常带 session / turn / lane。
CLASS B

系统状态事件

全局状态变化——可能没有任何 agent 在跑
凭据被限流上下文要溢出外部消息进来技能变更插件有新版
对 proactive:往往更有价值——"凭据限流了""要溢出了"是明确的可响应时机。
04 · TYPES

事件类型(第一版)

精挑的"有消费者想响应"的时机,不是框架所有动作的清单。以后加新类型是纯加法(见 07)。

TYPE何时现有代码来源
Auser.prompt_submitted用户发消息dispatcher 入口
Amodel.response_started / .completed模型开始 / 说完回复AgentEventMessageStart/End
Atool.before工具即将执行(可拦截)AgentEventToolStart
Atool.after工具执行完AgentEventToolEnd
Afile.changed新增文件被改挂 backup_for_current_turn
Aturn.ended一轮结束AgentEventTurnEnd
Asubagent.started / .ended子任务起止TaskRunner 广播
Bcredential.cooldown / .exhausted / .rotated凭据限流 / 池耗尽 / 轮换AuthStore._emit
Bcontext.compaction_recommended / .compacted上下文到阈值 / 已压缩context/engine.py
Bchannel.message_inbound外部消息进来channels/_broadcast.py
Bskills.changed / plugins.update_available技能改 / 插件有新版webui watcher
05 · PLACEMENT

总线:一个进程级单例

webui、agent loop、channels、memory、auth、task runner 全在同一个 worker 进程里。总线照框架已有的 get_store() / get_runner() 双检锁先例做单例,复用闲置的 agent/event_bus.py。不需要跨进程桥接。

# agent/event_bus.py · 照 AuthStore 先例
def get_event_bus() -> EventBus: …

class EventBus:
  def emit(self, event: Event): …
    # 广播,fire-and-forget,不阻塞调用方
  def subscribe(self, handler, *, types=None): …
    # 按事件类型订阅,只收关心的那几类
01同进程所有线程拿同一实例,直接 emit / subscribe——这是调研确认的事实,不是假设。
02跟现有 EventBus 的差别:从按 channel 字符串订阅,改成按事件类型订阅、传统一 Event
03auth 已经把事件做对了(subscribe/_emit,11 种事件)——这层是把它的做法推广到全框架,不是发明。
06 · TWO MODES

观察 vs 拦截

观察型

默认 · 异步

emit 出去,订阅者异步收到,事件源不等。

  • 所有事件都走这条
  • 订阅者再慢也不拖慢框架

拦截型

仅 tool.before · 同步

工具执行前,下游可以说"别执行"。挂在工具单一入口 _execute_tool_callstool.execute() 之前。

  • 必须快——不许调 LLM
  • 多方表态取最严:拦下 > 确认 > 放行,"确认"复用现有 _approval
  • 对 subagent 也生效——独立于 permission_mode=bypass,否则危险动作塞进子任务就溜了
07 · PRINCIPLES

两条要记住的原则

有消费者想响应,才是事件

不是所有调用都是事件。类型表是精挑的:工具要执行、凭据被限流——是事件;agent 内部一次列表拼接——不是。事件流变成什么都往里倒的垃圾场,是这类系统最常见的腐烂方式。

演进只加不改

源和消费者互不认识,所以加事件只动 emit 那一处:框架内函数加一行,一类动作在公共入口加一次。加类型、给 payload 加字段都零风险;改老结构才会伤老订阅者——这也是 payload / metadata 用开放 dict 的理由。

落地顺序与接线点(file:line)见 docs/plans/proactive-implementation.md:先把 A 类收口成总线并验证完整事件序列 → 补 file.changed 和工具前可截 → 桥进 B 类 → 补并发 lane 区分。