#!/usr/bin/env python3
"""
pr-actionable — classify open PRs into actionable/not-actionable states.

Daily metric: how many PRs are waiting on *us* vs. waiting on the author/spec.
Writes the tier→state→PR tree to actionability.json for the dashboard's pulls page.

Usage:
  pr-actionable --repo typescript-sdk [--no-llm] [--json F] [--gist-markdown F] [--visr-json F]
  pr-actionable --repo typescript-sdk,python-sdk --gist-markdown '/tmp/gist-{repo}.md'
  pr-actionable --repo typescript-sdk,python-sdk --visr-json 'data/{repo}/actionability.json'

Data model: _structured(r) is the single source of truth. Gist/Slack/VISR all render from it.
"""

import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal

sys.path.insert(0, str(Path(__file__).parent / "lib"))
from access import load as load_maintainers, load_from_visr_json, RepoMaintainers  # noqa: E402

# --- Config ---

# Maintainers loaded per-repo from modelcontextprotocol/access (source of truth).
# Set at runtime in main() so classify() can read it without threading through 8 call sites.
MAINTAINERS: set[str] = set()

# Substrings tested against the lower-cased reviewer login by is_bot_reviewer();
# a login containing any of them is treated as a bot reviewer.
BOT_REVIEWER_PATTERNS = (
    "github-advanced-security", "copilot-pull-request-reviewer",
    "dependabot", "renovate", "claude",
)

# Despite the name these are not glob patterns: _is_auth() tests each entry as a
# plain, case-sensitive substring of the file path (hence both "token" and "Token").
AUTH_PATH_GLOBS = ("auth", "oauth", "token", "Token", "OAuth")
# Dropped from title keywords: "session" (MCP ClientSession/session_id is a transport
# concept), "token" (progressToken/resumption/cancellation tokens), "oidc" (CI trusted
# publishing). "token"/"Token" stay in PATH_GLOBS — auth code lives in token-named files.
# Matched as substrings of the lower-cased PR title by _is_auth().
AUTH_TITLE_KEYWORDS = (
    "auth", "oauth", "credential", "authorization",
    "jwt", "bearer", "scope", "pkce",
)

# NOTE(review): presumably compared against the start of PR titles to detect
# release/dependency bot PRs — the consumer is outside this part of the file.
BOT_RELEASE_TITLE_PREFIXES = (
    "Version Packages",
    "chore: weekly dependency update",
)

# Value passed to `claude -p --model` by _claude_p_once().
LLM_MODEL = "sonnet"

# Labels that directly set state (Tier-2 overrides — human applied once, then deterministic)
# Maps label name -> state name; every value is a state declared in STATES.
LABEL_OVERRIDES = {
    "blocked:sep": "blocked-on-sep",
    "blocked:depends-on": "blocked-on-dependent",
    "blocked:on-hold": "on-hold",
    "blocked:deferred": "maintainer-drafted-deferred",
    "state:parked": "parked",
}

# --- Timers ---
# All thresholds are expressed in days.

DAYS_COMMUNITY_SLA = 7
DAYS_STALE_AUTHOR = 14
DAYS_STALE_AUTHOR_SUSPECTED_BOT = 3
DAYS_PARKED_THRESHOLD = 30
DAYS_PRE_CI_GHOST = 90
DAYS_CI_RED_ABANDONED = 30
DAYS_AUTHOR_PING_WAIT = 14  # double the SLA — give the @-mentioned maintainer time to actually see the ping

# --- State catalog ---
#
# Single source of truth. Everything downstream (routing sets, effort table,
# tier membership, gist definitions, sort order) is DERIVED from this list.
# Adding a state = one StateSpec line + the pr.state assignment in classify().
# Import-time assertions catch omissions before a PR can silently disappear.

# Routing bucket a state belongs to; drives the ACTIONABLE/NOT_ACTIONABLE/EXCLUDED sets.
Route = Literal["actionable", "not-actionable", "excluded"]
# Sort hint per state. "age" and "wait" both end up in SORT_BY_AGE; "number" is the default.
Sort = Literal["age", "wait", "number"]


@dataclass(frozen=True)
class StateSpec:
    """One entry of the state catalog (STATES).

    Instances are immutable. The import-time assertions below the catalog
    enforce the cross-field rules noted on effort_min and tier.
    """

    name: str                        # unique state identifier (stored in PR.state)
    route: Route                     # actionable / not-actionable / excluded
    desc: str                        # human-readable definition (exposed via STATE_DEFS)
    effort_min: int | None = None   # required for actionable, None otherwise
    tier: str | None = None          # a TIER_DEFS name; required for actionable states other than bot states and "unknown", None for non-actionable
    sort_by: Sort = "number"
    is_bot: bool = False             # True puts the state in BOT_STATES and exempts it from the tier requirement


# Tier definitions: (name, blockquote). Order = gist render order.
# The names are the values StateSpec.tier must use; the import-time assertions
# require every tier listed here to own at least one state.
TIER_DEFS: list[tuple[str, str]] = [
    ("Press a button",         "Already approved or already decided — ready for action."),
    ("We're blocking someone", "Author did what we asked and is waiting on us. Longest-waiting first."),
    ("High leverage",          "One decision unblocks or closes multiple things."),
    ("Intake",                 "PRs not yet reviewed by a maintainer. Oldest first."),
    ("Hygiene",                "Batchable procedural ops — pings, rebases, peer reviews."),
    ("Close candidates",       "Likely to be closed — inactive, stale, or needing more context. Reviewed before closing."),
]

# The state catalog. Declaration order is significant: the states of each tier are
# listed in TIERS in the order they appear here.
STATES: list[StateSpec] = [
    # --- Tier 0: Press a button ---
    StateSpec("needs-merge", "actionable",
              "Approved and CI-green — ready to merge.",
              effort_min=1, tier="Press a button"),
    StateSpec("awaiting-ci", "actionable",
              "Approved — waiting for CI to finish.",
              effort_min=1, tier="Press a button"),
    StateSpec("approved-ci-red", "actionable",
              "Approved but CI is failing — needs a fix or retry before merge.",
              effort_min=5, tier="Press a button"),

    # --- Tier 1: We're blocking someone ---
    StateSpec("author-pinged-after-procedural", "actionable",
              "Author addressed maintainer feedback and pinged — awaiting response.",
              effort_min=10, tier="We're blocking someone", sort_by="wait"),
    StateSpec("needs-re-review", "actionable",
              "Author pushed changes after review feedback — needs re-review.",
              effort_min=20, tier="We're blocking someone", sort_by="wait"),
    StateSpec("approval-contested", "actionable",
              "One reviewer approved, another requested changes — reviewers need to align.",
              effort_min=15, tier="We're blocking someone"),
    StateSpec("maintainer-intake", "actionable",
              "Maintainer-authored PR awaiting review from another maintainer.",
              effort_min=20, tier="We're blocking someone", sort_by="age"),

    # --- Tier 2: High leverage ---
    StateSpec("duplicate-cluster", "actionable",
              "Multiple PRs address the same issue — one will be picked, others closed as duplicates.",
              effort_min=8, tier="High leverage"),
    StateSpec("needs-decision", "actionable",
              "Maintainer discussed but hasn't approved or requested changes yet.",
              effort_min=20, tier="High leverage"),
    StateSpec("backport-follows-primary", "actionable",
              "v1.x sibling of a main-branch PR. Review together — backport diff is usually mechanical.",
              effort_min=5, tier="High leverage"),

    # --- Tier 3: Intake ---
    StateSpec("needs-first-review", "actionable",
              "Not yet reviewed by a maintainer.",
              effort_min=30, tier="Intake", sort_by="age"),
    StateSpec("community-reviewed", "actionable",
              "Community members have reviewed, but no maintainer has engaged yet.",
              effort_min=25, tier="Intake", sort_by="age"),

    # --- Tier 4: Hygiene ---
    StateSpec("stale-awaiting-author", "actionable",
              "Changes requested over two weeks ago with no author response.",
              effort_min=5, tier="Hygiene", sort_by="wait"),
    StateSpec("approved-rotted", "actionable",
              "Was approved, now conflicting. Rebase or ask author.",
              effort_min=10, tier="Hygiene"),
    StateSpec("ci-red-silent", "actionable",
              "CI failing, not yet flagged to the author.",
              effort_min=1, tier="Hygiene"),

    # --- Tier 5: Close candidates ---
    StateSpec("stale-ci-abandoned", "actionable",
              "CI has been failing for over a month with no activity.",
              effort_min=1, tier="Close candidates"),
    StateSpec("pre-ci-ghost", "actionable",
              "Over 90 days old and CI never ran.",
              effort_min=1, tier="Close candidates"),

    # --- Bot corner (actionable, rendered separately, no tier) ---
    StateSpec("bot-broke-ci", "actionable",
              "Automated dependency update with failing CI.",
              effort_min=3, is_bot=True),

    # --- Unknown (actionable, no tier — renders in Unclassified) ---
    StateSpec("unknown", "actionable",
              "Classifier couldn't decide. Investigate.",
              effort_min=15),

    # --- Not actionable (ball is elsewhere) ---
    StateSpec("awaiting-author", "not-actionable",
              "Changes requested; waiting for author response."),
    StateSpec("awaiting-author-ci", "not-actionable",
              "CI failure flagged to author; waiting for fix."),
    StateSpec("draft-active", "not-actionable",
              "Author is still working. Fresh draft."),
    StateSpec("stale-draft", "not-actionable",
              "Draft with no activity for over a week."),
    StateSpec("stale-draft-pinged", "not-actionable",
              "Draft; maintainer pinged for status over a week ago with no response."),
    StateSpec("blocked-on-sep", "not-actionable",
              "Waiting on a spec proposal (SEP) to land."),
    StateSpec("blocked-on-dependent", "not-actionable",
              "Waiting on a dependent PR to merge."),
    # NOTE: unlike its neighbours in this section, on-hold is routed "actionable"
    # (tier "We're blocking someone"). Moving this entry would change where it is
    # listed within that tier, because per-tier order follows declaration order.
    StateSpec("on-hold", "actionable",
              "Maintainer-internal decision pending. Author can't unblock.",
              effort_min=30, tier="We're blocking someone"),
    StateSpec("maintainer-drafted-deferred", "not-actionable",
              "Converted to draft by maintainers pending a larger decision."),
    StateSpec("duplicate-cluster-member", "not-actionable",
              "Part of a duplicate cluster — see the primary PR."),

    # --- Excluded (not in the pipeline) ---
    StateSpec("parked", "excluded",
              "Maintainer draft >30d, intentionally shelved."),
    StateSpec("bot-release", "excluded",
              "Changesets 'Version Packages' — release trigger, not for triage."),
]

# --- Derived lookups (cannot drift from STATES) ---

# Index of the catalog by state name.
_by_name: dict[str, StateSpec] = {spec.name: spec for spec in STATES}

# One frozenset of state names per routing bucket.
ACTIONABLE_STATES = frozenset(
    spec.name for spec in STATES if spec.route == "actionable"
)
NOT_ACTIONABLE_STATES = frozenset(
    spec.name for spec in STATES if spec.route == "not-actionable"
)
EXCLUDED_STATES = frozenset(
    spec.name for spec in STATES if spec.route == "excluded"
)

# Per-state attribute tables. EFFORT_MINUTES only has entries for states that
# declare an effort estimate.
EFFORT_MINUTES = {
    spec.name: spec.effort_min
    for spec in STATES
    if spec.effort_min is not None
}
STATE_DEFS = {spec.name: spec.desc for spec in STATES}
BOT_STATES = [spec.name for spec in STATES if spec.is_bot]

# TIERS keeps the (name, blockquote, [states]) triple shape that
# format_gist_markdown consumes. States are bucketed in STATES declaration order.
_tier_states: dict[str, list[str]] = {tier_name: [] for tier_name, _blurb in TIER_DEFS}
for _s in STATES:
    if _s.tier:
        _tier_states[_s.tier].append(_s.name)
TIERS: list[tuple[str, str, list[str]]] = [
    (tier_name, blockquote, _tier_states[tier_name])
    for tier_name, blockquote in TIER_DEFS
]

# States whose sort hint is "age" or "wait" (stands in for the hardcoded set of
# age-sorted states in format_gist_markdown).
SORT_BY_AGE = frozenset(
    spec.name for spec in STATES if spec.sort_by == "age" or spec.sort_by == "wait"
)

# --- Import-time completeness assertions ---
# Crash immediately on inconsistency — before a PR can silently disappear.

# Validates the catalog once at import:
#   - state names are unique;
#   - actionable states have effort_min, and (unless bot or "unknown") a tier from TIER_DEFS;
#   - non-actionable/excluded states carry neither effort_min nor tier;
#   - every tier in TIER_DEFS owns at least one state.
# NOTE: these are plain `assert` statements, so they are skipped when Python runs with -O.
_tier_names = {t for t, _ in TIER_DEFS}
_seen_names: set[str] = set()
for _s in STATES:
    assert _s.name not in _seen_names, f"duplicate state: {_s.name!r}"
    _seen_names.add(_s.name)
    if _s.route == "actionable":
        assert _s.effort_min is not None, f"{_s.name!r}: actionable states need effort_min"
        # Bot states and "unknown" are the only actionable states allowed to have no tier.
        if not _s.is_bot and _s.name != "unknown":
            assert _s.tier in _tier_names, (
                f"{_s.name!r}: tier={_s.tier!r} not in TIER_DEFS "
                f"(actionable non-bot states must have a valid tier)"
            )
    else:
        assert _s.effort_min is None, f"{_s.name!r}: non-actionable states shouldn't have effort_min"
        assert _s.tier is None, f"{_s.name!r}: non-actionable states shouldn't have a tier"
for _t, _ in TIER_DEFS:
    assert _tier_states[_t], f"tier {_t!r} has no states"
del _seen_names, _tier_names, _s, _t  # don't leak into module namespace



@dataclass
class PR:
    """Mutable record for one open pull request.

    The first group of fields is filled by fetch_tier_a(). The "lazy" groups
    start as None and are filled by fetch_tier_b() / fetch_tier_c(). The
    "Derived" group holds classification results.
    """

    number: int
    title: str
    author: str  # login; "ghost" when the author is missing, "app/<login>" for Bot authors
    is_draft: bool
    created_at: datetime
    base_ref: str
    labels: list[str]
    linked_issues: list[int]  # closing-issue references that live in the same repository
    files: list[str] = field(default_factory=list)  # changed paths (Tier A fetches at most 30)
    additions: int = 0
    deletions: int = 0
    body: str = ""
    # Tier B+ — lazy
    reviews: list[dict] | None = None  # latest review per reviewer: {author, state, submitted_at, body}
    mergeable: str | None = None
    ci_status: str | None = None  # SUCCESS / FAILURE / PENDING / NONE
    ci_completed_at: datetime | None = None  # latest completedAt/createdAt among the fetched check contexts
    # Tier C — lazy
    pushes: list[datetime] | None = None  # sorted timestamps of commits, force-pushes and reopen events
    commit_authors: list[str] | None = None
    comments: list[dict] | None = None  # {author, body, created_at}
    # Derived
    state: str = "unknown"
    reason: str = ""
    wait_since: datetime | None = None  # ball-moved-to-us timestamp — SLA clock start for this state
    is_maintainer: bool = False
    is_auth: bool = False
    cluster_id: int | str | None = None  # issue number, or "sem:<hash>" for semantic clusters
    cluster_pick: bool = False  # LLM-recommended winner within cluster
    cluster_pick_reason: str = ""
    needs_llm: str | None = None  # which LLM call site, if any


# --- Helpers ---
#
# Testability seams: module-level callables that tests can reassign without
# subprocess/network mocking. Production behavior unchanged.
#
#   import pr_actionable as m
#   m._now_fn = lambda: datetime(2026, 3, 12, tzinfo=timezone.utc)
#   m._llm_fn = lambda prompt, purpose: fixtures[purpose]
#   m.MAINTAINERS = {"alice"}
#   m.classify(hand_built_pr, "o", "n", use_llm=True)
#   assert hand_built_pr.state == "needs-first-review"

# Clock seam: tests reassign this to freeze time. Defaults to the current UTC time.
_now_fn = lambda: datetime.now(timezone.utc)  # noqa: E731


def now() -> datetime:
    """Return the current time by calling _now_fn (looked up at call time, so reassignment takes effect)."""
    return _now_fn()


def days_since(dt: datetime | None) -> float:
    if dt is None:
        return 0.0
    return (now() - dt).total_seconds() / 86400


def parse_dt(s: str | None) -> datetime | None:
    if not s:
        return None
    return datetime.fromisoformat(s.replace("Z", "+00:00"))


def is_bot_reviewer(login: str) -> bool:
    """Return True when *login* looks like a bot account.

    Matching is case-insensitive: the login either ends with "[bot]" or
    contains one of BOT_REVIEWER_PATTERNS as a substring.
    """
    lowered = login.lower()
    if lowered.endswith("[bot]"):
        return True
    for pattern in BOT_REVIEWER_PATTERNS:
        if pattern in lowered:
            return True
    return False


def sh(cmd: list[str]) -> str:
    """Run *cmd* (no shell) and return its stdout as text.

    Raises:
        RuntimeError: if the process exits non-zero. The message carries the
            program name, the exit code, and stderr (or stdout when stderr is empty).
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout
    detail = proc.stderr.strip() or proc.stdout.strip()
    raise RuntimeError(f"{cmd[0]} exit {proc.returncode}: {detail}")


def gh_graphql(query: str, **vars) -> dict:
    """Run a GraphQL query through `gh api graphql` and return the response's "data" member.

    Each keyword argument is forwarded as a `-F key=value` field.

    GitHub's GraphQL endpoint occasionally 502s or trips secondary rate limits
    mid-pagination, so a failed command or undecodable output is retried: up to
    three attempts in total, sleeping 1s and then 2s between them. The last
    error is re-raised once the attempts are exhausted.
    """
    cmd = ["gh", "api", "graphql", "-f", f"query={query}"]
    for key, value in vars.items():
        cmd.extend(("-F", f"{key}={value}"))

    max_attempts = 3
    failure = None
    for attempt in range(max_attempts):
        if attempt:
            # Back off before every retry: 1s, then 2s.
            time.sleep(2 ** (attempt - 1))
        try:
            payload = json.loads(sh(cmd))
        except (RuntimeError, json.JSONDecodeError) as exc:
            failure = exc
            continue
        return payload["data"]
    raise failure


# --- Tier A: cheap bulk fetch ---

# Open PRs, newest first, 50 per page (paginated through $cursor). Per PR it
# requests at most 20 labels, 10 closing-issue references and 30 file paths.
TIER_A_QUERY = """
query($owner: String!, $name: String!, $cursor: String) {
  repository(owner: $owner, name: $name) {
    pullRequests(states: OPEN, first: 50, after: $cursor, orderBy: {field: CREATED_AT, direction: DESC}) {
      pageInfo { hasNextPage endCursor }
      nodes {
        number title isDraft createdAt baseRefName additions deletions bodyText
        author { login __typename }
        labels(first: 20) { nodes { name } }
        closingIssuesReferences(first: 10) { nodes { number repository { nameWithOwner } } }
        files(first: 30) { nodes { path } }
      }
    }
  }
}
"""


def fetch_tier_a(owner: str, name: str, maintainers: RepoMaintainers) -> list[PR]:
    """Fetch every open PR of owner/name and build the Tier A PR records.

    Walks all pages of TIER_A_QUERY. A missing author becomes "ghost", Bot
    authors are stored as "app/<login>", and closing-issue references from
    other repositories are dropped. is_maintainer is decided on the raw login
    (before the "app/" prefix is applied).
    """
    repo_slug = f"{owner}/{name}"
    collected: list[PR] = []
    after = None
    while True:
        response = gh_graphql(TIER_A_QUERY, owner=owner, name=name, cursor=after or "")
        page = response["repository"]["pullRequests"]
        for node in page["nodes"]:
            actor = node["author"]
            login = actor["login"] if actor else "ghost"
            actor_kind = actor["__typename"] if actor else "User"
            label_names = [label["name"] for label in node["labels"]["nodes"]]
            paths = [entry["path"] for entry in node["files"]["nodes"]]
            same_repo_issues = [
                ref["number"]
                for ref in node["closingIssuesReferences"]["nodes"]
                if ref["repository"]["nameWithOwner"] == repo_slug
            ]
            record = PR(
                number=node["number"],
                title=node["title"],
                author=login,
                is_draft=node["isDraft"],
                created_at=parse_dt(node["createdAt"]),
                base_ref=node["baseRefName"],
                labels=label_names,
                linked_issues=same_repo_issues,
                files=paths,
                additions=node.get("additions", 0),
                deletions=node.get("deletions", 0),
                body=node.get("bodyText", "") or "",
                is_maintainer=(login in maintainers.all),
                is_auth=_is_auth(node["title"], paths, label_names),
            )
            if actor_kind == "Bot":
                record.author = f"app/{login}"
            collected.append(record)
        page_info = page["pageInfo"]
        if not page_info["hasNextPage"]:
            return collected
        after = page_info["endCursor"]


def _is_auth(title: str, files: list[str], labels: list[str]) -> bool:
    """Heuristically decide whether a PR is auth-related.

    True when the PR carries the exact label "auth", when the lower-cased
    title contains any AUTH_TITLE_KEYWORDS entry, or when enough of its files
    match AUTH_PATH_GLOBS (substring match on the path).
    """
    flagged = "auth" in labels
    if not flagged:
        lowered_title = title.lower()
        flagged = any(keyword in lowered_title for keyword in AUTH_TITLE_KEYWORDS)
    if flagged:
        return True

    auth_hits = sum(
        1 for path in files if any(fragment in path for fragment in AUTH_PATH_GLOBS)
    )
    if auth_hits == 0:
        return False
    # A wide refactor that merely brushes auth.py among many other files is not an
    # auth PR: small PRs always qualify, larger ones need >30% auth-looking files.
    total = len(files)
    return total <= 5 or auth_hits / total > 0.3


# --- Tier B: reviews + mergeable + CI (batched) ---

# Per-PR query: mergeable flag, the last 30 reviews, and the check rollup of the
# PR's last commit (with its last 20 check contexts).
# NOTE: the outer `statusCheckRollup:` is an *alias* for `commits(last: 1)`, so the
# actual rollup lives at ["statusCheckRollup"]["nodes"][0]["commit"]["statusCheckRollup"].
TIER_B_QUERY = """
query($owner: String!, $name: String!, $number: Int!) {
  repository(owner: $owner, name: $name) {
    pullRequest(number: $number) {
      mergeable
      reviews(last: 30) {
        nodes { author { login } state submittedAt body }
      }
      statusCheckRollup: commits(last: 1) {
        nodes {
          commit {
            statusCheckRollup {
              state
              contexts(last: 20) {
                nodes {
                  ... on CheckRun { conclusion completedAt }
                  ... on StatusContext { state createdAt }
                }
              }
            }
          }
        }
      }
    }
  }
}
"""


def fetch_tier_b(pr: PR, owner: str, name: str) -> None:
    """Fill the Tier B fields of *pr* in place: mergeable, reviews, CI status.

    reviews keeps one entry per reviewer (the last one returned wins), skipping
    the PR author and bot reviewers. ci_status is "NONE" when the last commit
    has no check rollup; otherwise it is the rollup state and ci_completed_at
    is the newest completedAt/createdAt among the fetched contexts (or None).
    """
    response = gh_graphql(TIER_B_QUERY, owner=owner, name=name, number=pr.number)
    node = response["repository"]["pullRequest"]
    pr.mergeable = node["mergeable"]

    # Reviews arrive in chronological order (last: 30), so overwriting by login
    # leaves each reviewer's most recent review.
    latest_by_login: dict[str, dict] = {}
    for review in node["reviews"]["nodes"]:
        if review is None or not review["author"]:
            continue
        reviewer = review["author"]["login"]
        if reviewer == pr.author or is_bot_reviewer(reviewer):
            continue
        latest_by_login[reviewer] = {
            "author": reviewer,
            "state": review["state"],
            "submitted_at": parse_dt(review["submittedAt"]),
            "body": review.get("body", ""),
        }
    pr.reviews = list(latest_by_login.values())

    # "statusCheckRollup" at this level is the query's alias for commits(last: 1).
    last_commits = node["statusCheckRollup"]["nodes"]
    rollup = last_commits[0]["commit"]["statusCheckRollup"] if last_commits else None
    if not rollup:
        pr.ci_status = "NONE"
        return

    pr.ci_status = rollup["state"]  # SUCCESS / FAILURE / PENDING / ERROR
    # Newest timestamp across contexts (for the ci-red-silent comment-after check).
    stamps = []
    for context in rollup["contexts"]["nodes"]:
        if context is None:
            continue
        raw = context.get("completedAt") or context.get("createdAt")
        if raw:
            stamps.append(parse_dt(raw))
    pr.ci_completed_at = max(stamps, default=None)


# --- Tier C: timeline + commits + comments (only when needed) ---

# Per-PR query: the last 50 timeline items restricted to commits, force-pushes
# and reopen events, plus the last 30 issue comments.
TIER_C_QUERY = """
query($owner: String!, $name: String!, $number: Int!) {
  repository(owner: $owner, name: $name) {
    pullRequest(number: $number) {
      timelineItems(last: 50, itemTypes: [PULL_REQUEST_COMMIT, HEAD_REF_FORCE_PUSHED_EVENT, REOPENED_EVENT]) {
        nodes {
          __typename
          ... on PullRequestCommit { commit { committedDate author { user { login } } } }
          ... on HeadRefForcePushedEvent { createdAt }
          ... on ReopenedEvent { createdAt }
        }
      }
      comments(last: 30) {
        nodes { author { login } body createdAt }
      }
    }
  }
}
"""


def fetch_tier_c(pr: PR, owner: str, name: str) -> None:
    """Fill the Tier C fields of *pr* in place: pushes, commit_authors, comments.

    pushes is the sorted list of timestamps of commits, force-pushes and reopen
    events among the fetched timeline items. commit_authors holds the GitHub
    login of each commit whose author resolves to a user. comments is a list of
    {author, body, created_at} dicts, with "ghost" standing in for a missing
    comment author.
    """
    data = gh_graphql(TIER_C_QUERY, owner=owner, name=name, number=pr.number)
    node = data["repository"]["pullRequest"]

    pushes = []
    commit_authors = []
    for item in node["timelineItems"]["nodes"]:
        if item is None:
            continue  # deleted timeline events come back as null
        kind = item["__typename"]
        if kind == "PullRequestCommit":
            commit = item["commit"]
            pushes.append(parse_dt(commit["committedDate"]))
            # Both Commit.author and GitActor.user are nullable in the GraphQL
            # schema; a null author must not abort the whole fetch.
            user = (commit["author"] or {}).get("user")
            if user:
                commit_authors.append(user["login"])
        elif kind in ("HeadRefForcePushedEvent", "ReopenedEvent"):
            pushes.append(parse_dt(item["createdAt"]))
    # parse_dt returns None for a missing timestamp; drop those before sorting.
    pr.pushes = sorted(stamp for stamp in pushes if stamp)
    pr.commit_authors = commit_authors

    pr.comments = [
        {"author": c["author"]["login"] if c["author"] else "ghost",
         "body": c["body"],
         "created_at": parse_dt(c["createdAt"])}
        for c in node["comments"]["nodes"] if c is not None
    ]


# --- LLM calls (claude -p, cost-tracked) ---

@dataclass
class LLMCall:
    """Accounting record for one `claude -p` invocation (appended by _claude_p_once)."""

    purpose: str        # call-site tag, e.g. "author-intent"; llm_cost_summary groups by it
    repo: str           # scopes cost to a single repo — otherwise multi-repo runs double-count
    cost_usd: float     # envelope's total_cost_usd (0.0 when absent)
    duration_ms: int    # wall-clock time of the subprocess call, not the API-reported duration
    input_tokens: int   # usage.input_tokens + usage.cache_creation_input_tokens
    output_tokens: int


# Process-wide log of LLM invocations: _claude_p_once appends one LLMCall per
# parsed response (errored ones included); llm_cost_summary reads it.
_llm_calls: list[LLMCall] = []
_current_repo: str = ""  # set by classify_repo() before each repo's work begins

# claude -p CLI startup (~13-30s/call — plugin marketplace scan, cache checks) dominates
# wall-clock. Each call is independent, so we run them through a thread pool. With ~9+7+6
# calls across phases, width 10 means each phase finishes in roughly one wave.
LLM_POOL_WIDTH = 10


class LLMError(RuntimeError):
    """claude -p returned is_error=true (rate limit, upstream 5xx, overload).

    Also raised by _claude_p_once when a JSON schema was requested but the
    response envelope carries no structured_output.
    Distinct from CalledProcessError so callers can catch LLM failures separately."""


def _claude_p_once(prompt: str, purpose: str, schema: dict | None = None) -> str | dict:
    """Run `claude -p` once and return its answer.

    Args:
        prompt: Prompt text passed to the CLI.
        purpose: Call-site tag recorded on the LLMCall cost entry.
        schema: Optional JSON schema; when given it is passed via --json-schema
            and the envelope's structured_output is returned instead of text.

    Returns:
        The stripped `result` text, or the structured output dict when *schema*
        is given.

    Raises:
        subprocess.CalledProcessError: the CLI exited non-zero.
        LLMError: stdout was not valid JSON, the envelope has is_error set, or
            *schema* was given but no structured_output came back.
    """
    cmd = ["claude", "-p", prompt, "--model", LLM_MODEL, "--output-format", "json"]
    if schema is not None:
        cmd += ["--json-schema", json.dumps(schema)]
    t0 = time.monotonic()
    proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    wall_ms = int((time.monotonic() - t0) * 1000)
    try:
        out = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        # Empty or non-JSON stdout despite exit 0. Surface it as an LLM failure so
        # the retry wrapper and callers' `except LLMError` handle it instead of an
        # unrelated JSONDecodeError escaping. No cost entry: there is no envelope.
        raise LLMError(f"claude -p produced non-JSON output: {proc.stdout[:200]!r}") from e
    usage = out.get("usage", {})
    # Track cost even for errored calls — the API request was billed.
    _llm_calls.append(LLMCall(
        purpose=purpose,
        repo=_current_repo,
        cost_usd=out.get("total_cost_usd", 0.0),
        duration_ms=wall_ms,  # api duration is out["duration_ms"]; we care about wall-clock overhead too
        input_tokens=usage.get("input_tokens", 0) + usage.get("cache_creation_input_tokens", 0),
        output_tokens=usage.get("output_tokens", 0),
    ))
    # claude -p exits 0 on transient API errors with {"is_error": true, "result": "API Error: ..."}.
    if out.get("is_error"):
        raise LLMError(out.get("result", "claude -p returned is_error with no message"))
    # `result` may be absent or null; normalise so slicing/stripping cannot fail.
    result_text = out.get("result") or ""
    if schema is not None:
        structured = out.get("structured_output")
        if structured is None:
            raise LLMError(f"--json-schema passed but no structured_output in envelope (result={result_text[:200]!r})")
        return structured
    return result_text.strip()


def _claude_p_subprocess(prompt: str, purpose: str, schema: dict | None = None) -> str | dict:
    """Production LLM backend: one `claude -p` call with a single retry.

    Only LLMError triggers the retry (after a 2s pause); a failure of the
    second attempt propagates to the caller.
    """
    try:
        answer = _claude_p_once(prompt, purpose, schema)
    except LLMError as first_failure:
        print(f"  [llm:{purpose}] transient error, retrying in 2s: {first_failure}", file=sys.stderr)
        time.sleep(2)
        answer = _claude_p_once(prompt, purpose, schema)
    return answer


# Tests reassign this: _llm_fn = lambda prompt, purpose, schema=None: canned_response
_llm_fn = _claude_p_subprocess


def claude_p(prompt: str, purpose: str, schema: dict | None = None) -> str | dict:
    """Dispatch an LLM call through the _llm_fn seam (looked up at call time).

    Returns whatever the backend returns: text, or a dict when *schema* is given
    (for the default backend).
    """
    return _llm_fn(prompt, purpose, schema)


def llm_cost_summary(repo: str | None = None) -> dict:
    """Summarise recorded LLM spend, optionally restricted to one repo.

    Returns a dict with "total_usd" and "total_calls" plus a "by_purpose"
    breakdown of {"count", "cost"}; dollar amounts are rounded to 4 places.
    """
    selected = [call for call in _llm_calls if repo is None or call.repo == repo]

    counts: dict[str, int] = defaultdict(int)
    costs: dict[str, float] = defaultdict(float)
    for call in selected:
        counts[call.purpose] += 1
        costs[call.purpose] += call.cost_usd

    breakdown = {
        purpose: {"count": counts[purpose], "cost": round(costs[purpose], 4)}
        for purpose in counts
    }
    total = sum(call.cost_usd for call in selected)
    return {
        "total_usd": round(total, 4),
        "total_calls": len(selected),
        "by_purpose": breakdown,
    }


def llm_author_comment_intent(reviewer_feedback: str, author_reply: str) -> str:
    """Ask the LLM what kind of reply the author gave to requested changes.

    Returns one of: question | disagreement | promise | noise. Any answer that
    is not exactly one of those words (after lower-casing and stripping) is
    treated as "noise". Both inputs are truncated to 500 characters.
    """
    prompt = f"""A maintainer requested changes on a PR. The author replied.
Classify the author's reply as exactly one of: question, disagreement, promise, noise.

- question: author is asking the reviewer to clarify (ball moves to maintainer)
- disagreement: author disputes the feedback, wants reconsideration (ball moves to maintainer)
- promise: author agrees, will fix (ball stays with author, clock resets)
- noise: nothing substantive (clock keeps running)

Reviewer feedback:
{reviewer_feedback[:500]}

Author reply:
{author_reply[:500]}

Output ONLY the single word: question, disagreement, promise, or noise."""
    verdict = claude_p(prompt, purpose="author-intent").lower().strip()
    if verdict in ("question", "disagreement", "promise", "noise"):
        return verdict
    return "noise"


def llm_hold_direction(hold_text: str) -> str:
    """Ask the LLM whether a maintainer's note puts the PR on hold.

    Returns: on-hold | awaiting-author. Any answer containing "hold" maps to
    "on-hold"; everything else maps to "awaiting-author". The note is truncated
    to 500 characters.
    """
    prompt = f"""A maintainer left this on a PR:

{hold_text[:500]}

Is the PR waiting on something the author can do (awaiting-author), or on something external
the author cannot unblock — a separate team decision, spec resolution, or upstream release (on-hold)?

Output ONLY: on-hold OR awaiting-author"""
    answer = claude_p(prompt, purpose="hold-direction").lower().strip()
    # "on-hold" itself contains "hold", so a single substring test covers both spellings.
    if "hold" in answer:
        return "on-hold"
    return "awaiting-author"


def llm_unknown_fallback(pr: PR, repo: str) -> tuple[str, str]:
    """Last resort: hand the LLM the PR's context and the full state catalog.

    Returns (state, reason) with the reason prefixed by "[llm] ". When the call
    fails with CalledProcessError or LLMError, returns
    ("unknown", "[llm] call failed").
    """
    recent_comments = [
        {"author": comment["author"], "body": comment["body"][:200]}
        for comment in (pr.comments or [])[-5:]
    ]
    ctx = {
        "number": pr.number,
        "title": pr.title,
        "author": pr.author,
        "is_draft": pr.is_draft,
        "age_days": round(days_since(pr.created_at), 1),
        "labels": pr.labels,
        "mergeable": pr.mergeable,
        "ci": pr.ci_status,
        "reviews": pr.reviews,
        "last_comments": recent_comments,
    }
    states = sorted(ACTIONABLE_STATES | NOT_ACTIONABLE_STATES | EXCLUDED_STATES)
    # default=str stringifies values json cannot encode (e.g. datetimes in reviews).
    prompt = f"""Classify this PR into one state. Context:

{json.dumps(ctx, indent=2, default=str)}

Valid states: {", ".join(states)}"""
    schema = {
        "type": "object",
        "properties": {
            "state": {"type": "string", "enum": states},
            "reason": {"type": "string"},
        },
        "required": ["state", "reason"],
    }
    try:
        verdict = claude_p(prompt, purpose="unknown-fallback", schema=schema)
    except (subprocess.CalledProcessError, LLMError):
        return "unknown", "[llm] call failed"
    return verdict["state"], f"[llm] {verdict['reason']}"


# --- Cluster detection (two-phase) ---

def detect_clusters(prs: list[PR]) -> None:
    """Phase 1 clustering: group PRs by shared closing-issue reference. No LLM.

    Operates on the full PR set (maintainer PRs included) and mutates the PRs
    in place.

    Groups are keyed by (issue, base_ref), so a main-branch fix and its v1.x
    backport never land in the same duplicate cluster — they complement each
    other rather than compete. Instead, an unclustered non-main PR that shares
    an issue with a main PR is marked "backport-follows-primary".
    """
    by_issue_and_base: dict[tuple[int, str], list[PR]] = defaultdict(list)
    for pr in prs:
        for issue in pr.linked_issues:
            by_issue_and_base[(issue, pr.base_ref)].append(pr)

    for (issue, _base_ref), linked in by_issue_and_base.items():
        # First cluster wins: a PR linking several clustering issues keeps the
        # cluster it joined first, so no earlier cluster gets orphaned.
        unclaimed = [candidate for candidate in linked if candidate.cluster_id is None]
        if len(unclaimed) < 2:
            continue
        unclaimed = sorted(unclaimed, key=lambda p: p.number)
        for member in unclaimed:
            member.cluster_id = issue
        # Lowest PR number is the provisional primary; the phase 2 pick may override it.
        primary, followers = unclaimed[0], unclaimed[1:]
        primary.state = "duplicate-cluster"
        primary.reason = f"cluster primary for issue #{issue} ({len(unclaimed)} PRs)"
        for follower in followers:
            follower.state = "duplicate-cluster-member"
            follower.reason = f"shares linked issue #{issue} with #{primary.number}"

    # Same issue on a different base_ref means backport pair, not duplicate. This
    # covers the cross-author case that detect_backport_pairs (same-author-only) misses.
    main_pr_for_issue: dict[int, PR] = {}
    for pr in prs:
        if pr.base_ref != "main":
            continue
        for issue in pr.linked_issues:
            main_pr_for_issue.setdefault(issue, pr)

    for pr in prs:
        if pr.base_ref == "main" or pr.cluster_id is not None:
            continue
        shared_issue = next(
            (issue for issue in pr.linked_issues if issue in main_pr_for_issue),
            None,
        )
        if shared_issue is None:
            continue
        pr.state = "backport-follows-primary"
        pr.reason = (
            f"follows #{main_pr_for_issue[shared_issue].number} "
            f"(same issue #{shared_issue}, different branch)"
        )


def _file_jaccard(a: list[str], b: list[str]) -> float:
    sa, sb = set(a), set(b)
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


def detect_semantic_clusters(prs: list[PR], owner: str, name: str, use_llm: bool) -> None:
    """Phase 2: find clusters among PRs that DIDN'T share a linked issue.

    Pre-filter by file-set overlap (cheap), then LLM batch-judge each candidate group.
    The LLM also picks the winner within each cluster (sees diffs, can judge quality).

    Mutates PRs in place; returns nothing.

    Args:
        prs: all PRs under consideration. Only unclustered, non-draft PRs with a file
            list whose state is neither excluded nor "backport-follows-primary" are
            candidates.
        owner: repository owner, used for `gh pr diff -R owner/name`.
        name: repository name, used for `gh pr diff -R owner/name`.
        use_llm: when False, no cluster is formed; PRs in an overlap group whose state
            is "unknown" or starts with "needs-" only get a hint appended to `reason`.

    The LLM's answer is validated before it is applied:
      * PR numbers that are not in the judged group are ignored.
      * A PR claimed by an earlier cluster of the same answer is not re-assigned
        (first-wins, same rule as the issue-linked phase), so no cluster loses its
        primary to a later overwrite.
      * If "pick" is not one of the actual members, the lowest-numbered member becomes
        primary instead and `cluster_pick` is left unset, so every cluster always has
        exactly one "duplicate-cluster" PR.
    """
    # Only unclustered, non-draft, non-excluded candidates
    candidates = [
        p for p in prs
        if p.cluster_id is None
        and not p.is_draft
        and p.state not in EXCLUDED_STATES
        and p.state != "backport-follows-primary"
        and p.files
    ]
    if len(candidates) < 2:
        return

    # Build candidate groups via connected-components over file overlap graph
    edges: dict[int, set[int]] = defaultdict(set)
    for i, a in enumerate(candidates):
        for b in candidates[i+1:]:
            j = _file_jaccard(a.files, b.files)
            # High overlap, OR both touch exactly one file and it's the same file
            if j > 0.5 or (len(set(a.files)) == 1 and set(a.files) == set(b.files)):
                edges[a.number].add(b.number)
                edges[b.number].add(a.number)

    # Connected components (iterative DFS; every node in `edges` has at least one neighbour)
    pr_by_num = {p.number: p for p in candidates}
    visited: set[int] = set()
    groups: list[list[PR]] = []
    for start in edges:
        if start in visited:
            continue
        component = []
        stack = [start]
        while stack:
            n = stack.pop()
            if n in visited:
                continue
            visited.add(n)
            component.append(pr_by_num[n])
            stack.extend(edges[n] - visited)
        if len(component) >= 2:
            groups.append(component)

    if not groups:
        return

    print(f"  [semantic-cluster] {len(groups)} candidate groups from file overlap", file=sys.stderr)

    if not use_llm:
        # Without LLM: flag candidates as unknown so they surface in the digest
        for group in groups:
            nums = sorted(p.number for p in group)
            for p in group:
                if p.state == "unknown" or p.state.startswith("needs-"):
                    p.reason += f" [files overlap with {','.join(f'#{n}' for n in nums if n != p.number)} — may be duplicate]"
        return

    # LLM batch-judge each group. Diff-hunk fetch + LLM call bundled into one worker so the
    # whole pipeline per group fans out — no serial barrier between fetch and judge.
    SEMANTIC_SCHEMA = {
        "type": "object",
        "properties": {
            "clusters": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "prs": {"type": "array", "items": {"type": "integer"}},
                        "problem": {"type": "string"},
                        "pick": {"type": "integer"},
                        "pick_reason": {"type": "string"},
                    },
                    "required": ["prs", "problem", "pick", "pick_reason"],
                },
            },
        },
        "required": ["clusters"],
    }

    def _judge_group(gi: int, group: list[PR]) -> dict | None:
        """Fetch a diff excerpt per PR and ask the LLM which PRs duplicate each other.

        Returns the parsed LLM answer, or None when the LLM call failed.
        """
        group.sort(key=lambda p: p.number)
        diff_snippets = []
        for p in group:
            try:
                diff = sh(["gh", "pr", "diff", str(p.number), "-R", f"{owner}/{name}"])
                # Keep the prompt small: at most the first 3 hunks, capped at 1500 chars.
                hunks = [h for h in diff.split("\n@@") if h.strip()][:3]
                snippet = "\n@@".join(hunks)[:1500]
            except subprocess.CalledProcessError:
                snippet = "(diff unavailable)"
            diff_snippets.append(f"--- PR #{p.number}: {p.title} ---\n{snippet}")
        prompt = f"""These {len(group)} PRs touch overlapping files. Determine which (if any) are fixing the SAME underlying problem and should be consolidated.

{chr(10).join(diff_snippets)}

If none are truly duplicates, use an empty clusters array.
A PR can appear in at most one cluster. \"pick\" is the one to keep; others get closed.

Your pick reason is shown on a public dashboard where the authors can see it. Cite concrete
technical differences (CI status, test coverage, diff size, merge conflicts, which approach
handles which cases) — not value judgments about quality."""
        try:
            return claude_p(prompt, purpose="semantic-cluster", schema=SEMANTIC_SCHEMA)
        except (subprocess.CalledProcessError, LLMError) as e:
            print(f"  [semantic-cluster] group {gi+1}: LLM call failed ({e}), skipping", file=sys.stderr, flush=True)
            return None

    # Fan out. Results applied on main thread — groups are disjoint so order doesn't matter,
    # but keeping mutation single-threaded makes the data flow obvious.
    done = 0
    with ThreadPoolExecutor(max_workers=LLM_POOL_WIDTH) as pool:
        futs = {pool.submit(_judge_group, gi, g): (gi, g) for gi, g in enumerate(groups)}
        for fut in as_completed(futs):
            gi, group = futs[fut]
            done += 1
            nums = ",".join(f"#{p.number}" for p in sorted(group, key=lambda p: p.number))
            result = fut.result()
            verdict = f"{len(result.get('clusters', []))} cluster(s)" if result else "skipped"
            print(f"  [semantic-cluster] {done}/{len(groups)} done: {nums} → {verdict}", file=sys.stderr, flush=True)
            if result is None:
                continue
            for cluster in result.get("clusters", []):
                member_nums = set(cluster.get("prs", []))
                # Keep only PRs that are really in this group and not yet claimed by an
                # earlier cluster of this answer (every candidate starts unclustered).
                members = [
                    p for p in group
                    if p.number in member_nums and p.cluster_id is None
                ]
                if len(members) < 2:
                    continue
                problem = cluster.get("problem", "semantic duplicate")
                pick_reason = cluster.get("pick_reason", "")
                pick_num = cluster.get("pick")
                primary = next((m for m in members if m.number == pick_num), None)
                llm_picked = primary is not None
                if primary is None:
                    # The pick isn't a usable member; fall back to the lowest number so
                    # the cluster still has a primary to anchor on.
                    primary = min(members, key=lambda p: p.number)
                    print(
                        f"  [semantic-cluster] group {gi+1}: pick #{pick_num} is not a cluster member, "
                        f"falling back to #{primary.number}",
                        file=sys.stderr, flush=True,
                    )
                # Derive the id from real members, not from raw LLM-supplied numbers.
                cid = f"sem:{min(m.number for m in members)}"
                for m in members:
                    m.cluster_id = cid
                    if m is primary:
                        m.state = "duplicate-cluster"
                        m.reason = f"[llm] cluster primary: {problem} ({len(members)} PRs)"
                        if llm_picked:
                            m.cluster_pick = True
                            m.cluster_pick_reason = pick_reason
                    else:
                        m.state = "duplicate-cluster-member"
                        m.reason = f"[llm] duplicates #{primary.number}: {problem}"


def pick_cluster_winners(prs: list[PR], owner: str, name: str, use_llm: bool) -> None:
    """Ask the LLM which PR to keep in each issue-linked (phase-1) cluster.

    Semantic (phase-2) clusters are skipped here: their cluster_id is a string and
    they already received a pick when they were formed. Does nothing without the LLM.
    The chosen PR gets cluster_pick / cluster_pick_reason; if it was only a member,
    it trades places with the current primary.
    """
    if not use_llm:
        return

    # Phase-1 cluster ids are int issue numbers; group them per base branch.
    grouped: dict[tuple[int, str], list[PR]] = defaultdict(list)
    for pr in prs:
        if not isinstance(pr.cluster_id, int):
            continue
        grouped[(pr.cluster_id, pr.base_ref)].append(pr)

    work = []
    for (issue_num, _base), bucket in grouped.items():
        if len(bucket) >= 2:
            work.append((issue_num, sorted(bucket, key=lambda p: p.number)))
    if not work:
        return

    PICK_SCHEMA = {
        "type": "object",
        "properties": {"pick": {"type": "integer"}, "reason": {"type": "string"}},
        "required": ["pick", "reason"],
    }

    def _pick(issue: int, members: list[PR]) -> tuple[int, str] | None:
        """Return (picked PR number, reason), or None when the LLM call is unusable."""
        summaries = []
        for p in members:
            summaries.append(
                f'#{p.number} "{p.title}" (CI: {p.ci_status or "unknown"}, files: {len(p.files)})'
            )
        prompt = f"""These {len(members)} PRs all fix the same issue (#{issue}). Pick the ONE to keep.

{chr(10).join(summaries)}

Prefer: passing CI > has tests > smallest focused diff > oldest.

Your reason is shown on a public dashboard where both authors can see it. Cite concrete
technical differences (CI status, test coverage, diff size, merge conflicts, approach
specifics) — not value judgments about which author did better work."""
        try:
            answer = claude_p(prompt, purpose="cluster-pick", schema=PICK_SCHEMA)
            return answer["pick"], answer["reason"]
        except (subprocess.CalledProcessError, LLMError, KeyError):
            return None

    finished = 0
    with ThreadPoolExecutor(max_workers=LLM_POOL_WIDTH) as pool:
        pending = {}
        for issue, members in work:
            pending[pool.submit(_pick, issue, members)] = (issue, members)
        # Results are applied here, on the calling thread, as each future completes.
        for fut in as_completed(pending):
            issue, members = pending[fut]
            finished += 1
            nums = ",".join(f"#{p.number}" for p in members)
            pick = fut.result()
            verdict = f"→ #{pick[0]}" if pick else "skipped"
            print(f"  [cluster-pick] {finished}/{len(work)} done: issue #{issue} ({nums}) {verdict}", file=sys.stderr, flush=True)
            if pick is None:
                continue
            pick_num, reason = pick
            for member in members:
                if member.number != pick_num:
                    continue
                member.cluster_pick = True
                member.cluster_pick_reason = reason
                if member.state != "duplicate-cluster-member":
                    continue
                # Swap primary — the old primary becomes a member
                old_primary = next((p for p in members if p.state == "duplicate-cluster"), None)
                if old_primary:
                    old_primary.state = "duplicate-cluster-member"
                    old_primary.reason = f"shares issue #{issue}; #{pick_num} is the pick"
                member.state = "duplicate-cluster"
                member.reason = f"[llm pick] cluster primary for issue #{issue}: {reason}"


def detect_backport_pairs(prs: list[PR]) -> None:
    """Mark non-main PRs that mirror a main-branch PR by the same author.

    A PR counts as a backport when the same author has a PR against "main" created
    within 7 days of it and with a similar title (see _title_similar). The first
    such main PR wins; the backport is set to "backport-follows-primary" with a
    reason pointing at that PR.
    """
    authored: dict[str, list[PR]] = defaultdict(list)
    for pr in prs:
        authored[pr.author].append(pr)

    for same_author in authored.values():
        main_prs = [p for p in same_author if p.base_ref == "main"]
        for candidate in same_author:
            if candidate.base_ref == "main":
                continue
            sibling = next(
                (
                    mp for mp in main_prs
                    if abs((candidate.created_at - mp.created_at).days) <= 7
                    and _title_similar(candidate.title, mp.title)
                ),
                None,
            )
            if sibling is None:
                continue
            candidate.state = "backport-follows-primary"
            candidate.reason = f"follows #{sibling.number}"


def _title_similar(a: str, b: str) -> bool:
    # Strip common version prefixes before comparing
    for prefix in ("[v1.x]", "[v2]", "[1.x]", "(v1.x)", "(v2)"):
        a = a.replace(prefix, "").strip()
        b = b.replace(prefix, "").strip()
    at, bt = set(a.lower().split()), set(b.lower().split())
    if not at or not bt:
        return False
    return len(at & bt) / len(at | bt) > 0.7


# --- The classifier ---

def classify(pr: PR, owner: str, name: str, use_llm: bool) -> None:
    """Assign ``pr.state`` and ``pr.reason`` by walking the decision tree below.

    Mutates *pr* in place and returns nothing. Some branches also set
    ``pr.wait_since`` or ``pr.needs_llm``. PRs already marked by cluster or backport
    detection are left untouched. Data is fetched lazily: ``fetch_tier_b`` for
    everything past the bot-release check, ``fetch_tier_c`` only on branches that
    read comments or pushes.

    Order of checks (first match wins):
      1. bot release PR → bot-release
      2. CI never ran and the PR is old → pre-ci-ghost
      3. draft → parked / draft-active / stale-draft-pinged / stale-draft
      4. no reviews → bot-broke-ci, red-CI states, needs-decision,
         author-pinged-after-procedural, maintainer-intake, needs-first-review
      5. maintainer approval AND maintainer change request → approval-contested
      6. approved only → needs-re-review / approved-rotted / needs-merge /
         awaiting-ci / approved-ci-red / unknown
      7. changes requested → needs-re-review / stale-awaiting-author /
         awaiting-author / unknown
      8. comment-only reviews → needs-decision / maintainer-intake / community-reviewed

    Args:
        pr: the PR to classify.
        owner: repository owner, passed through to the fetch helpers.
        name: repository name, passed through to the fetch helpers.
        use_llm: when False, the "author commented after CHANGES_REQUESTED" case is
            not sent to the LLM; the PR becomes "unknown" with needs_llm set instead.
    """
    # Already set by cluster/backport detection? Done.
    if pr.state in ("duplicate-cluster", "duplicate-cluster-member", "backport-follows-primary"):
        return

    # --- Tier A classifications (no extra fetch needed) ---
    age = days_since(pr.created_at)

    if pr.author.startswith("app/") and pr.title.startswith(BOT_RELEASE_TITLE_PREFIXES):
        pr.state, pr.reason = "bot-release", "release bot PR"
        return

    # --- Need Tier B data for everything below ---
    fetch_tier_b(pr, owner, name)

    if pr.ci_status == "NONE" and age > DAYS_PRE_CI_GHOST:
        pr.state, pr.reason = "pre-ci-ghost", f"CI never ran, {age:.0f}d old"
        return

    # Drafts
    if pr.is_draft:
        if pr.is_maintainer and age > DAYS_PARKED_THRESHOLD:
            pr.state, pr.reason = "parked", f"maintainer draft, {age:.0f}d"
            return
        if age <= DAYS_COMMUNITY_SLA:
            pr.state, pr.reason = "draft-active", f"{age:.0f}d old"
            return
        # >7d: check if maintainer pinged AND author didn't reply after. A reply followed
        # by silence is stale-draft (re-ping), not stale-draft-pinged (close).
        fetch_tier_c(pr, owner, name)
        maint_comments = [
            c for c in (pr.comments or [])
            if c["author"] in MAINTAINERS and c["author"] != pr.author
        ]
        latest_ping = max((c["created_at"] for c in maint_comments), default=None)
        author_replied_after = latest_ping and any(
            c["author"] == pr.author and c["created_at"] > latest_ping
            for c in (pr.comments or [])
        )
        if latest_ping and days_since(latest_ping) > DAYS_COMMUNITY_SLA and not author_replied_after:
            pr.state, pr.reason = "stale-draft-pinged", f"pinged {days_since(latest_ping):.0f}d ago, no response"
        else:
            pr.state, pr.reason = "stale-draft", f"draft idle {age:.0f}d"
        return

    # --- Review-based classification ---
    reviews = pr.reviews or []

    # Per-reviewer current state (the P0-3 fix)
    approved_by = {
        r["author"] for r in reviews
        if r["state"] == "APPROVED" and r["author"] in MAINTAINERS
    }
    changes_requested_by = {
        r["author"] for r in reviews
        if r["state"] == "CHANGES_REQUESTED" and r["author"] in MAINTAINERS
    }
    latest_review_at = max((r["submitted_at"] for r in reviews), default=None)

    # Never reviewed
    if not reviews:
        # Bot with red CI?
        if pr.author.startswith("app/") and pr.ci_status in ("FAILURE", "ERROR"):
            pr.state, pr.reason = "bot-broke-ci", "bot PR, red CI"
            return

        # Red CI, no comments after?
        if pr.ci_status in ("FAILURE", "ERROR"):
            fetch_tier_c(pr, owner, name)
            comments_after_ci = [
                c for c in (pr.comments or [])
                if pr.ci_completed_at and c["created_at"] > pr.ci_completed_at
            ]
            if not comments_after_ci:
                if age > DAYS_CI_RED_ABANDONED:
                    pr.state, pr.reason = "stale-ci-abandoned", f"red CI {age:.0f}d, no activity"
                else:
                    pr.state, pr.reason = "ci-red-silent", f"red CI {age:.0f}d, nobody's mentioned it"
                return
            # Author self-diagnosed? → stale check
            author_diagnosed = [c for c in comments_after_ci if c["author"] == pr.author]
            if author_diagnosed and days_since(author_diagnosed[-1]["created_at"]) > DAYS_STALE_AUTHOR:
                pr.state = "stale-awaiting-author"
                pr.reason = f"author self-diagnosed CI {days_since(author_diagnosed[-1]['created_at']):.0f}d ago, never fixed"
                return
            # Maintainer pinged about CI → awaiting-author-ci
            maintainer_ci_ping = [
                c for c in comments_after_ci
                if c["author"] in MAINTAINERS and c["author"] != pr.author
            ]
            if maintainer_ci_ping:
                since_ping = days_since(maintainer_ci_ping[-1]["created_at"])
                if since_ping > DAYS_STALE_AUTHOR:
                    pr.state, pr.reason = "stale-awaiting-author", f"CI ping {since_ping:.0f}d ago, no fix"
                else:
                    pr.state, pr.reason = "awaiting-author-ci", f"CI pinged {since_ping:.0f}d ago"
                return
            # Red CI with comments that match neither case falls through to the checks below.

        # Check for substantive maintainer comments (needs-decision case, #819)
        fetch_tier_c(pr, owner, name)
        maintainer_comments = [
            c for c in (pr.comments or [])
            if c["author"] in MAINTAINERS and c["author"] != pr.author
        ]
        # Author comments that @-mention at least one maintainer, paired with who was mentioned.
        author_pings = []
        for c in (pr.comments or []):
            if c["author"] != pr.author or "@" not in c["body"]:
                continue
            mentioned = [m for m in MAINTAINERS if f"@{m}" in c["body"]]
            if mentioned:
                author_pings.append((c, mentioned))

        if len(maintainer_comments) >= 2:
            pr.wait_since = max(c["created_at"] for c in maintainer_comments)
            pr.state = "needs-decision"
            pr.reason = f"{len(maintainer_comments)} maintainer comments, zero reviews"
            return

        if author_pings and days_since(author_pings[-1][0]["created_at"]) > DAYS_AUTHOR_PING_WAIT:
            _, mentioned = author_pings[-1]
            who = ", ".join(f"@{m}" for m in mentioned)
            pr.wait_since = author_pings[-1][0]['created_at']
            pr.state = "author-pinged-after-procedural"
            pr.reason = f"author pinged {who} {days_since(pr.wait_since):.0f}d ago, no response"
            return

        if pr.is_maintainer:
            pr.state, pr.reason = "maintainer-intake", f"{age:.0f}d, no maintainer has looked yet"
            return
        sla = " (SLA breach)" if age > DAYS_COMMUNITY_SLA else ""
        pr.state, pr.reason = "needs-first-review", f"{age:.0f}d since open{sla}"
        return

    # Has reviews. Contested?
    if approved_by and changes_requested_by:
        pr.wait_since = latest_review_at
        pr.state = "approval-contested"
        # Sort the names: set iteration order for strings varies between runs, and this
        # text ends up in generated output that should not change without cause.
        pr.reason = (
            f"approved by {','.join(sorted(approved_by))}; "
            f"changes req by {','.join(sorted(changes_requested_by))}"
        )
        return

    # All approved
    if approved_by and not changes_requested_by:
        # P0-1: check push-after-approval (needs Tier C)
        fetch_tier_c(pr, owner, name)
        pushes_after = [p for p in (pr.pushes or []) if latest_review_at and p > latest_review_at]
        if pushes_after:
            pr.wait_since = max(pushes_after)
            pr.state, pr.reason = "needs-re-review", f"push after approval ({len(pushes_after)} commits, {days_since(pr.wait_since):.0f}d waiting)"
            return

        if pr.mergeable == "CONFLICTING":
            pr.state, pr.reason = "approved-rotted", f"approved {days_since(latest_review_at):.0f}d ago, now conflicting"
            return

        if pr.ci_status == "SUCCESS" and pr.mergeable == "MERGEABLE":
            pr.state, pr.reason = "needs-merge", f"approved {days_since(latest_review_at):.0f}d ago, green"
            return

        if pr.ci_status == "PENDING":
            pr.state, pr.reason = "awaiting-ci", "approved, CI running"
            return

        if pr.ci_status in ("FAILURE", "ERROR"):
            pr.state, pr.reason = "approved-ci-red", f"approved {days_since(latest_review_at):.0f}d ago, CI red"
            return

        # Approved, but the CI/mergeable combination matches none of the cases above.
        pr.state = "unknown"
        pr.reason = f"approved but ci={pr.ci_status} mergeable={pr.mergeable}"
        pr.needs_llm = "unknown"
        return

    # Changes requested
    if changes_requested_by:
        fetch_tier_c(pr, owner, name)
        pushes_after = [p for p in (pr.pushes or []) if latest_review_at and p > latest_review_at]

        if pushes_after:
            pr.wait_since = max(pushes_after)
            pr.state, pr.reason = "needs-re-review", f"author pushed after CHANGES_REQUESTED ({days_since(pr.wait_since):.0f}d waiting)"
            return

        # Author commented after review? P0-2: needs LLM intent classification.
        author_comments_after = [
            c for c in (pr.comments or [])
            if c["author"] == pr.author and latest_review_at and c["created_at"] > latest_review_at
        ]
        if author_comments_after:
            if not use_llm:
                pr.state = "unknown"
                pr.reason = "author commented after CHANGES_REQUESTED — needs intent classification (--no-llm)"
                pr.needs_llm = "author-intent"
                return
            review_body = next(
                (r.get("body", "") for r in reviews if r["state"] == "CHANGES_REQUESTED"), ""
            )
            intent = llm_author_comment_intent(review_body, author_comments_after[-1]["body"])
            if intent in ("question", "disagreement"):
                pr.wait_since = author_comments_after[-1]["created_at"]
                pr.state, pr.reason = "needs-re-review", f"author {intent} after CHANGES_REQUESTED ({days_since(pr.wait_since):.0f}d waiting) [llm]"
                return
            # Any other intent: still the author's move, but the clock restarts at their comment.
            clock_start = author_comments_after[-1]["created_at"]
        else:
            clock_start = latest_review_at

        since = days_since(clock_start)
        if since > DAYS_STALE_AUTHOR:
            pr.state, pr.reason = "stale-awaiting-author", f"{since:.0f}d since feedback, no response"
        else:
            pr.state, pr.reason = "awaiting-author", f"{since:.0f}d since feedback"
        return

    # COMMENTED only (no approve, no request-changes)
    maintainer_reviewers = [r["author"] for r in reviews if r["author"] in MAINTAINERS]
    if maintainer_reviewers:
        pr.wait_since = latest_review_at
        pr.state, pr.reason = "needs-decision", "maintainer COMMENTED but took no stance"
        return

    # Only non-maintainer reviews — community engaged but no maintainer yet
    reviewers = ", ".join(sorted({r["author"] for r in reviews}))
    if pr.is_maintainer:
        pr.state, pr.reason = "maintainer-intake", f"{age:.0f}d, community reviewed ({reviewers})"
        return
    sla = " (SLA breach)" if age > DAYS_COMMUNITY_SLA else ""
    pr.state, pr.reason = "community-reviewed", f"{age:.0f}d, reviewed by {reviewers}{sla}"


def _label_hint(pr: PR) -> str | None:
    """Return the hint state of the first LABEL_OVERRIDES prefix that any PR label
    starts with, or None when no label matches."""
    for prefix, hinted_state in LABEL_OVERRIDES.items():
        for label in pr.labels:
            if label.startswith(prefix):
                return hinted_state
    return None


def reconcile_label_hint(pr: PR) -> None:
    """Compare a human-applied label hint with the state the classifier computed.

    Labels can outlive the situation they describe, so the classifier runs first and
    the label is only checked against its result afterwards:

    * no matching label, or PR is in a cluster/backport state → nothing changes
    * classifier landed on a not-actionable or excluded state → label and classifier
      agree the ball isn't ours, so the label's state is adopted
    * classifier landed anywhere else → the label may be stale; the PR is surfaced
      as "unknown" for a human to re-verify
    """
    hint = _label_hint(pr)
    if hint is None:
        return

    current = pr.state
    # cluster state is authoritative
    if current in ("duplicate-cluster", "duplicate-cluster-member", "backport-follows-primary"):
        return

    classifier_agrees = current in NOT_ACTIONABLE_STATES or current in EXCLUDED_STATES
    if not classifier_agrees:
        # Actionable signals contradict the label: trust neither blindly.
        pr.state = "unknown"
        pr.reason = (
            f"label says {hint} but classifier sees {current} — "
            f"label may be stale, verify the block still holds"
        )
        return

    # e.g. label says blocked:on-hold, classifier found awaiting-author
    pr.state = hint
    pr.reason = f"label: {hint} (classifier concurs: {current})"


def route(pr: PR) -> str:
    """Map a PR's state to its bucket: "actionable", "not-actionable" or "excluded".

    The catalogs are consulted in that order. A state found in none of them raises
    ValueError rather than defaulting to a bucket, so a state missing from the
    catalog fails loudly instead of silently hiding the PR.
    """
    state = pr.state
    catalog = (
        ("actionable", ACTIONABLE_STATES),
        ("not-actionable", NOT_ACTIONABLE_STATES),
        ("excluded", EXCLUDED_STATES),
    )
    for bucket, states in catalog:
        if state in states:
            return bucket
    raise ValueError(
        f"PR #{pr.number}: state {pr.state!r} not in ACTIONABLE_STATES / "
        f"NOT_ACTIONABLE_STATES / EXCLUDED_STATES — add it to the catalog"
    )


# --- Output ---

@dataclass
class RepoResult:
    """Classified PRs of one repository, plus counts derived from the actionable subset."""

    repo: str       # repository name
    prs: list[PR]   # every PR of the repo

    @property
    def actionable(self) -> list[PR]:
        """PRs that route() puts in the "actionable" bucket (recomputed on each access)."""
        return [pr for pr in self.prs if route(pr) == "actionable"]

    @property
    def maintainer_count(self) -> int:
        """Number of actionable PRs with is_maintainer set."""
        return len([pr for pr in self.actionable if pr.is_maintainer])

    @property
    def cluster_count(self) -> int:
        """Number of actionable PRs in state "duplicate-cluster"."""
        return len([pr for pr in self.actionable if pr.state == "duplicate-cluster"])

    @property
    def cluster_member_count(self) -> int:
        """Number of PRs (actionable or not) in state "duplicate-cluster-member"."""
        return len([pr for pr in self.prs if pr.state == "duplicate-cluster-member"])

    @property
    def auth(self) -> list[PR]:
        """Actionable PRs with is_auth set."""
        return [pr for pr in self.actionable if pr.is_auth]

    @property
    def effort_hours(self) -> float:
        """Summed effort estimate for the actionable PRs, in hours.

        States missing from EFFORT_MINUTES count as 15 minutes.
        """
        total_minutes = sum(EFFORT_MINUTES.get(pr.state, 15) for pr in self.actionable)
        return total_minutes / 60


def _pr_url(repo: str, n: int) -> str:
    return f"https://github.com/modelcontextprotocol/{repo}/pull/{n}"


def _issue_url(repo: str, n: int) -> str:
    return f"https://github.com/modelcontextprotocol/{repo}/issues/{n}"


def _trunc(s: str, n: int) -> str:
    return s if len(s) <= n else s[: n - 1] + "\u2026"


def _pr_line_d(p: dict, suffix: str = "") -> str:
    """Render one PR dict as a compact markdown bullet.

    Layout: link, (+additions/-deletions), author tag, optional auth tag, title,
    then " — suffix" when a suffix is given. Maintainer authors are bolded and a
    leading "app/" is dropped from the author name.

    The square brackets around the author and auth tags are backslash-escaped so
    markdown doesn't read them as reference-link syntax. The title is truncated to
    whatever is left of an 80-column budget, but never below 20 characters.
    """
    link = f"[#{p['number']}]({p['url']})"
    churn = f"(+{p['additions']}/-{p['deletions']})"
    handle = _trunc(p['author'].removeprefix("app/"), 16)
    if p['is_maintainer']:
        byline = f"\\[**@{handle}**\\]"
    else:
        byline = f"\\[@{handle}\\]"
    auth_tag = " \\[auth\\]" if p['is_auth'] else ""

    # Estimated width of everything except the title.
    used = 5 + 1 + len(churn) + 1 + len(handle) + 4 + (7 if auth_tag else 0) + 1
    if suffix:
        used += len(suffix) + 3
    headline = _trunc(p['title'], max(20, 80 - used))

    trailer = f" — {suffix}" if suffix else ""
    return f"- {link} {churn} {byline}{auth_tag} {headline}{trailer}"


def _sort_maint_first(prs: list[PR], secondary=lambda p: p.number) -> list[PR]:
    return sorted(prs, key=lambda p: (not p.is_maintainer, secondary(p)))


def _short_reason(p: PR) -> str:
    """Tighter per-line reason now that section headers explain the state."""
    r = p.reason
    # reason already has the age as "37d since open..." — strip the tail, keep the number+d
    r = r.replace(" since open (SLA breach)", "").replace(" since open", "")
    r = r.replace(", no response", "").replace("nobody's mentioned it", "")
    return r.strip().rstrip(",")


def _fmt_mins(m: float) -> str:
    return f"{m/60:.1f}h" if m >= 60 else f"{m:.0f}m"


@dataclass
class TierRow:
    """One row of the per-tier summary of actionable PRs (built by _tier_summary)."""
    idx: int           # position of the tier in TIERS; -1 for untriaged, 99 for bot corner
    name: str          # tier name, or "untriaged" / "bot corner" for the two synthetic rows
    count: int         # number of actionable PRs in the row
    auth: int          # how many of those PRs have is_auth set (always 0 for bot corner)
    maint: int         # how many of those PRs have is_maintainer set (always 0 for bot corner)
    mins: float        # summed EFFORT_MINUTES estimate in minutes (always 0 for bot corner)


def _tier_summary(r: RepoResult) -> list[TierRow]:
    """Summarize the actionable PRs of *r* as one TierRow per non-empty tier.

    Rows follow the order of TIERS. Two synthetic rows may follow: "untriaged"
    (idx -1) for states that belong to no tier and no bot state, and "bot corner"
    (idx 99) for the bot states, which carries no auth/maintainer/effort numbers.

    Both the gist markdown table and the Slack code block render from this, so
    their numbers always agree.
    """
    grouped: dict[str, list[PR]] = defaultdict(list)
    for pr in r.actionable:
        grouped[pr.state].append(pr)

    def _row(idx: int, label: str, members: list[PR], minutes: float) -> TierRow:
        # Shared constructor for rows that report auth / maintainer counts.
        auth_n = sum(1 for m in members if m.is_auth)
        maint_n = sum(1 for m in members if m.is_maintainer)
        return TierRow(
            idx=idx, name=label, count=len(members),
            auth=auth_n, maint=maint_n, mins=minutes,
        )

    summary: list[TierRow] = []
    for position, (label, _, states) in enumerate(TIERS):
        members = [pr for state in states for pr in grouped.get(state, [])]
        if members:
            minutes = sum(
                len(grouped.get(state, [])) * EFFORT_MINUTES.get(state, 15)
                for state in states
            )
            summary.append(_row(position, label, members, minutes))

    # Anything actionable whose state is in neither a tier nor the bot states.
    covered = {state for _, _, states in TIERS for state in states} | set(BOT_STATES)
    untriaged = [pr for state in grouped if state not in covered for pr in grouped[state]]
    if untriaged:
        untriaged_minutes = sum(EFFORT_MINUTES.get(pr.state, 15) for pr in untriaged)
        summary.append(_row(-1, "untriaged", untriaged, untriaged_minutes))

    bots = [pr for state in BOT_STATES for pr in grouped.get(state, [])]
    if bots:
        summary.append(TierRow(idx=99, name="bot corner", count=len(bots), auth=0, maint=0, mins=0))

    return summary


# SLA kind is PR-property-based, not state-based: a maintainer PR in a duplicate cluster
# still gets the 24h team SLA. Tier only gates whether SLA applies at all — Hygiene/Close/
# Bot/Not-our-move aren't pressing work.
# Tier names whose states are SLA-eligible; _sla_kind compares the state spec's tier against this.
_SLA_TIERS = {"Press a button", "We're blocking someone", "High leverage", "Intake"}
# States that never get an SLA, even when their tier is in _SLA_TIERS.
_SLA_EXEMPT = {"awaiting-ci"}  # CI is running, not our move


def _sla_kind(p: PR) -> tuple[str, bool] | None:
    """Return (kind, is_breaching) for an SLA-eligible PR, else None.

    A PR is eligible when its state is known to _by_name, that state's tier is in
    _SLA_TIERS, and the state is not in _SLA_EXEMPT.

    kind depends on the PR, not on its state:
      'maintainer'  — maintainer PR, 1-day threshold
      're-review'   — wait_since is set, 1-day threshold
      '1st-review'  — everything else, 7-day threshold
    is_breaching is True once the time since wait_since (or created_at when
    wait_since is unset) exceeds that threshold.
    """
    spec = _by_name.get(p.state)
    eligible = (
        spec is not None
        and spec.tier in _SLA_TIERS
        and p.state not in _SLA_EXEMPT
    )
    if not eligible:
        return None

    if p.is_maintainer:
        kind, limit_days = "maintainer", 1
    elif p.wait_since is None:
        kind, limit_days = "1st-review", 7
    else:
        kind, limit_days = "re-review", 1

    clock_start = p.wait_since or p.created_at
    return kind, days_since(clock_start) > limit_days


def _pr_dict(p: PR, repo: str) -> dict:
    """Flatten a PR into a JSON-safe dict.

    This is the shape consumed by the VISR JSON, --json and format_gist_markdown.
    sla_kind is only filled in when the PR is actually breaching its SLA, and
    cluster_id is stringified so int (issue) and str (semantic) ids look alike.
    """
    verdict = _sla_kind(p)
    breach = verdict is not None and verdict[1]
    breach_kind = verdict[0] if breach else None
    cluster = None if p.cluster_id is None else str(p.cluster_id)
    return {
        "number": p.number,
        "title": p.title,
        "author": p.author,
        "url": _pr_url(repo, p.number),
        "state": p.state,
        "reason": p.reason,
        "short_reason": _short_reason(p),
        "age_days": round(days_since(p.created_at), 1),
        "additions": p.additions,
        "deletions": p.deletions,
        "is_auth": p.is_auth,
        "is_maintainer": p.is_maintainer,
        "sla_breach": breach,
        "sla_kind": breach_kind,
        "cluster_id": cluster,
        "cluster_pick": p.cluster_pick,
        "cluster_pick_reason": p.cluster_pick_reason,
    }


def _structured(r: RepoResult) -> dict:
    """Build the canonical tier→state→PR tree for one repo.

    Renderers (format_gist_markdown, the --json and --visr-json outputs) walk this
    dict instead of re-deriving tier membership: PRs arrive grouped by state,
    already ordered, with duplicate clusters expanded.

    Top-level keys, in order: repo, lastUpdated, summary, tiers, bot_corner,
    untriaged, not_our_move, clusters.
    """
    repo = r.repo

    # Actionable PRs grouped by state; key order is first appearance in r.actionable.
    actionable_by_state: dict[str, list[PR]] = {}
    for pr in r.actionable:
        actionable_by_state.setdefault(pr.state, []).append(pr)

    # Cluster membership comes from the FULL PR list: non-primary members carry the
    # duplicate-cluster-member state and are therefore not in r.actionable.
    cluster_members: dict[int | str, list[PR]] = {}
    for pr in r.prs:
        if pr.cluster_id is None:
            continue
        cluster_members.setdefault(pr.cluster_id, []).append(pr)

    # "Not our move": routed away from us, excluding cluster members (shown via clusters).
    parked: list[PR] = []
    for pr in r.prs:
        if route(pr) not in ("not-actionable", "excluded"):
            continue
        if pr.state != "duplicate-cluster-member":
            parked.append(pr)

    # Any actionable state that no tier or the bot corner claims is "untriaged".
    known_states = set(BOT_STATES)
    for _tier_name, _tier_blurb, states in TIERS:
        known_states.update(states)
    untriaged_names = [name for name in actionable_by_state if name not in known_states]

    def _ranked_block(state: str, members: list[PR]) -> dict:
        # Maintainer PRs first; states in SORT_BY_AGE additionally order oldest-first.
        if state in SORT_BY_AGE:
            ordered = _sort_maint_first(members, secondary=lambda p: -days_since(p.created_at))
        else:
            ordered = _sort_maint_first(members)
        return {
            "name": state,
            "desc": STATE_DEFS.get(state, ""),
            "effort_min": EFFORT_MINUTES.get(state, 15),
            "prs": [_pr_dict(p, repo) for p in ordered],
        }

    def _plain_block(state: str, members: list[PR]) -> dict:
        # Bot / untriaged / not-our-move: plain PR-number order, no maintainer-first.
        by_number = sorted(members, key=lambda p: p.number)
        return {
            "name": state,
            "desc": STATE_DEFS.get(state, ""),
            "prs": [_pr_dict(p, repo) for p in by_number],
        }

    # Expanded clusters, ordered by stringified id, with the picked PR marked.
    cluster_blocks: list[dict] = []
    for cid in sorted(cluster_members, key=str):
        roster = sorted(cluster_members[cid], key=lambda m: m.number)
        chosen = next((m for m in roster if m.cluster_pick), None)
        is_issue = isinstance(cid, int)
        entry = {
            "id": str(cid),
            "kind": "issue" if is_issue else "semantic",
            "pick": chosen.number if chosen else None,
            "pick_reason": chosen.cluster_pick_reason if chosen else None,
            "members": [_pr_dict(m, repo) for m in roster],
        }
        if is_issue:
            entry["issue_url"] = _issue_url(repo, cid)
        else:
            # semantic reason format: "semantic: <desc>" — keep the part after the colon
            lead = next((m for m in roster if m.state == "duplicate-cluster"), roster[0])
            entry["desc"] = lead.reason.split(":", 1)[-1].strip()
        cluster_blocks.append(entry)

    # Tier sections; tiers with no populated state are dropped entirely.
    tier_blocks: list[dict] = []
    for tier_idx, (tier_name, tier_blurb, tier_states) in enumerate(TIERS):
        populated: list[dict] = []
        for state in tier_states:
            members = actionable_by_state.get(state)
            if not members:
                continue
            sb = _ranked_block(state, members)
            if state == "duplicate-cluster":
                # Attach the expanded clusters whose primaries sit in this state.
                wanted_ids = {str(p.cluster_id) for p in members}
                sb["clusters"] = [c for c in cluster_blocks if c["id"] in wanted_ids]
            populated.append(sb)
        if not populated:
            continue
        tier_blocks.append({
            "idx": tier_idx,
            "name": tier_name,
            "blurb": tier_blurb,
            "count": sum(len(sb["prs"]) for sb in populated),
            "mins": sum(len(sb["prs"]) * sb["effort_min"] for sb in populated),
            "states": populated,
        })

    bot_blocks = [
        _plain_block(state, actionable_by_state[state])
        for state in BOT_STATES
        if actionable_by_state.get(state)
    ]
    untriaged_blocks = [_plain_block(state, actionable_by_state[state]) for state in untriaged_names]

    # Not-our-move states, largest group first (ties keep first-seen order).
    parked_by_state: dict[str, list[PR]] = {}
    for pr in parked:
        parked_by_state.setdefault(pr.state, []).append(pr)
    not_our_move_blocks = [
        _plain_block(state, members)
        for state, members in sorted(
            parked_by_state.items(), key=lambda kv: len(kv[1]), reverse=True
        )
    ]

    # Per-state counts over ALL PRs (for snapshots), not just the actionable ones.
    state_counts: dict[str, int] = {}
    for pr in r.prs:
        state_counts[pr.state] = state_counts.get(pr.state, 0) + 1

    # SLA tallies use the same _sla_kind() as the per-PR badges, so the headline
    # numbers agree with filtering PRs by kind. "eligible" ignores the threshold.
    sla_eligible = {"1st-review": 0, "re-review": 0, "maintainer": 0}
    sla_breaching = {"1st-review": 0, "re-review": 0, "maintainer": 0}
    for pr in r.prs:
        verdict = _sla_kind(pr)
        if verdict is None:
            continue
        kind, is_breaching = verdict
        if kind not in sla_eligible:
            continue
        sla_eligible[kind] += 1
        if is_breaching:
            sla_breaching[kind] += 1

    actionable_target = 25
    summary = {
        "actionable_count": len(r.actionable),
        "auth_count": len(r.auth),
        "maint_count": r.maintainer_count,
        "effort_hours": round(r.effort_hours, 1),
        "cluster_count": r.cluster_count,
        "cluster_member_count": r.cluster_member_count,
        "not_our_move_count": len(parked),
        "by_tier": [vars(row) for row in _tier_summary(r)],
        "by_state": dict(sorted(state_counts.items())),
        "sla": {
            "first_review_7d_breach": sla_breaching["1st-review"],
            "first_review_eligible": sla_eligible["1st-review"],
            "re_review_24h_breach": sla_breaching["re-review"],
            "re_review_eligible": sla_eligible["re-review"],
            "maint_24h_breach": sla_breaching["maintainer"],
            "maint_eligible": sla_eligible["maintainer"],
            "actionable_target": actionable_target,
            "actionable_over_target": len(r.actionable) > actionable_target,
        },
    }

    return {
        "repo": repo,
        "lastUpdated": now().isoformat(),
        "summary": summary,
        "tiers": tier_blocks,
        "bot_corner": bot_blocks,
        "untriaged": untriaged_blocks,
        "not_our_move": not_our_move_blocks,
        "clusters": cluster_blocks,
    }


def format_headline(results: list[RepoResult], cost: dict, gist_url: str | None) -> str:
    """Render the short multi-repo headline text.

    One summary line per repo, followed by its tier table inside a code fence
    (markdown tables don't render in Slack), then a closing digest-cost line that
    links to `gist_url` when one is given. `cost` must provide 'total_calls' and
    'total_usd'.
    """
    lines = [f"📊 PR actionability — {now().date()}", ""]

    for r in results:
        short = r.repo.replace("typescript-sdk", "ts").replace("python-sdk", "py")
        lines.append(
            f"*{short}*: {len(r.actionable)} actionable · {len(r.auth)} auth · "
            f"~{r.effort_hours:.0f}h"
        )

        rows = _tier_summary(r)
        if rows:
            table = [f"{'tier':<20}{'count':>6}{'auth':>6}{'~time':>7}"]
            for t in rows:
                # Only rows with idx in 0..98 get a numeric prefix.
                label = "" if t.idx < 0 or t.idx >= 99 else f"{t.idx} "
                name = _trunc(t.name, 18 - len(label))
                time_s = "—" if not t.mins else _fmt_mins(t.mins)
                table.append(f"{label}{name:<{20-len(label)}}{t.count:>6}{t.auth:>6}{time_s:>7}")
            lines += ["```", *table, "```"]
        lines.append("")

    cost_str = "$0 (no LLM)" if not cost['total_calls'] else f"${cost['total_usd']:.2f}"
    if not gist_url:
        lines.append(f"digest cost {cost_str}")
    else:
        lines.append(f"digest cost {cost_str} · 🔗 <{gist_url}|details>")
    return "\n".join(lines)


def format_gist_markdown(r: RepoResult, cost: dict) -> str:
    """Render the single-repo gist body as markdown.

    All grouping and ordering comes from _structured(r); this function only turns
    that tree into text. Sections, in order: title and blurb, digest cost, summary
    table, one section per tier, bot corner, unclassified, not our move. `cost`
    must provide 'total_calls', 'total_usd' and 'by_purpose'.
    """
    s = _structured(r)
    sm = s["summary"]
    date = now().strftime("%Y-%m-%d")
    n_act = sm["actionable_count"]

    def _collapsed(st: dict) -> list[str]:
        # One <details> block listing every PR of a state with its short reason.
        pr_lines = [_pr_line_d(p, suffix=p["short_reason"]) for p in st["prs"]]
        return [
            f"<details><summary><b>{st['name']}</b> ({len(st['prs'])}) — {st['desc']}</summary>",
            "",
            *pr_lines,
            "",
            "</details>",
            "",
        ]

    out = [
        f"# {s['repo']} · {n_act} PRs actionable · {date}",
        "",
        f"> **{n_act} actionable** · ~{sm['effort_hours']}h  ",
        f"> _Ctrl-F `[auth]` to jump between the {sm['auth_count']} auth PRs._",
        "",
    ]

    # --- Digest cost (per-purpose breakdown, most expensive first) ---
    if not cost['total_calls']:
        out.append("_digest cost: $0 (no LLM calls)_")
    else:
        out += [
            f"<details><summary>digest cost ${cost['total_usd']:.2f} ({cost['total_calls']} LLM calls)</summary>",
            "",
            "| purpose | calls | cost |",
            "|---|---|---|",
        ]
        for purpose, v in sorted(cost['by_purpose'].items(), key=lambda kv: -kv[1]['cost']):
            out.append(f"| {purpose} | {v['count']} | ${v['cost']:.2f} |")
        out += ["", "</details>"]
    out.append("")

    # --- Summary table ---
    out += [
        "## Summary",
        "",
        "| tier | count | auth | maint. | ~time |",
        "|---|---|---|---|---|",
    ]
    tot_n = tot_a = tot_m = tot_mins = 0
    for t in sm["by_tier"]:
        # idx 99 is the bot-corner row, idx -1 the untriaged row; the rest are real tiers.
        if t["idx"] == 99:
            out.append(f"| bot corner | {t['count']} | 0 | 0 | — |")
        elif t["idx"] == -1:
            out.append(f"| _untriaged_ | {t['count']} | {t['auth']} | {t['maint']} | {_fmt_mins(t['mins'])} |")
        else:
            out.append(f"| {t['idx']} — {t['name']} | {t['count']} | {t['auth']} | {t['maint']} | {_fmt_mins(t['mins'])} |")
        tot_n += t["count"]
        tot_a += t["auth"]
        tot_m += t["maint"]
        tot_mins += t["mins"]
    out += [
        f"| **total actionable** | **{tot_n}** | **{tot_a}** | **{tot_m}** | **{_fmt_mins(tot_mins)}** |",
        f"| not our move | {sm['not_our_move_count']} | — | — | — |",
        "",
        "---",
        "",
    ]

    # --- Tier sections ---
    for tier in s["tiers"]:
        out += [
            f"## Tier {tier['idx']} — {tier['name']} ({tier['count']} · ~{_fmt_mins(tier['mins'])})",
            f"> {tier['blurb']}",
            "",
        ]
        for st in tier["states"]:
            if st["name"] != "duplicate-cluster":
                out += _collapsed(st)
                continue

            # Duplicate clusters: one collapsible block per cluster instead of per state.
            out += [f"**{st['name']}** ({len(st['prs'])}) — {st['desc']}", ""]
            for c in st["clusters"]:
                pick_note = f" · ⭐ #{c['pick']}" if c['pick'] else ""
                if c["kind"] == "issue":
                    summary = f"Issue <a href='{c['issue_url']}'>#{c['id']}</a> — {len(c['members'])} PRs{pick_note}"
                else:
                    summary = f"Semantic: {_trunc(c['desc'], 60)} — {len(c['members'])} PRs{pick_note}"
                out += [f"<details><summary>{summary}</summary>", ""]
                for m in c["members"]:
                    star = f"⭐ {m['cluster_pick_reason']}" if m['cluster_pick'] else ""
                    out.append(_pr_line_d(m, suffix=star))
                out += ["", "</details>", ""]

    # --- Bot corner ---
    if s["bot_corner"]:
        bot_total = sum(len(b["prs"]) for b in s["bot_corner"])
        out += [
            f"## Bot corner ({bot_total})",
            "> Dependabot/renovate PRs that need human attention.",
            "",
        ]
        for st in s["bot_corner"]:
            out += _collapsed(st)

    # --- Unclassified ---
    if s["untriaged"]:
        out += [
            "## Unclassified",
            "> Classifier couldn't place these. Investigate + fix the classifier.",
            "",
        ]
        for st in s["untriaged"]:
            out.extend(
                _pr_line_d(p, suffix=f"`{st['name']}` {p['short_reason']}") for p in st["prs"]
            )
        out.append("")

    # --- Not our move ---
    if s["not_our_move"]:
        out += [
            f"## Not our move ({sm['not_our_move_count']})",
            "> Clock is on the author or blocked externally. No action owed today.",
            "",
        ]
        for st in s["not_our_move"]:
            out += _collapsed(st)

    return "\n".join(out)


# --- Main ---

def classify_repo(repo: str, use_llm: bool, maintainers_json: str | None = None) -> RepoResult:
    """Fetch and classify every open PR of one modelcontextprotocol repo.

    Steps: load maintainers (from `maintainers_json` when given, otherwise from the
    access repo), fetch the PRs, run cluster and backport detection, classify each
    PR, run semantic clustering and cluster-winner picking, then, when `use_llm`
    is true, give remaining unknowns one LLM fallback pass.

    Side effects: sets the module globals MAINTAINERS and _current_repo, and logs
    progress to stderr. A PR whose classification raises CalledProcessError or
    LLMError is marked state 'unknown' with the failure recorded in its reason.
    """
    global MAINTAINERS, _current_repo
    owner = "modelcontextprotocol"
    name = repo

    if not maintainers_json:
        print(f"[{repo}] loading maintainers from access repo...", file=sys.stderr)
        maintainers = load_maintainers(repo)
    else:
        print(f"[{repo}] loading maintainers from {maintainers_json}...", file=sys.stderr)
        maintainers = load_from_visr_json(repo, maintainers_json)
    MAINTAINERS = maintainers.all
    _current_repo = repo
    print(f"[{repo}] {len(maintainers.all)} maintainers, {len(maintainers.auth)} auth", file=sys.stderr)

    print(f"[{repo}] fetching tier A...", file=sys.stderr)
    prs = fetch_tier_a(owner, name, maintainers)
    print(f"[{repo}] {len(prs)} open PRs", file=sys.stderr)

    # Cluster + backport detection must see the full set, before any routing.
    detect_clusters(prs)
    detect_backport_pairs(prs)

    print(f"[{repo}] classifying...", file=sys.stderr)
    for i, pr in enumerate(prs):
        try:
            classify(pr, owner, name, use_llm=use_llm)
            reconcile_label_hint(pr)
        except subprocess.CalledProcessError as e:
            failure = f"fetch failed: {e.stderr[:100] if e.stderr else e}"
            pr.state, pr.reason = "unknown", failure
        except LLMError as e:
            failure = f"llm failed after retry: {str(e)[:100]}"
            pr.state, pr.reason = "unknown", failure
        # Progress line every 20 PRs.
        if not (i + 1) % 20:
            print(f"[{repo}] {i+1}/{len(prs)}", file=sys.stderr)

    # Semantic clustering needs CI status from classify(), so it runs afterwards.
    print(f"[{repo}] semantic cluster detection...", file=sys.stderr)
    detect_semantic_clusters(prs, owner, name, use_llm=use_llm)
    pick_cluster_winners(prs, owner, name, use_llm=use_llm)

    # LLM fallback for PRs still unknown and flagged as needing it.
    unknowns = (
        [p for p in prs if p.state == "unknown" and p.needs_llm == "unknown"]
        if use_llm
        else []
    )
    if unknowns:
        print(f"[{repo}] llm fallback for {len(unknowns)} unknowns...", file=sys.stderr)
        for pr in unknowns:
            try:
                new_state, new_reason = llm_unknown_fallback(pr, repo)
            except LLMError as e:
                # State stays 'unknown'; only the reason records the failure.
                pr.reason = f"llm fallback failed after retry: {str(e)[:100]}"
            else:
                pr.state, pr.reason = new_state, new_reason

    return RepoResult(repo=repo, prs=prs)


def main() -> None:
    """CLI entry point: classify the requested repos and emit the selected outputs.

    Parses arguments, validates per-repo output templates, decides whether LLM
    calls are possible (ANTHROPIC_API_KEY set and the `claude` CLI on PATH),
    classifies each repo, then writes whichever of --gist-markdown, --visr-json
    and --json were requested. With no output flag, prints the headline to stdout.

    Exits with status 1 when a multi-repo run passes --gist-markdown or
    --visr-json without a '{repo}' placeholder.
    """
    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--repo", required=True, help="typescript-sdk or python-sdk (comma-separated ok)")
    ap.add_argument("--no-llm", action="store_true", help="skip claude -p calls; semantic clusters flagged not resolved")
    ap.add_argument("--json", metavar="FILE", nargs="?", const="-",
                    help="write full classification + cost as JSON (file path, or '-' for stdout)")
    ap.add_argument("--gist-markdown", metavar="FILE", nargs="?", const="-",
                    help="write gist-ready markdown (file path, or '-' for stdout)")
    ap.add_argument("--visr-json", metavar="FILE",
                    help="write per-repo structured JSON for VISR dashboard (use {repo} placeholder for multi-repo)")
    ap.add_argument("--maintainers-json", metavar="PATH",
                    help="load maintainers from pre-parsed JSON ({github, roles} entries) instead of the access repo checkout")
    args = ap.parse_args()

    repos = [r.strip() for r in args.repo.split(",") if r.strip()]
    if not repos:
        ap.error("--repo resolved to empty (shell var not expanded? try --repo typescript-sdk literally)")

    # Fail fast: check per-repo templates BEFORE the slow (and possibly paid) classification
    # so a missing placeholder doesn't throw away a finished run. Same message and exit
    # status as before; only the timing changed.
    if len(repos) > 1:
        for flag, template in (("--gist-markdown", args.gist_markdown), ("--visr-json", args.visr_json)):
            if template is not None and "{repo}" not in template:
                print(f"error: {flag} needs '{{repo}}' placeholder for multi-repo runs", file=sys.stderr)
                sys.exit(1)

    use_llm = not args.no_llm
    if use_llm and not os.environ.get("ANTHROPIC_API_KEY"):
        print("WARNING: ANTHROPIC_API_KEY not set — falling back to --no-llm mode. "
              "Semantic clusters won't resolve, intent-ambiguous PRs stay in 'unknown'.", file=sys.stderr)
        use_llm = False
    if use_llm and shutil.which("claude") is None:
        print("WARNING: 'claude' CLI not on PATH — falling back to --no-llm mode.", file=sys.stderr)
        use_llm = False

    results: list[RepoResult] = []
    for repo in repos:
        results.append(classify_repo(repo, use_llm=use_llm, maintainers_json=args.maintainers_json))

    cost = llm_cost_summary()
    if cost["total_calls"]:
        print(f"\n[llm cost] ${cost['total_usd']:.2f} across {cost['total_calls']} calls:", file=sys.stderr)
        for purpose, v in cost["by_purpose"].items():
            print(f"  {purpose}: {v['count']} calls, ${v['cost']:.2f}", file=sys.stderr)

    def _emit(content: str, target: str | None) -> None:
        """Write `content` to stdout ('-'), to a file path, or nowhere (None)."""
        if target is None:
            return
        if target == "-":
            print(content)
            return
        path = Path(target)
        # Templates such as 'data/{repo}/actionability.json' may point at a directory
        # that doesn't exist yet for a newly added repo.
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the markdown contains non-ASCII characters (⭐, ·, —) and the
        # platform default encoding is not guaranteed to handle them.
        path.write_text(content, encoding="utf-8")
        print(f"wrote {target}", file=sys.stderr)

    def _per_repo_target(template: str, repo: str) -> str:
        """Substitute the '{repo}' placeholder; templates were validated up front."""
        return template.replace("{repo}", repo)

    if args.gist_markdown is not None:
        for r in results:
            target = _per_repo_target(args.gist_markdown, r.repo)
            _emit(format_gist_markdown(r, llm_cost_summary(repo=r.repo)), target)

    if args.visr_json is not None:
        for r in results:
            target = _per_repo_target(args.visr_json, r.repo)
            s = _structured(r)
            s["llm_cost"] = llm_cost_summary(repo=r.repo)
            _emit(json.dumps(s, indent=2), target)

    if args.json is not None:
        # Envelope: top-level scalars for jq extraction; full _structured per repo.
        out = {
            "date": now().isoformat(),
            "llm_cost": cost,
            "headline": format_headline(results, cost, gist_url=None),
            "repos": [
                {
                    # Flat scalars the skill reads (.repos[].actionable_count etc.)
                    "repo": r.repo,
                    "actionable_count": len(r.actionable),
                    "auth_count": len(r.auth),
                    "effort_hours": round(r.effort_hours, 1),
                    "llm_cost": llm_cost_summary(repo=r.repo),
                    # Full structured tree — the actual data model
                    "structured": _structured(r),
                }
                for r in results
            ],
        }
        _emit(json.dumps(out, indent=2), args.json)

    if args.json is None and args.gist_markdown is None and args.visr_json is None:
        # No output target specified — print headline only
        print(format_headline(results, cost, gist_url=None))


# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
