API REFERENCE · EVENT LAYER
事件层 Event Layer
参考手册
事件层将 OpenProgram 各子系统的运行时活动统一为一条可订阅的事件流。本手册分三部分:系统框架图给出事件层在框架中的位置与连接关系;事件目录按子系统列出全部事件类型及其 payload、触发时机、源码位置;API 与用法给出订阅、拦截、发问三类接口的签名与示例。
OVERVIEW
概述
一条进程级单例总线(openprogram/agent/event_bus.py)。各子系统当「源」,发生事情时把事件 emit 进总线;「消费者」(webui、将来的任何功能)按事件类型 subscribe。源和消费者互不认识,只认总线——所以新增一个事件源或一个消费者都不牵动别处。
SYSTEM MAP
系统框架图
左侧为发出事件的各子系统(方块内标注其事件,点击可跳转至对应详情),中部为总线 EventBus,右侧为消费者。琥珀色支线为唯一的同步拦截点 tool.before。
DATA MODEL
Event 数据模型
每条事件都是一个不可变(frozen)的小数据包。核心三样固定,关联信息放进开放的 metadata 口袋。openprogram/agent/event_bus.py:39-49
@dataclass(frozen=True)
class Event:
id: str # 唯一编号
ts: float # 发生时间
type: str # 事件类型(如 tool.before)
origin: str # 谁引起:user/agent/tool/system/proactive
payload: dict # 这件事的内容(每类不同)
metadata: dict # 开放口袋:{"session":…,"turn":…},需要才有
metadata 由发事件时自动从 ContextVar 补上 session/turn——agent 活动事件通常有,系统事件通常没有。
API · CORE
总线 API + 发事件 helper
get_event_bus() → EventBus
取进程级单例总线(双检锁)。同一 worker 进程任意线程拿到同一实例。event_bus.py:241-251
EventBus.subscribe(handler, *, types=None) → unsubscribe()
注册类型化订阅者。types=None 收全部;types={"tool.before",…} 只收这几类。返回取消订阅函数。event_bus.py:159-178
EventBus.emit(event)
分发给所有匹配订阅者。fire-and-forget——某 handler 抛异常不影响其他、不影响 emit 方。
emit_safe(type, origin, payload=None, metadata=None)
源发事件的便捷入口:构造 Event(自动填 id/ts + ContextVar 关联)并 emit,吞掉一切异常。框架里所有 A/B 类事件都用它。event_bus.py:96-107
emit_ws_frame(frame)
外部源用:把现成前端 WS 帧装进 ws.frame 事件送往前端,让源不再 import webui。event_bus.py:110-124
GUIDE ①
观察:订阅事件做只读响应
订阅指定类型的事件,在其发生时执行只读逻辑。异步执行,不阻塞 agent 主循环。
from openprogram.agent.event_bus import get_event_bus
def on_event(ev):
if ev.type == "file.changed":
print("改了", ev.payload["path"], ev.metadata.get("session"))
unsub = get_event_bus().subscribe(on_event, types={"file.changed", "tool.after"})
GUIDE ②
拦截:在工具执行前拦下来
全框架唯一的同步拦截点,只在 tool.before。gate 函数返回 None 放行 / deny 理由字符串拦下(理由作为 error tool result 回给模型)。多 gate 取最严,对 subagent 也生效。
from openprogram.agent.tool_gate import register_tool_gate
def guard(ev):
if ev.payload.get("tool") == "bash" and "rm -rf /" in str(ev.payload.get("args", {}).get("command", "")):
return "危险删除,拦下"
return None
unregister = register_tool_gate(guard)
gate 必须快:挡在工具执行同步路径上,agent 在等它。别调 LLM / 读网络。自身抛异常按放行(fail-open)。openprogram/agent/tool_gate.py
GUIDE ③
发问:函数中途停下来问用户
在函数体里调 runtime.ask / runtime.confirm,阻塞到用户在前端回答。三态显式。
answer = runtime.ask("用哪个日期库?", options=["dayjs", "luxon"], timeout=300, default=None)
# 答了→返回;拒绝→抛 UserDeclined;超时→有 default 返 default 否则抛 AskTimeout
ok = runtime.confirm("要归档全部 87 封邮件吗?", default=False) # 超时返 default,不抛
if runtime.can_ask(): # headless 无前端时为 False
name = runtime.ask("项目名?")
当前范围:同进程 @function 工具可用;@agentic_function 跑子进程需 parent→child 通道(Phase 2 未做)。runtime.py(ask/confirm)· agent/questions.py
DEBUG
调试:事件日志
设置环境变量 OPENPROGRAM_EVENT_LOG=1(或指定路径),每条事件将以一行 JSON 追加落盘。event_bus.py:256-285
OPENPROGRAM_EVENT_LOG=1 openprogram worker restart
cat /tmp/openprogram-events.jsonl
普通 chat turn 会依序打出 user.prompt_submitted → model.response_started → tool.before → tool.after → model.response_completed → turn.ended,带 metadata 里的 session/turn。确认事件层在跑的最快方式。
APPENDIX
附录
origin 枚举:user(用户)· agent(模型/主循环)· tool(工具)· system(系统状态)· proactive(保留,策略未激活)。
不在范围: proactive 策略包(policies/engine)是已写好但未激活的存量,不属于事件层。许多 webui 内部帧(chat_ack/chat_response/status 等)仍走 webui 直接广播,不是事件层基础设施,本文档只列经事件层流动的部分。