FinSight × BettaFish 多Agent升级终极执行计划（2025-12-09）
本文档是 FinSight 项目对齐 BettaFish 架构的唯一执行依据。其他相关文档（BettaFish_Alignment_Plan_CN.md、FinSight_BettaFish_Final_Plan_2025-12-08.md、feature_logs/opus.md）归档为技术参考，不再单独维护。

一、BettaFish 核心机制速览（必须理解）
1.1 论坛式多Agent协作
四个专职Agent：QueryAgent（新闻搜索）、MediaAgent（多模态）、InsightAgent（历史舆情）、ReportAgent（报告整合）
ForumEngine：中央论坛，Agent不直接通信，通过forum.log异步交流
ForumHost：独立LLM主持人，输出四段式引导（事件时间线→观点整合→深度分析→讨论指引）
1.2 反思循环（Reflection Loop）
每个Agent内部：初始搜索 → 首次总结 → [ReflectionNode识别知识空白 → 精炼搜索 → ReflectionSummaryNode更新总结] × 2-3轮

1.3 高召回+KV缓存
多源并行搜索（Tavily+DDG+爬虫）
LLM仅做2-5句摘要，保留Markdown链接
结果写入KV（key=ticker:field，含as_of/source/text/links/ttl）
1.4 中间表示（IR）
报告先生成结构化JSON（sections/evidence/confidence/risks），校验后再渲染Markdown/HTML

二、FinSight目标架构
User → Orchestrator(LangGraph)
       ├── PriceAgent        [常驻] 实时行情，TTL=30秒
       ├── TechnicalAgent    [常驻] 技术指标，TTL=5分钟
       ├── FundamentalAgent  [常驻] 财报估值，TTL=1小时
       ├── NewsAgent         [常驻] 新闻舆情，TTL=10分钟
       ├── MacroAgent        [按需] 宏观事件，复杂问题触发
       └── DeepSearchAgent   [按需] 长文研究，信息不足触发
       → ForumHost（冲突消解+观点融合）
       → IR校验 → ReportRenderer（Markdown/HTML/PDF）

共享层：
- KV缓存：backend/services/cache.py
- 熔断器：backend/services/circuit_breaker.py
- 诊断日志：source/duration_ms/fail_reason/fallback_used/cache_hit
三、三阶段执行计划
阶段0：基座强化（第1-2周）
Day 1-2：工具输出标准化
tools.py
改动：所有工具函数返回统一结构
{
    "value": ...,           # 实际数据
    "as_of": "ISO时间戳",    # 数据时间
    "source": "yfinance",   # 数据来源
    "fallback_used": false, # 是否用了兜底
    "duration_ms": 230,     # 耗时毫秒
    "fail_reason": null     # 失败原因（成功为null）
}
验收：运行test/test_tools.py，所有工具输出符合schema

Day 3-4：KV缓存层
新建文件：backend/services/cache.py
class ToolCache:
    def __init__(self):
        self.cache = {}
    
    def get(self, key: str) -> Optional[dict]:
        if key in self.cache:
            data, timestamp, ttl = self.cache[key]
            if (datetime.now() - timestamp).seconds < ttl:
                return data
        return None
    
    def set(self, key: str, value: dict, ttl: int = 60):
        self.cache[key] = (value, datetime.now(), ttl)
    
    def make_key(self, ticker: str, field: str, interval: str = "1d") -> str:
        return f"{ticker}:{field}:{interval}"
TTL配置：行情30秒、新闻600秒、财报3600秒
验收：连续调用get_stock_price("AAPL")两次，第二次命中缓存

Day 5-6：熔断器
新建文件：backend/services/circuit_breaker.py
class CircuitBreaker:
    # 状态：CLOSED(正常) → OPEN(熔断) → HALF_OPEN(试探)
    def __init__(self, failure_threshold=3, recovery_timeout=300):
        self.states = {}  # {source: {failures, last_failure, state}}
    
    def record_failure(self, source: str): ...
    def record_success(self, source: str): ...
    def can_call(self, source: str) -> bool: ...
    验收：yfinance连续失败3次后自动跳过，5分钟后恢复

Day 7-8：搜索兜底
tools.py
改动：在get_stock_price等函数末尾增加Tavily兜底
# 所有源都失败时
if all_failed and TAVILY_AVAILABLE:
    result = _search_price_fallback(ticker)  # 限时3秒
    result["fallback_used"] = True
    return result
验收：模拟所有API失败，仍能返回搜索兜底的价格

Day 9-10：LangGraph打点
langchain_agent.py
改动：每个节点增加tracing metadata
# 在工具调用前后记录
with trace_tool(name="get_stock_price", ticker=ticker) as span:
    result = get_stock_price(ticker)
    span.set_attribute("source", result["source"])
    span.set_attribute("duration_ms", result["duration_ms"])
    span.set_attribute("cache_hit", result.get("cache_hit", False))
    验收：LangSmith可见完整调用链，每个工具有source/duration标签

Day 11-12：前端诊断面板
文件：frontend/src/components/Diagnostics.tsx（新建）
功能：

显示工具调用列表（名称、耗时、状态）
标记数据来源（API/缓存/兜底）
失败原因高亮显示
验收：对话时可展开查看诊断信息
Day 13-14：测试+文档
新增测试：

test/test_cache.py - 缓存命中/过期/清理
test/test_circuit_breaker.py - 熔断/恢复状态机
test/test_fallback.py - 全源失败→搜索兜底成功
文档：更新本文件的"阶段0完成"状态
阶段0验收标准：

 工具失败不再500，返回结构化错误
 缓存命中率可观测（日志可见cache_hit）
 熔断状态可见（diagnostics显示跳过原因）
 搜索兜底可用（Tavily限时返回）
 LangSmith可见完整调用链
阶段1：子Agent雏形（第3-4周）
Week 3：BaseAgent + NewsAgent + PriceAgent
新建目录：backend/agents/

base_agent.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime

@dataclass
class EvidenceItem:
    text: str
    source: str
    url: Optional[str]
    timestamp: Optional[datetime]
    confidence: float  # 0-1

@dataclass
class AgentOutput:
    agent_name: str
    summary: str
    evidence: List[EvidenceItem]
    confidence: float
    data_sources: List[str]
    as_of: datetime
    fallback_used: bool
    risks: List[str]

class BaseFinancialAgent:
    AGENT_NAME = "base"
    MAX_REFLECTIONS = 2
    
    def __init__(self, llm, cache):
        self.llm = llm
        self.cache = cache
    
    async def research(self, query: str, ticker: str) -> AgentOutput:
        # 1. 初始搜索
        results = await self._initial_search(query, ticker)
        summary = await self._first_summary(results)
        
        # 2. 反思循环
        for i in range(self.MAX_REFLECTIONS):
            gaps = await self._identify_gaps(summary)
            if not gaps:
                break
            new_data = await self._targeted_search(gaps)
            summary = await self._update_summary(summary, new_data)
        
        return self._format_output(summary, results)
    
    async def _identify_gaps(self, summary: str) -> List[str]:
        """LLM识别知识空白"""
        prompt = f"""分析以下总结，识别信息空白：
        {summary}
        
        检查：缺少时间线？缺少数值？缺少对比？缺少风险？
        输出需要补充的问题列表，信息充足则输出空列表。"""
        return await self.llm.generate(prompt)
news_agent.py
                class NewsAgent(BaseFinancialAgent):
            AGENT_NAME = "NewsAgent"
            CACHE_TTL = 600
            
            async def _initial_search(self, query: str, ticker: str):
                cache_key = f"{ticker}:news:24h"
                cached = self.cache.get(cache_key)
                if cached:
                    return cached
                
                results = []
                # Finnhub新闻
                results.extend(await self._fetch_finnhub(ticker))
                # Tavily搜索
                results.extend(await self._fetch_tavily(f"{ticker} stock news"))
                
                results = self._deduplicate(results)
                self.cache.set(cache_key, results, self.CACHE_TTL)
                return results
price_agent.py
class PriceAgent(BaseFinancialAgent):
    AGENT_NAME = "PriceAgent"
    CACHE_TTL = 30
    MAX_REFLECTIONS = 0  # 行情不需要反思
    
    async def _initial_search(self, query: str, ticker: str):
        cache_key = f"{ticker}:price:realtime"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        
        # 多源回退
        for source in ["yfinance", "alphavantage", "finnhub", "tavily"]:
            if self.circuit_breaker.can_call(source):
                try:
                    result = await self._fetch_from(source, ticker)
                    self.circuit_breaker.record_success(source)
                    self.cache.set(cache_key, result, self.CACHE_TTL)
                    return result
                except Exception as e:
                    self.circuit_breaker.record_failure(source)
        
        raise AllSourcesFailedError(ticker)
Week 3验收：

 NewsAgent可独立调用，输出符合AgentOutput
 PriceAgent多源回退正常，熔断生效
 两个Agent的缓存独立生效
Week 4：TechnicalAgent + FundamentalAgent + Orchestrator
technical_agent.py
职责：技术指标分析（MA/RSI/MACD/支撑阻力）
工具：get_kline_data, 技术指标计算函数
TTL：300秒

fundamental_agent.py
职责：财报、估值、盈利分析
工具：get_financial_statements, get_key_metrics
TTL：3600秒

orchestrator.py
class Orchestrator:
    def __init__(self, agents: Dict[str, BaseFinancialAgent]):
        self.agents = agents
        self.forum = ForumHost()
    
    async def analyze(self, query: str, ticker: str) -> ForumOutput:
        # 1. 并行调用常驻Agent
        tasks = [
            self.agents["price"].research(query, ticker),
            self.agents["news"].research(query, ticker),
            self.agents["technical"].research(query, ticker),
            self.agents["fundamental"].research(query, ticker),
        ]
        outputs = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 2. 过滤失败的
        valid_outputs = {
            name: out for name, out in zip(self.agents.keys(), outputs)
            if not isinstance(out, Exception)
        }
        
        # 3. Forum综合
        return await self.forum.synthesize(valid_outputs)
forum.py
class ForumHost:
    SYNTHESIS_PROMPT = """
    你是金融研究主持人。根据各Agent分析结果生成综合报告：
    
    ## 价格Agent: {price}
    ## 新闻Agent: {news}
    ## 技术Agent: {technical}
    ## 基本面Agent: {fundamental}
    
    输出：
    1. 【共识观点】各Agent一致认同的结论
    2. 【分歧观点】存在冲突的判断及原因
    3. 【置信度】根据数据质量给出0-1分数
    4. 【投资建议】BUY/HOLD/SELL及理由
    5. 【风险提示】关键风险因素
    """
    
    async def synthesize(self, outputs: Dict[str, AgentOutput]) -> ForumOutput:
        # 冲突检测
        conflicts = self._detect_conflicts(outputs)
        # LLM综合
        result = await self.llm.generate(self.SYNTHESIS_PROMPT.format(**outputs))
        return ForumOutput(...)
Week 4验收：

 4个常驻Agent都可独立调用
 Orchestrator并行调用成功
 Forum能识别冲突并给出综合结论
 典型查询触发≥2个Agent
阶段1验收标准：

 backend/agents/目录包含5个文件
 AgentOutput数据类定义完整
 反思循环在NewsAgent中生效
 Orchestrator并行调用耗时<单Agent串行
 Forum输出包含共识/分歧/建议
阶段2：IR + 按需Agent + 前端展示（第5-6周）
Week 5：IR Schema + DeepSearchAgent + MacroAgent
backend/report/ir.py
from pydantic import BaseModel, validator
from typing import List, Optional
from datetime import datetime

class EvidenceItem(BaseModel):
    text: str
    source: str
    url: Optional[str]
    confidence: float

class Section(BaseModel):
    title: str
    key_points: List[str]
    evidence: List[EvidenceItem]
    agent_source: str

class ReportIR(BaseModel):
    ticker: str
    report_title: str
    generated_at: datetime
    data_as_of: datetime
    
    executive_summary: str
    recommendation: str  # BUY/HOLD/SELL
    confidence: float
    
    sections: List[Section]
    risks: List[str]
    data_sources: List[str]
    fallback_used: bool
    
    @validator('sections')
    def must_have_sections(cls, v):
        if len(v) < 2:
            raise ValueError('报告至少需要2个章节')
        return v
    
    @validator('confidence')
    def confidence_range(cls, v):
        if not 0 <= v <= 1:
            raise ValueError('置信度必须在0-1之间')
        return v

class IRRenderer:
    def to_markdown(self, ir: ReportIR) -> str: ...
    def to_html(self, ir: ReportIR) -> str: ...
    deep_search_agent.py
触发条件：主链路信息不足（新闻<3条 或 as_of>24小时）
职责：高召回抓取长文、研报、深度分析
工具：Tavily深度搜索、网页爬虫
输出：写入KV供后续复用

macro_agent.py
触发条件：查询涉及宏观因素（利率、通胀、政策）
职责：宏观环境、事件日历、风险暴露
工具：search宏观关键词、经济数据API

Week 6：前端展示 + 端到端测试
前端改动
报告结构化展示（章节可折叠）
Agent贡献标记（侧栏显示哪些Agent参与）
证据链接可点击
置信度可视化（进度条）
风险提示高亮
端到端测试
# test/test_e2e_multi_agent.py

async def test_full_analysis_pipeline():
    """完整分析流程"""
    result = await orchestrator.analyze("分析AAPL投资价值", "AAPL")
    
    assert result.agents_used >= 2
    assert result.recommendation in ["BUY", "HOLD", "SELL"]
    assert 0 <= result.confidence <= 1
    assert len(result.sections) >= 2
    assert all(s.evidence for s in result.sections)

async def test_fallback_to_deep_search():
    """信息不足时触发DeepSearch"""
    # 模拟NewsAgent返回空
    result = await orchestrator.analyze("分析某冷门股", "OBSCURE")
    
    assert "DeepSearchAgent" in result.agents_used

async def test_ir_validation():
    """IR校验"""
    ir = ReportIR(...)
    assert ir.confidence >= 0
    assert len(ir.sections) >= 2
    阶段2验收标准：

 IR Schema定义完整，Pydantic校验通过率>95%
 DeepSearchAgent按需触发正常
 MacroAgent对宏观问题响应
 前端报告可折叠/展开章节
 Agent贡献可见
 证据链接可点击
 端到端测试通过
 四、关键代码路径汇总
backend/
├── agents/                    # 阶段1新建
│   ├── __init__.py
│   ├── base_agent.py         # AgentOutput + BaseFinancialAgent
│   ├── price_agent.py        # 行情Agent
│   ├── news_agent.py         # 新闻Agent（含反思循环）
│   ├── technical_agent.py    # 技术Agent
│   ├── fundamental_agent.py  # 基本面Agent
│   ├── macro_agent.py        # 宏观Agent（按需）
│   └── deep_search_agent.py  # 深度搜索Agent（按需）
├── orchestration/
│   ├── orchestrator.py       # 并行调用+结果收集
│   └── forum.py              # ForumHost冲突消解
├── report/
│   └── ir.py                 # ReportIR + IRRenderer
├── services/                  # 阶段0新建
│   ├── cache.py              # KV缓存
│   └── circuit_breaker.py    # 熔断器
├── tools.py                   # 阶段0改造（输出标准化）
└── langchain_agent.py         # 阶段0改造（tracing）

frontend/src/
├── components/
│   ├── Diagnostics.tsx       # 阶段0新建
│   ├── ReportView.tsx        # 阶段2改造（IR展示）
│   └── AgentBadge.tsx        # 阶段2新建（Agent标记）

test/
├── test_cache.py             # 阶段0
├── test_circuit_breaker.py   # 阶段0
├── test_fallback.py          # 阶段0
├── test_agents.py            # 阶段1
├── test_orchestrator.py      # 阶段1
├── test_ir.py                # 阶段2
└── test_e2e_multi_agent.py   # 阶段2
五、日志字段规范
所有工具调用必须记录：
{
    "timestamp": "2025-12-09T10:30:00.123Z",
    "tool_name": "get_stock_price",
    "ticker": "AAPL",
    "source": "yfinance",
    "duration_ms": 230,
    "cache_hit": false,
    "fallback_used": false,
    "fail_reason": null,
    "result_preview": "price=150.23"
}
Agent调用记录：
{
    "timestamp": "2025-12-09T10:30:01.456Z",
    "agent_name": "NewsAgent",
    "ticker": "AAPL",
    "reflection_rounds": 2,
    "evidence_count": 5,
    "confidence": 0.85,
    "duration_ms": 3200
}
六、风险与应对
风险	应对
多Agent并行导致API限流	熔断器+请求间隔+优先级排序
反思循环无限递归	MAX_REFLECTIONS硬限制
IR校验失败率高	先宽松后收紧，记录失败case
Forum冲突消解不准	保留原始Agent输出供人工审核
前端渲染复杂度	分阶段：先文本→再折叠→再可视化
七、验收Checklist总览
阶段0（第1-2周）
 tools.py所有函数返回标准schema
 cache.py实现且TTL可配置
 circuit_breaker.py三状态机正常
 搜索兜底Tavily可用
 LangSmith可见tracing
 前端Diagnostics面板可用
阶段1（第3-4周）
 4个常驻Agent定义完成
 AgentOutput数据类可用
 反思循环在NewsAgent生效
 Orchestrator并行调用正常
 ForumHost输出结构化综合
阶段2（第5-6周）
 ReportIR Pydantic校验通过
 DeepSearchAgent按需触发
 前端报告章节可折叠
 Agent贡献可见
 端到端测试全通过
完成以上全部，FinSight将从「单Agent+工具」升级为「多Agent协作+反思循环+IR结构化+KV缓存」的专业金融研究平台。
