#!/usr/bin/env ruby
# frozen_string_literal: true

# bin/railway — single-file Ruby tooling for showcase Railway operations.
#
# Subcommands:
#   snapshot           Capture current service config to a YAML snapshot.
#   restore            Restore a snapshot to an environment (force-redeploy).
#   rollback           Roll a single service back to its previous deploy.
#   rollback-commit    Roll all services back to digests captured at a git SHA.
#   promote            Promote staging snapshot to production with prechecks.
#   pin                Pin a service to a specific image digest.
#   env-diff           Diff two environments and exit non-zero on drift.
#   resolve-digest     Resolve an image reference to its content digest via GHCR.
#   lint-prod          Verify production is fully digest-pinned (CI gate).
#
# Stdlib only — no Bundler, no Gemfile. Ruby 3.x.
#
# Auth: RAILWAY_TOKEN env var, or ~/.railway/config.json (token field).
# Never invokes `railway login` / `railway logout` / `op` / any external CLI.
#
# Production protection: --yes + typed env confirmation (unless --non-interactive).
#
# Exit codes:
#   0  clean / success
#   1  drift / findings
#   2  error (network, auth, schema, etc.)

require "json"
require "net/http"
require "optparse"
require "set"
require "uri"
require "yaml"
require "io/console"
require "fileutils"
require "time"

module Railway
    # Version string of this bin/railway tool.
    VERSION = "0.1.0"

    # ── Constants ──────────────────────────────────────────────────────────────

    # Railway project / environment ids this tool operates on.
    # NOTE(review): WORKSPACE and GHCR_ORG are not referenced in this part of
    # the file; presumably consumed further down — confirm before removing.
    WORKSPACE              = "CopilotKit"
    PROJECT_ID             = "6f8c6bff-a80d-4f8f-b78d-50b32bcf4479"
    PRODUCTION_ENV_ID      = "b14919f4-6417-429f-848d-c6ae2201e04f"
    STAGING_ENV_ID         = "8edfef02-ea09-4a20-8689-261f21cc2849"
    GHCR_ORG               = "copilotkit"

    # Default endpoint for the GraphQL client (Railway public v2 API).
    GRAPHQL_ENDPOINT       = "https://backboard.railway.app/graphql/v2"

    # CLI environment names (plus short aliases) mapped to Railway environment
    # ids. env_id_for lowercases its argument before looking it up here.
    ENV_IDS = {
        "production" => PRODUCTION_ENV_ID,
        "prod"       => PRODUCTION_ENV_ID,
        "staging"    => STAGING_ENV_ID,
        "stage"      => STAGING_ENV_ID,
    }.freeze

    # Env vars required to be present (parity-checked) before any promote.
    CRITICAL_ENV_KEYS = %w[
        RAILWAY_TOKEN
        GHCR_TOKEN
        SHARED_SECRET
        OPS_TRIGGER_TOKEN
        POCKETBASE_SUPERUSER_EMAIL
        POCKETBASE_SUPERUSER_PASSWORD
        GITHUB_APP_PRIVATE_KEY
        OPENAI_API_KEY
        ANTHROPIC_API_KEY
        GOOGLE_API_KEY
    ].freeze

    # EXPECTED_DOMAINS and STAGING_SERVICES are both derived from
    # showcase/scripts/railway-envs.generated.json (canonical source:
    # showcase/scripts/railway-envs.ts). The CI guard
    # `npx tsx showcase/scripts/emit-railway-envs-json.ts --check` ensures the
    # JSON artifact stays in sync with the TS SSOT on every PR.
    #
    # The JSON is read once, when this module is loaded, into SSOT_DATA. Both
    # constants are derived from that single parse so they cannot drift. Only
    # public (non-`*.up.railway.app`) hosts go into EXPECTED_DOMAINS, which
    # preserves the original set.
    # Parsed contents of the generated SSOT JSON, deep-frozen. Raises (at module
    # load) with a regeneration hint when the file is missing or not valid JSON.
    SSOT_DATA = begin
        ssot_json = File.expand_path("../scripts/railway-envs.generated.json", __dir__)
        unless File.exist?(ssot_json)
            raise "railway-envs.generated.json not found at #{ssot_json}. " \
                  "Re-run: npx tsx showcase/scripts/emit-railway-envs-json.ts"
        end
        # freeze: true deep-freezes every parsed Hash/Array/String. A bare
        # top-level #freeze left the nested "services" entries mutable, so an
        # in-place edit could make later readers disagree with the constants
        # derived from this data at load time.
        JSON.parse(File.read(ssot_json), freeze: true)
    rescue JSON::ParserError => e
        raise "railway-envs.generated.json at #{ssot_json} is not valid JSON (#{e.message}). " \
              "Re-run: npx tsx showcase/scripts/emit-railway-envs-json.ts"
    end

    # Public hostnames expected per environment id, sorted and de-duplicated.
    # Railway-generated `*.up.railway.app` hosts are left out.
    EXPECTED_DOMAINS = begin
        generated_suffix = ".up.railway.app"
        prod_hosts = []
        staging_hosts = []
        SSOT_DATA.fetch("services").each do |service|
            domain_map = service.fetch("domains")
            prod_host = domain_map.fetch("prod")
            staging_host = domain_map.fetch("staging")
            prod_hosts << prod_host unless prod_host.end_with?(generated_suffix)
            staging_hosts << staging_host unless staging_host.end_with?(generated_suffix)
        end
        {
            PRODUCTION_ENV_ID => prod_hosts.sort.uniq.freeze,
            STAGING_ENV_ID    => staging_hosts.sort.uniq.freeze,
        }.freeze
    end

    # Sorted canonical service names from the SSOT. The optional positional
    # `bin/railway promote <service>` argument is validated against this list.
    # It shares SSOT_DATA with EXPECTED_DOMAINS, so the two cannot drift.
    STAGING_SERVICES = begin
        names = SSOT_DATA.fetch("services").map { |entry| entry.fetch("name") }
        names.sort.freeze
    end

    # ── Token / Auth ───────────────────────────────────────────────────────────

    module Auth
        module_function

        # Resolve the Railway API bearer token.
        #
        # Lookup order: the RAILWAY_TOKEN env var, then ~/.railway/config.json.
        # In the config file, the first non-blank String among these key paths
        # wins:
        #   user.accessToken, accessToken, user.token, token,
        #   projects.<PROJECT_ID>.token
        # Railway CLI stores the bearer in `user.accessToken` (43+ chars).
        # `user.token` is a short legacy CLI session token that does NOT
        # authenticate to the public GraphQL API, so it is only a fallback.
        #
        # Returns the stripped token, or nil when nothing usable exists. A
        # missing, unreadable, or malformed config file counts as "no token".
        def token
            env_token = ENV["RAILWAY_TOKEN"]
            return env_token.strip if env_token && !env_token.strip.empty?

            cfg = File.expand_path("~/.railway/config.json")
            return nil unless File.exist?(cfg)

            data =
                begin
                    JSON.parse(File.read(cfg))
                rescue JSON::ParserError, SystemCallError
                    # Best-effort source: a bad/unreadable config means "no token".
                    return nil
                end

            [
                %w[user accessToken],
                %w[accessToken],
                %w[user token],
                %w[token],
                ["projects", PROJECT_ID, "token"],
            ].each do |path|
                candidate = config_value(data, path)
                # A blank or non-String entry must not shadow a usable
                # lower-priority one (the old `||` chain stopped at "").
                return candidate.strip if candidate.is_a?(String) && !candidate.strip.empty?
            end
            nil
        end

        # Walk `path` through nested Hashes. Returns nil as soon as a level is
        # not a Hash; Hash#dig would raise TypeError on a String/Array node.
        def config_value(data, path)
            path.reduce(data) { |node, key| node.is_a?(Hash) ? node[key] : nil }
        end
        private_class_method :config_value

        # Like #token but exits 2 (via Railway.die!) when no token is found.
        def require_token!
            t = token
            if t.nil? || t.empty?
                Railway.die!("RAILWAY_TOKEN not set and ~/.railway/config.json not usable. " \
                    "Export RAILWAY_TOKEN before running.")
            end
            t
        end

        # GHCR bearer. Distinct from the Railway token (Auth.token).
        # Resolution order:
        #   1. GHCR_TOKEN — explicit PAT (local dev or CI override).
        #   2. GITHUB_TOKEN — GitHub Actions automatic token (needs packages:read).
        # Returns nil if no token is available; caller MUST refuse rather than
        # silently fall through to anonymous (which works for public images but
        # not for digest-existence verification against private repos).
        def ghcr_token
            t = ENV["GHCR_TOKEN"]
            return t.strip if t && !t.strip.empty?
            t = ENV["GITHUB_TOKEN"]
            return t.strip if t && !t.strip.empty?
            nil
        end
    end

    # ── GraphQL Client ─────────────────────────────────────────────────────────

    class GraphQL
        class Error < StandardError; end

        # token:    Railway bearer; when nil, Auth.require_token! supplies it.
        # endpoint: GraphQL URL (Railway public v2 API by default).
        # http:     optional transport for tests, called with endpoint:, token:,
        #           body:. It must return an object responding to #code/#body
        #           or a Hash with :status/:body.
        def initialize(token: nil, endpoint: GRAPHQL_ENDPOINT, http: nil)
            @token = token || Auth.require_token!
            @endpoint = endpoint
            @http = http # optional injection for tests
        end

        # Execute `query` with `variables` and return the response's `data`.
        #
        # Raises GraphQL::Error in three cases: HTTP status >= 400, a body that
        # is not a JSON object, or a non-empty `errors` array (messages joined
        # with "; "). Transport failures (DNS, timeouts) still propagate as
        # Net/socket exceptions.
        def query(query, variables = {})
            body = { query: query, variables: variables }.to_json
            resp =
                if @http
                    @http.call(endpoint: @endpoint, token: @token, body: body)
                else
                    post(body)
                end

            status = resp.respond_to?(:code) ? resp.code.to_i : resp[:status].to_i
            body_str = resp.respond_to?(:body) ? resp.body : resp[:body]

            raise Error, "HTTP #{status}: #{body_str}" if status >= 400

            parsed = parse_response(body_str, status)
            errors = parsed["errors"]
            if errors && !errors.empty?
                msgs = errors.map { |e| e["message"] }.join("; ")
                raise Error, "GraphQL: #{msgs}"
            end
            parsed["data"]
        end

        private

        # POST the JSON `body` to @endpoint with bearer auth. TLS is used only
        # for an https endpoint, so a plain http endpoint (e.g. a local stub)
        # also works.
        def post(body)
            uri = URI(@endpoint)
            req = Net::HTTP::Post.new(uri)
            req["Content-Type"] = "application/json"
            req["Authorization"] = "Bearer #{@token}"
            req.body = body

            Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https",
                                                open_timeout: 10, read_timeout: 30) do |h|
                h.request(req)
            end
        end

        # Decode a response body into a Hash. A malformed payload (e.g. an HTML
        # error page from a proxy) surfaces as Error, like any other API failure.
        def parse_response(body_str, status)
            parsed = JSON.parse(body_str.to_s)
            raise Error, "HTTP #{status}: GraphQL response is not a JSON object" unless parsed.is_a?(Hash)

            parsed
        rescue JSON::ParserError => e
            raise Error, "HTTP #{status}: unparseable GraphQL response (#{e.message})"
        end
    end

    # ── GHCR Client ────────────────────────────────────────────────────────────
    #
    # We need to resolve a tag (e.g. `ghcr.io/copilotkit/showcase-shell:latest`)
    # to its content-addressable digest (`sha256:...`). GHCR exposes the
    # OCI Distribution Spec at https://ghcr.io/v2/<org>/<image>/manifests/<tag>.
    # Requires a bearer token; for public images, a token issued via the
    # /token endpoint works.
    class GHCR
        class Error < StandardError; end

        ACCEPT_MANIFEST = [
            "application/vnd.oci.image.index.v1+json",
            "application/vnd.oci.image.manifest.v1+json",
            "application/vnd.docker.distribution.manifest.list.v2+json",
            "application/vnd.docker.distribution.manifest.v2+json",
        ].join(", ").freeze

        # token: GitHub token used to authenticate the /token exchange (defaults
        #        to Auth.ghcr_token; nil/empty means an anonymous exchange).
        # http:  optional transport for tests, called with method:, url:,
        #        headers:. It must return { status:, headers:, body: }.
        def initialize(token: Railway::Auth.ghcr_token, http: nil)
            @token = token
            @http = http
        end

        # Resolve a tag ref such as "ghcr.io/copilotkit/showcase-shell:latest"
        # to its content digest ("sha256:<...>") via a manifest HEAD.
        #
        # A ref already pinned with @sha256:... is returned as-is, with no
        # network call. A ref without a tag resolves "latest".
        # Returns nil if the tag does not exist (HEAD 404).
        # Raises GHCR::Error when:
        #   * the HEAD returns any other status >= 400;
        #   * the response carries no Docker-Content-Digest header;
        #   * a supplied token fails the /token exchange.
        def resolve_digest(image_ref)
            parts = parse_image_ref(image_ref)
            return parts[:digest] if parts[:digest] # already pinned

            resp = head_manifest(parts[:org], parts[:name], parts[:tag] || "latest")
            status = resp[:status]
            return nil if status == 404
            raise Error, "GHCR manifest HEAD #{status} for #{image_ref}" if status >= 400

            hdrs = resp[:headers] || {}
            digest = hdrs["docker-content-digest"] || hdrs["Docker-Content-Digest"]
            raise Error, "GHCR did not return Docker-Content-Digest for #{image_ref}" if digest.nil?

            digest
        end

        # Verify a digest-pinned image exists in GHCR. Returns:
        #   :exists       — HEAD returned 200.
        #   :missing      — HEAD returned 404 (digest GC'd or never built).
        #   :auth_failed  — HEAD returned 401/403 (token missing/insufficient).
        # Raises GHCR::Error on any other HEAD status (5xx, 429, 3xx, ...) and
        # when a supplied token fails the /token exchange. Transport failures
        # propagate as Net/socket exceptions. Raises ArgumentError for a ref
        # without @sha256:<digest>.
        #
        # This is the "P1 precondition" check from the showcase deploy spec, not
        # a tag resolution. We do not chase a tag here; we verify the exact
        # bytes about to be pinned to prod.
        def manifest_exists(image_ref)
            parts = parse_image_ref(image_ref)
            digest = parts[:digest]
            raise ArgumentError, "manifest_exists requires a digest-pinned ref, got #{image_ref}" if digest.nil?

            status = head_manifest(parts[:org], parts[:name], digest)[:status]
            case status
            when 200       then :exists
            when 404       then :missing
            when 401, 403  then :auth_failed
            else
                raise Error, "GHCR manifest HEAD #{status} for #{image_ref}"
            end
        end

        # Parse an image ref into { registry:, org:, name:, tag:, digest: }.
        #
        #   "ghcr.io/copilotkit/shell:latest"  -> registry "ghcr.io", org "copilotkit",
        #                                         name "shell", tag "latest", digest nil
        #   "copilotkit/shell@sha256:abc"      -> registry nil, tag nil, digest "sha256:abc"
        #
        # Only the "ghcr.io/" prefix is recognised as a registry. The tag is the
        # text after the last ":" that follows the last "/", so a host:port
        # prefix is never mistaken for a tag. An empty tag or digest becomes nil.
        def parse_image_ref(ref)
            r = ref.to_s.strip
            registry = nil
            if r.start_with?("ghcr.io/")
                registry = "ghcr.io"
                r = r.delete_prefix("ghcr.io/")
            end

            digest = nil
            r, digest = r.split("@", 2) if r.include?("@")
            digest = nil if digest&.empty?

            tag = nil
            colon = r.rindex(":")
            if colon && colon > (r.rindex("/") || -1)
                tag = r[(colon + 1)..]
                r = r[0...colon]
            end
            tag = nil if tag&.empty?

            org, name = r.split("/", 2)
            { registry: registry, org: org, name: name, tag: tag, digest: digest }
        end

        private

        def headers(bearer)
            h = { "Accept" => ACCEPT_MANIFEST }
            h["Authorization"] = "Bearer #{bearer}" if bearer
            h
        end

        # HEAD https://ghcr.io/v2/<org>/<name>/manifests/<reference> using a
        # freshly minted registry bearer. Returns { status:, headers:, body: }.
        def head_manifest(org, name, reference)
            bearer = bearer_for(org, name)
            url = "https://ghcr.io/v2/#{org}/#{name}/manifests/#{reference}"
            if @http
                @http.call(method: :head, url: url, headers: headers(bearer))
            else
                http_head(url, headers: headers(bearer))
            end
        end

        # Mint the bearer used for manifest reads via GHCR's /token exchange.
        #
        # GHCR's OCI manifest endpoint rejects a RAW GitHub/Actions token sent as
        # `Authorization: Bearer <token>` with HTTP 403. The token must first be
        # exchanged for a short-lived registry bearer, so we ALWAYS hit /token,
        # even when a token is present. The previous "return @token raw" path
        # was the bug that made every `docs` promote 403 in CI.
        #
        # When a token is present, the exchange is authenticated with Basic auth:
        # base64("x-access-token:<token>"). The username is arbitrary and the
        # token is the password. For public packages the exchange also succeeds
        # anonymously, so a missing token still yields a usable bearer. The
        # response's `token` field is used, with `access_token` accepted as the
        # alias the Docker token spec allows.
        def bearer_for(org, name)
            url = "https://ghcr.io/token?service=ghcr.io&scope=repository:#{org}/#{name}:pull"
            token_present = @token && !@token.empty?
            auth_headers = {}
            auth_headers["Authorization"] = "Basic " + ["x-access-token:#{@token}"].pack("m0") if token_present

            resp =
                if @http
                    @http.call(method: :get, url: url, headers: auth_headers)
                else
                    http_get(url, headers: auth_headers)
                end

            status = resp[:status]
            if status >= 400
                # A token WAS supplied but the exchange failed: surface it. The
                # caller must NOT silently downgrade to an anonymous manifest
                # read (that conflates "no token" with "supplied token failed").
                raise Error, "GHCR /token exchange failed (#{status}) for #{org}/#{name}" if token_present

                # No token supplied: a non-2xx is the legitimate signal that the
                # package is not anonymously pullable. The caller handles nil.
                return nil
            end

            payload =
                begin
                    JSON.parse(resp[:body].to_s)
                rescue JSON::ParserError => e
                    raise Error, "GHCR /token returned unparseable body for #{org}/#{name}: #{e.message}"
                end
            raise Error, "GHCR /token returned a non-object body for #{org}/#{name}" unless payload.is_a?(Hash)

            payload["token"] || payload["access_token"]
        end

        def http_get(url, headers: {})
            uri = URI(url)
            req = Net::HTTP::Get.new(uri)
            headers.each { |k, v| req[k] = v }
            resp = Net::HTTP.start(uri.host, uri.port, use_ssl: true,
                                    open_timeout: 10, read_timeout: 30) do |h|
                h.request(req)
            end
            { status: resp.code.to_i, headers: resp.to_hash.transform_values(&:first), body: resp.body }
        end

        def http_head(url, headers: {})
            uri = URI(url)
            req = Net::HTTP::Head.new(uri)
            headers.each { |k, v| req[k] = v }
            resp = Net::HTTP.start(uri.host, uri.port, use_ssl: true,
                                    open_timeout: 10, read_timeout: 30) do |h|
                h.request(req)
            end
            hdrs = {}
            resp.each_header { |k, v| hdrs[k] = v; hdrs[k.downcase] = v }
            { status: resp.code.to_i, headers: hdrs, body: resp.body }
        end
    end

    # ── Helpers ────────────────────────────────────────────────────────────────

    # From here on, every `def` in this module body becomes a module function.
    # It is callable as Railway.<name> (e.g. Railway.die!, used by Auth,
    # SnapshotIO and the commands) and exists as a private instance method.
    module_function

    # Abort the CLI. Prints "railway: <msg>" via Kernel#warn, then exits with
    # `code`, which defaults to 2 (the "error" status in the header's table).
    def die!(msg, code: 2)
        warn("railway: " + msg.to_s)
        exit(code)
    end

    # Map a CLI env name (case-insensitive; any key of ENV_IDS) to its Railway
    # environment id. Dies via die! for an unknown name.
    def env_id_for(name)
        ENV_IDS.fetch(name.to_s.downcase) do
            die!("Unknown env: #{name.inspect}. Use staging or production.")
        end
    end

    # Human-readable label for a known environment id. Any other value is
    # returned unchanged.
    def env_label(env_id)
        return "production" if PRODUCTION_ENV_ID == env_id
        return "staging" if STAGING_ENV_ID == env_id

        env_id
    end

    # True only when `env_id` is the production environment id.
    def production?(env_id) = env_id == PRODUCTION_ENV_ID

    # Gate a destructive action behind the production safeguards.
    #
    # `env_label` is an environment name as accepted by env_id_for.
    # Non-production targets are confirmed immediately. Production first
    # requires `yes: true`, otherwise the run dies. With `non_interactive: true`
    # that is enough, and a notice goes to stderr. Otherwise the operator must
    # type exactly "production" on stdin (surrounding whitespace ignored);
    # anything else, or EOF, dies. Returns true once confirmed.
    def confirm_destructive!(env_label:, action:, non_interactive: false, yes: false)
        target_id = env_id_for(env_label)
        return true unless production?(target_id)

        die!("Refusing #{action} on production without --yes.", code: 2) unless yes

        if non_interactive
            warn "[non-interactive] proceeding with #{action} on production (--yes given)."
            return true
        end

        $stderr.print "Type 'production' to confirm #{action}: "
        typed = $stdin.gets&.strip
        die!("Confirmation phrase mismatch. Aborting.", code: 2) if typed != "production"

        true
    end

    # Return the service entry whose "name" equals `name` in a parsed snapshot
    # Hash, or nil. A snapshot without a "services" list yields nil.
    def find_service(snapshot, name)
        entries = snapshot["services"] || []
        entries.find { |entry| entry["name"] == name }
    end

    # ── Snapshot Schema ────────────────────────────────────────────────────────
    #
    # snapshot:
    #   version: 2                          # SnapshotIO also reads version 1 files
    #   captured_at: <ISO8601>
    #   project_id: <uuid>
    #   environment:
    #     id: <uuid>
    #     name: production|staging
    #   services:
    #     - name: showcase-shell
    #       service_id: <uuid>
    #       image: ghcr.io/copilotkit/showcase-shell@sha256:...
    #       image_tag: ghcr.io/copilotkit/showcase-shell   # `image` minus any @digest suffix
    #       digest: sha256:...                             # nil unless `image` is @sha256-pinned
    #       start_command: <string|nil>
    #       healthcheck_path: <string|nil>                 # v2
    #       region: <as returned by Railway>               # v2
    #       replicas: <numReplicas from Railway>           # v2
    #       restart_policy: <restartPolicyType from Railway> # v2
    #       auto_updates_disabled: <bool|nil>              # currently always written as nil
    #       latest_deployment_id: <uuid|nil>
    #       env_keys: [KEY1, KEY2, ...]   # keys only, never values
    #       custom_domains: [...]
    #
    # We capture KEYS only for env vars (never values) so snapshots are safe
    # to commit/share.
    class SnapshotIO
        SCHEMA_VERSION = 2

        # v1: initial fields (name, service_id, image, image_tag, digest,
        #     start_command, auto_updates_disabled, latest_deployment_id,
        #     env_keys, custom_domains).
        # v2: adds healthcheck_path, region, replicas, restart_policy
        #     (for the P6 parity matrix in promote). We accept v1 reads
        #     for backwards-compat with historical committed snapshots —
        #     promote always uses live snapshots, but rollback-commit
        #     replays snapshots from arbitrary SHAs.
        SUPPORTED_VERSIONS = [1, SCHEMA_VERSION].freeze

        # Serialize `snapshot` as YAML to `path`, creating parent directories as
        # needed. A path of "-" writes to stdout instead.
        def self.write(path, snapshot)
            yaml = YAML.dump(snapshot)
            if path == "-"
                $stdout.write(yaml)
            else
                FileUtils.mkdir_p(File.dirname(path))
                File.write(path, yaml)
            end
        end

        # Load a snapshot from `path` ("-" reads stdin) with YAML.safe_load,
        # which permits Time/Symbol and rejects aliases. Returns the Hash.
        #
        # Dies (exit 2, via Railway.die!) when the file cannot be read, is not
        # valid or safe YAML, or lacks a version in SUPPORTED_VERSIONS. Without
        # the rescues, the first two cases would escape as raw exceptions rather
        # than the CLI's error status.
        def self.read(path)
            raw = path == "-" ? $stdin.read : File.read(path)
            data = YAML.safe_load(raw, permitted_classes: [Time, Symbol], aliases: false)
            unless data.is_a?(Hash) && SUPPORTED_VERSIONS.include?(data["version"])
                Railway.die!("Snapshot schema mismatch (expected version in #{SUPPORTED_VERSIONS.inspect}).")
            end
            data
        rescue SystemCallError => e
            Railway.die!("Cannot read snapshot #{path}: #{e.message}")
        rescue Psych::Exception => e
            # Covers syntax errors, disallowed classes and rejected aliases.
            Railway.die!("Snapshot #{path} is not valid YAML: #{e.message}")
        end
    end

    # ── Service Inventory ──────────────────────────────────────────────────────
    #
    # GraphQL fragments used by snapshot/env-diff/promote/lint-prod.
    #
    # Railway's public schema notes (verified via introspection 2026-05):
    #   * Project has NO `domains` field. Custom domains are reached either via
    #     top-level `domains(projectId, environmentId, serviceId)` returning
    #     `AllDomains { customDomains, serviceDomains }`, or via
    #     `serviceInstance.domains` (same shape).
    #   * Service has NO `serviceInstances` field. To get an instance's
    #     image/startCommand/etc. for a given env, use
    #     `serviceInstance(serviceId, environmentId)` directly.
    #   * `serviceInstanceDeployV2(serviceId, environmentId, commitSha?)` returns
    #     the NEW deployment id (String!). `commitSha` is optional. To pin a
    #     service to a specific image, use
    #     `serviceInstanceUpdate(serviceId, environmentId, input: { source: { image } })`
    #     followed by `serviceInstanceDeployV2` — which spawns a NEW deployment
    #     that PULLS the just-updated `source.image`. Do NOT use
    #     `serviceInstanceRedeploy(serviceId, environmentId)` for this: it replays
    #     the EXISTING deployment's snapshot (its OLD image), so the freshly
    #     pinned `source.image` never reaches the running container (bug #2).
    #   * `Environment.variables` returns an `EnvironmentVariablesConnection`
    #     whose edges include `node.serviceId` — we filter by service id to get
    #     per-service env-key sets.
    #
    # The queries below are issued separately, not as one aliased document.
    # SERVICES_LIST_QUERY enumerates the project's services. Then
    # SnapshotCommand#build_snapshot runs ENVIRONMENT_VARIABLES_QUERY once for
    # the env (keys only) and SERVICE_INSTANCE_QUERY once per service. That is
    # N+2 round-trips per snapshot.

    # Step 1 of SnapshotCommand#build_snapshot: list every service (id, name)
    # in the project.
    # NOTE(review): the `services` connection is requested without `first:` or
    # pageInfo, so this relies on Railway returning every service in a single
    # page. Confirm against the API's default page size.
    SERVICES_LIST_QUERY = <<~GQL
        query ProjectServices($projectId: String!) {
            project(id: $projectId) {
                id
                name
                services {
                    edges {
                        node { id name }
                    }
                }
            }
        }
    GQL

    # One (service, environment) instance. It returns the start command,
    # healthcheck path, region, replica count, restart policy, source
    # image/repo, latest deployment, and customDomains/serviceDomains.
    # build_snapshot issues it once per service. A null `serviceInstance` and a
    # "ServiceInstance not found" GraphQL error both mean the service has no
    # instance in that env.
    SERVICE_INSTANCE_QUERY = <<~GQL
        query ServiceInstance($serviceId: String!, $envId: String!) {
            serviceInstance(serviceId: $serviceId, environmentId: $envId) {
                id
                serviceId
                environmentId
                startCommand
                healthcheckPath
                region
                numReplicas
                restartPolicyType
                source { image repo }
                latestDeployment { id status }
                domains {
                    customDomains { id domain }
                    serviceDomains { id domain }
                }
            }
        }
    GQL

    # Every variable NAME in one environment, with its owning serviceId and
    # sealed flag (the node carries no value). build_snapshot groups the names
    # by serviceId to fill each service's env_keys.
    # NOTE(review): `first: 1000` with no pageInfo/cursor means an environment
    # with more than 1000 variables is silently truncated. Confirm the count
    # stays well under the cap, or add pagination.
    ENVIRONMENT_VARIABLES_QUERY = <<~GQL
        query EnvVariables($envId: String!) {
            environment(id: $envId) {
                id
                name
                variables(first: 1000) {
                    edges {
                        node { name serviceId isSealed }
                    }
                }
            }
        }
    GQL

    # Per-(service, env) variable VALUES, read transiently and never persisted.
    #
    # The snapshot path (ENVIRONMENT_VARIABLES_QUERY) deliberately captures
    # KEYS ONLY so snapshots stay safe to commit. The U5 promote preflight, by
    # contrast, must compare a FEW specific VALUES: a serviceRef host
    # assertion, a prod-specific-key shape assertion, and a replicate-class
    # copy. It therefore reads values at runtime. Scoping by serviceId avoids
    # pulling the whole env's secret surface. Sealed values come back as nil
    # (Railway will not return a sealed value); callers treat those as
    # "unreadable" and never copy them.
    #
    # Railway's top-level `variables(projectId, environmentId, serviceId)`
    # returns a JSON scalar (a flat `{ "NAME" => "value", ... }` map), NOT a
    # connection. (`Environment.variables` IS a connection, but its node
    # carries no `value`, which is why the snapshot path reads keys only.)
    SERVICE_VARIABLES_QUERY = <<~GQL
        query EnvServiceVariables($projectId: String!, $serviceId: String!, $envId: String!) {
            variables(projectId: $projectId, serviceId: $serviceId, environmentId: $envId)
        }
    GQL

    # Upsert a single environment variable VALUE for a (service, env). This is
    # the variable*Upsert family (NOT serviceInstanceUpdate(source.image), which
    # only sets the image). Used ONLY by U5's opt-in replicate-class env-write;
    # the default replicate set is empty, so this fires for no key today.
    #
    # projectId is interpolated into the GraphQL text when this constant is
    # defined, rather than passed as a variable. Callers supply only
    # $serviceId, $envId, $name and $value.
    VARIABLE_UPSERT_MUTATION = <<~GQL
        mutation VariableUpsert($serviceId: String!, $envId: String!, $name: String!, $value: String!) {
            variableUpsert(
                input: {
                    projectId: "#{PROJECT_ID}"
                    serviceId: $serviceId
                    environmentId: $envId
                    name: $name
                    value: $value
                }
            )
        }
    GQL

    # ── Subcommands ────────────────────────────────────────────────────────────

    # Shared scaffolding for subcommands: argv capture, the default option
    # Hash, and lazily built API clients. Subclasses implement #parser and #run.
    class BaseCommand
        attr_reader :argv, :options

        # Convenience entry point: build the command for `argv` and return the
        # result of #run.
        def self.call(argv)
            command = new(argv)
            command.run
        end

        # `argv` is copied because OptionParser#parse! consumes the array it
        # is given; the caller's array stays untouched.
        def initialize(argv)
            @argv = argv.dup
            @options = default_options
        end

        # A fresh, mutable option Hash on every call; parsers write into it.
        def default_options
            { env: nil, yes: false, non_interactive: false, dry_run: false, output: nil }
        end

        # Subclass hook: perform the command. Must be overridden.
        def run
            raise NotImplementedError
        end

        # Subclass hook: return this command's OptionParser. Must be overridden.
        def parser
            raise NotImplementedError
        end

        # Memoized Railway GraphQL client. Tests can pre-seed @gql (e.g. via
        # instance_variable_set) to inject a fake.
        def gql
            @gql ||= GraphQL.new
        end

        # Memoized GHCR registry client; @ghcr can be pre-seeded the same way.
        def ghcr
            @ghcr ||= GHCR.new
        end
    end

    # snapshot — capture an environment's state into a YAML file.
    class SnapshotCommand < BaseCommand
        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway snapshot --env <staging|production> [--output FILE] [--dry-run]

                      Capture current image digests, start commands, env-var KEYS,
                      and custom domains for every service in the given env.

                      Exits 0 on success. Exits 2 on error.
                BANNER
                o.on("--env ENV", "Environment (staging|production)") { |v| options[:env] = v }
                o.on("--output FILE", "Write to FILE (default stdout)") { |v| options[:output] = v }
                o.on("--dry-run", "Capture but do not write to disk") { options[:dry_run] = true }
                o.on("-h", "--help", "Show this help") { puts o; exit 0 }
            end
        end

        # Parse flags, build the snapshot, and emit it. Returns 0.
        #
        # --dry-run prints the YAML to stdout and never writes to disk, as its
        # help text promises. An explicit --output FILE is ignored, with a
        # warning; before this fix it was still written. Otherwise the snapshot
        # goes to --output ("-" means stdout), defaulting to a timestamped file
        # under showcase/.railway-snapshots/.
        def run
            parser.parse!(argv)
            Railway.die!("--env is required") unless options[:env]
            env_id = Railway.env_id_for(options[:env])

            snap = build_snapshot(env_id)

            if options[:dry_run]
                output = options[:output]
                warn "dry-run: ignoring --output #{output} (nothing is written to disk)" if output && output != "-"
                puts YAML.dump(snap)
                return 0
            end

            path = options[:output] ||
                "showcase/.railway-snapshots/#{Time.now.utc.strftime('%Y%m%dT%H%M%SZ')}-#{options[:env]}.yaml"

            SnapshotIO.write(path, snap)
            warn "wrote snapshot: #{path}" unless path == "-"
            0
        end

        # Build the snapshot Hash (schema SnapshotIO::SCHEMA_VERSION) for `env_id`:
        #   1. list every service in the project;
        #   2. fetch the env's variable NAMES once and group them by serviceId;
        #   3. fetch each service's serviceInstance in this env and record it.
        # Services without an instance in this env are skipped (see
        # #fetch_instance). Entries are sorted by service name.
        def build_snapshot(env_id)
            list = gql.query(SERVICES_LIST_QUERY, projectId: PROJECT_ID)
            service_nodes = (list.dig("project", "services", "edges") || []).map { |e| e["node"] }

            keys_by_service = env_keys_by_service(env_id)

            services = service_nodes.filter_map do |node|
                inst = fetch_instance(node, env_id)
                service_record(node, inst, keys_by_service.fetch(node["id"], [])) if inst
            end

            {
                "version"     => SnapshotIO::SCHEMA_VERSION,
                "captured_at" => Time.now.utc.iso8601,
                "project_id"  => PROJECT_ID,
                "environment" => { "id" => env_id, "name" => Railway.env_label(env_id) },
                "services"    => services.sort_by { |s| s["name"].to_s },
            }
        end

        private

        # Variable NAMES in `env_id`, grouped by owning serviceId. Nodes without
        # a serviceId are ignored.
        def env_keys_by_service(env_id)
            env_data = gql.query(ENVIRONMENT_VARIABLES_QUERY, envId: env_id)
            edges = env_data.dig("environment", "variables", "edges") || []
            edges.each_with_object({}) do |edge, keys|
                node = edge["node"]
                next unless node && node["serviceId"]

                (keys[node["serviceId"]] ||= []) << node["name"]
            end
        end

        # The serviceInstance Hash for `node` in `env_id`, or nil when the
        # service has no instance there. Railway signals that two different
        # ways for the SAME underlying condition:
        #   (a) the field resolves to null (`serviceInstance` => nil), or
        #   (b) the query THROWS `GraphQL: ServiceInstance not found`
        #       (a half-deleted service that still appears in the project
        #       service list but has no instance in this env).
        # The rescue is scoped to ONLY that message so every other GraphQL
        # failure still propagates fail-loud. That covers auth, rate-limit,
        # schema drift, and transient 5xx surfaced as a GraphQL error.
        # Otherwise a single odd service aborts the whole snapshot, and with it
        # the whole promote, before any preflight runs (see run 27144525566).
        def fetch_instance(node, env_id)
            instance_data = gql.query(SERVICE_INSTANCE_QUERY, serviceId: node["id"], envId: env_id)
            instance_data["serviceInstance"]
        rescue GraphQL::Error => e
            raise unless e.message.include?("ServiceInstance not found")

            warn "skipping service #{node['name'].inspect} (#{node['id']}): no serviceInstance in this env (#{e.message})"
            nil
        end

        # One snapshot entry. A source image containing "@sha256:" is split at
        # "@" into image_tag (the part before) and digest. Any other image ref
        # becomes its own image_tag, with a nil digest.
        def service_record(node, inst, env_keys)
            image_ref = inst.dig("source", "image")
            tag_ref, digest =
                if image_ref&.include?("@sha256:")
                    image_ref.split("@", 2)
                else
                    [image_ref, nil]
                end

            custom_domains = (inst.dig("domains", "customDomains") || [])
                .map { |d| d["domain"] }.compact.sort

            {
                "name"                  => node["name"],
                "service_id"            => node["id"],
                "image"                 => image_ref,
                "image_tag"             => tag_ref,
                "digest"                => digest,
                "start_command"         => inst["startCommand"],
                "healthcheck_path"      => inst["healthcheckPath"],
                "region"                => inst["region"],
                "replicas"              => inst["numReplicas"],
                "restart_policy"        => inst["restartPolicyType"],
                "auto_updates_disabled" => nil, # not populated from the API
                "latest_deployment_id"  => inst.dig("latestDeployment", "id"),
                "env_keys"              => env_keys.sort.uniq,
                "custom_domains"        => custom_domains,
            }
        end
    end

    # restore — given a snapshot YAML, force-redeploy each service to its
    # captured image ref (`image`, falling back to `image_tag`) via
    # serviceInstanceUpdate(source.image) + deploy.
    #
    # To pin a service to a specific image digest we must:
    #   1. update the service instance's source.image to the desired ref
    #   2. spawn a NEW deployment via serviceInstanceDeployV2 that PULLS the
    #      just-updated source.image.
    #
    # NOTE: serviceInstanceRedeploy is deliberately NOT used here — it replays
    # the existing deployment's snapshot (its OLD image), so the freshly pinned
    # source.image never reaches the running container (bug #2).
    class RestoreCommand < BaseCommand
        UPDATE_IMAGE_MUTATION = <<~GQL
            mutation UpdateImage($serviceId: String!, $envId: String!, $image: String!) {
                serviceInstanceUpdate(
                    serviceId: $serviceId
                    environmentId: $envId
                    input: { source: { image: $image } }
                )
            }
        GQL

        # Variant of UPDATE_IMAGE_MUTATION that ALSO re-asserts the SSOT
        # healthcheckPath alongside source.image. Railway's
        # ServiceInstanceUpdateInput accepts healthcheckPath next to source (see
        # deploy-to-railway.ts / provision-starter-fleet.ts, which set it via the
        # same mutation), and treats an absent input key as "leave as-is" — so a
        # partial update of {source, healthcheckPath} is safe.
        #
        # CRITICAL: this variant is used ONLY when the SSOT declares a path for
        # the target service+env. When the SSOT has NO path (a live-null
        # service), the caller uses the plain UPDATE_IMAGE_MUTATION above so the
        # `healthcheckPath` key is OMITTED entirely — we MUST NOT send
        # `healthcheckPath: null`, which would actively CLEAR it on Railway.
        UPDATE_IMAGE_AND_HEALTHCHECK_MUTATION = <<~GQL
            mutation UpdateImage($serviceId: String!, $envId: String!, $image: String!, $healthcheckPath: String!) {
                serviceInstanceUpdate(
                    serviceId: $serviceId
                    environmentId: $envId
                    input: { source: { image: $image }, healthcheckPath: $healthcheckPath }
                )
            }
        GQL

        # serviceInstanceDeployV2 spawns a NEW deployment that pulls the current
        # source.image (just advanced by UPDATE_IMAGE_MUTATION) and returns the
        # new deployment id (String!). This is what actually activates the pin.
        DEPLOY_V2_MUTATION = <<~GQL
            mutation DeployV2($serviceId: String!, $envId: String!) {
                serviceInstanceDeployV2(
                    serviceId: $serviceId
                    environmentId: $envId
                )
            }
        GQL

        # Helper used by RestoreCommand, PinCommand: pin then deploy a NEW
        # deployment that pulls the updated source.image.
        #
        # Both mutation results are checked, using the same expectations as
        # PromoteCommand.pin_and_verify:
        #   - serviceInstanceUpdate must return exactly `true`.
        #   - serviceInstanceDeployV2 must return a non-empty deployment id.
        # Otherwise a silently rejected pin would be reported as "redeployed".
        # On failure this calls Railway.die!. It never deploys after a rejected
        # pin.
        #
        # Returns the DeployV2 query result. Its "serviceInstanceDeployV2" key
        # holds the new deployment id.
        def self.pin_and_redeploy(gql, service_id:, env_id:, image:)
            updated = gql.query(UPDATE_IMAGE_MUTATION,
                serviceId: service_id, envId: env_id, image: image)
            unless updated && updated["serviceInstanceUpdate"] == true
                Railway.die!("serviceInstanceUpdate returned #{updated.inspect} (expected true) " \
                             "for #{service_id} -> #{image}; refusing to deploy.")
            end

            deployed = gql.query(DEPLOY_V2_MUTATION,
                serviceId: service_id, envId: env_id)
            new_deployment_id = deployed && deployed["serviceInstanceDeployV2"]
            unless new_deployment_id.is_a?(String) && !new_deployment_id.empty?
                Railway.die!("serviceInstanceDeployV2 returned #{deployed.inspect} " \
                             "(expected a new deployment id) for #{service_id} -> #{image}.")
            end
            deployed
        end

        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway restore --env ENV --snapshot FILE [--yes] [--non-interactive] [--dry-run] [--service NAME]

                      Restore an environment to the digests captured in a snapshot.
                      Production requires --yes + typed 'production' confirmation
                      (or --non-interactive to skip the prompt).

                      Exits 0 on success. Exits 2 on auth/network/refused errors.
                BANNER
                o.on("--env ENV") { |v| options[:env] = v }
                o.on("--snapshot FILE") { |v| options[:snapshot] = v }
                o.on("--service NAME", "Restrict to a single service") { |v| options[:service] = v }
                o.on("--yes") { options[:yes] = true }
                o.on("--non-interactive") { options[:non_interactive] = true }
                o.on("--dry-run") { options[:dry_run] = true }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        # Entry point. Returns 0 on success. Errors go through Railway.die!.
        def run
            parser.parse!(argv)
            Railway.die!("--env required") unless options[:env]
            Railway.die!("--snapshot required") unless options[:snapshot]

            env_id = Railway.env_id_for(options[:env])
            snap = SnapshotIO.read(options[:snapshot])

            Railway.confirm_destructive!(
                env_label: options[:env],
                action: "restore",
                non_interactive: options[:non_interactive],
                yes: options[:yes],
            )

            services = snap["services"] || []
            services = services.select { |s| s["name"] == options[:service] } if options[:service]
            Railway.die!("No matching services in snapshot.") if services.empty?

            # Validate EVERY target before the first mutation. Otherwise a bad
            # entry late in the list aborts after earlier services were
            # already re-pinned, leaving the env half-restored.
            plan = services.map do |svc|
                image = restore_image_for(svc)
                Railway.die!("Service #{svc['name']} has no image in snapshot.") if image.nil?
                if !options[:dry_run] && svc["service_id"].to_s.empty?
                    Railway.die!("Service #{svc['name']} has no service_id in snapshot.")
                end
                [svc, image]
            end

            plan.each do |svc, image|
                if options[:dry_run]
                    puts "[dry-run] would redeploy #{svc['name']} -> #{image}"
                    next
                end

                RestoreCommand.pin_and_redeploy(gql,
                    service_id: svc["service_id"], env_id: env_id, image: image)
                puts "redeployed #{svc['name']} -> #{image}"
            end
            0
        end

        private

        # Returns the first non-empty String among the snapshot entry's
        # "image" and "image_tag" fields, or nil if neither is usable.
        def restore_image_for(svc)
            [svc["image"], svc["image_tag"]].find { |ref| ref.is_a?(String) && !ref.empty? }
        end
    end

    # rollback — roll a single service back one deploy (its previous deploy).
    class RollbackCommand < BaseCommand
        # How many of the most recent deployments DEPLOYMENTS_QUERY requests.
        # find_previous_deployment compares the page size against it too. A
        # page that came back full may have cut the real previous deploy off.
        DEPLOYMENTS_WINDOW = 10

        DEPLOYMENTS_QUERY = <<~GQL
            query Deployments($serviceId: String!, $envId: String!) {
                deployments(
                    first: #{DEPLOYMENTS_WINDOW}
                    input: { serviceId: $serviceId, environmentId: $envId }
                ) { edges { node { id status meta createdAt } } }
            }
        GQL

        # The mutation yields a bare Boolean, so there is no selection set.
        ROLLBACK_MUTATION = <<~GQL
            mutation Rollback($id: String!) { deploymentRollback(id: $id) }
        GQL

        def parser
            OptionParser.new.tap do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway rollback --env ENV --service NAME [--to DEPLOYMENT_ID] [--yes]

                      Rolls a single service back to its previous successful
                      deploy (or to a specific deployment id with --to).
                      Production requires --yes + typed confirmation.
                BANNER
                o.on("--env ENV") { |value| options[:env] = value }
                o.on("--service NAME") { |value| options[:service] = value }
                o.on("--to ID", "Specific deployment id to roll to") { |value| options[:to] = value }
                o.on("--yes") { options[:yes] = true }
                o.on("--non-interactive") { options[:non_interactive] = true }
                o.on("--dry-run") { options[:dry_run] = true }
                o.on("-h", "--help") do
                    puts o
                    exit 0
                end
            end
        end

        # Entry point. The target is --to if given, otherwise the previous
        # successful deployment. Prints what it did (or would do) and
        # returns 0.
        def run
            parser.parse!(argv)
            env_name, service_name = options.values_at(:env, :service)
            Railway.die!("--env required") unless env_name
            Railway.die!("--service required") unless service_name

            env_id = Railway.env_id_for(env_name)
            Railway.confirm_destructive!(
                env_label: env_name,
                action: "rollback",
                non_interactive: options[:non_interactive],
                yes: options[:yes],
            )

            service_id = resolve_service_id(env_id, service_name)

            target_id = options[:to]
            target_id ||= find_previous_deployment(service_id, env_id)
            Railway.die!("No previous deployment found for #{service_name}.") unless target_id

            if options[:dry_run]
                puts "[dry-run] would rollback #{service_name} -> deployment #{target_id}"
            else
                gql.query(ROLLBACK_MUTATION, id: target_id)
                puts "rolled back #{service_name} -> #{target_id}"
            end
            0
        end

        # Looks a service up by name among the project's services and
        # returns its id. Dies if no service has that name. The env id is
        # unused: the lookup is project-wide.
        def resolve_service_id(_env_id, name)
            response = gql.query(SERVICES_LIST_QUERY, projectId: PROJECT_ID)
            edges = response.dig("project", "services", "edges") || []
            hit = edges.find { |edge| edge["node"]["name"] == name }
            return hit["node"]["id"] if hit

            Railway.die!("Service #{name.inspect} not found.")
        end

        # Returns the id of the newest SUCCESS deployment that is strictly
        # older than the current head deployment.
        #   - Returns nil when the window was not full, meaning there is
        #     genuinely nothing older.
        #   - Dies when the window was full and nothing qualified, because
        #     the real target may lie beyond the page.
        #
        # NOTE(review): PromoteCommand observes that a superseded deployment
        # flips to REMOVED. If that holds for every replaced deploy, a
        # healthy older deploy will not carry SUCCESS here. Confirm against
        # Railway's status lifecycle.
        def find_previous_deployment(service_id, env_id)
            response = gql.query(DEPLOYMENTS_QUERY, serviceId: service_id, envId: env_id)
            nodes = (response.dig("deployments", "edges") || []).map { |edge| edge["node"] }

            # Railway does not guarantee connection order, so order newest
            # first by createdAt ourselves. Then set the head aside.
            #   - head=SUCCESS: the first older SUCCESS is one deploy back.
            #   - head=FAILED/CRASHED: it is the last-known-good below the
            #     bad head.
            _head, *older = nodes.sort_by { |node| node["createdAt"].to_s }.reverse
            previous_good = older.find { |node| node["status"] == "SUCCESS" }
            return previous_good["id"] unless previous_good.nil?

            # A short page means the history was seen in full, so there
            # really is no previous deploy. The caller reports that.
            return nil if nodes.size < DEPLOYMENTS_WINDOW

            # A full page: the real predecessor may have been cut off, so
            # refuse to guess.
            Railway.die!(
                "Could not determine the previous deployment for " \
                "#{options[:service]} within the #{DEPLOYMENTS_WINDOW} most recent " \
                "deploys (no successful deploy older than the current one was " \
                "found in the window). Pass --to <deployment_id> explicitly.",
            )
            nil
        end
    end

    # rollback-commit — roll all services back to the digests captured at a
    # given git SHA's snapshot file.
    class RollbackCommitCommand < BaseCommand
        # Repository-root-relative directory holding committed snapshots.
        SNAPSHOT_DIR = "showcase/.railway-snapshots"

        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway rollback-commit --env ENV --sha SHA [--yes]

                      Looks up the snapshot file checked into git at SHA, then
                      redeploys every service to the digests recorded there.
                      Effectively a "restore to point-in-time" using committed
                      snapshots.
                BANNER
                o.on("--env ENV") { |v| options[:env] = v }
                o.on("--sha SHA") { |v| options[:sha] = v }
                o.on("--yes") { options[:yes] = true }
                o.on("--non-interactive") { options[:non_interactive] = true }
                o.on("--dry-run") { options[:dry_run] = true }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        # Entry point. Finds the lexically-last `*-<env>.yaml` snapshot under
        # SNAPSHOT_DIR at --sha, then hands it to RestoreCommand. Returns
        # RestoreCommand#run's result.
        def run
            parser.parse!(argv)
            Railway.die!("--env required") unless options[:env]
            Railway.die!("--sha required") unless options[:sha]

            env = options[:env]
            sha = options[:sha]

            # Input validation BEFORE invoking git. These values flow into
            # subprocess invocations, and an attacker-controlled `--sha` or
            # `--env` must not be able to inject shell or git arguments.
            #   - `sha` must look like a git object name (7-40 lowercase hex).
            #   - `env` must resolve via Railway.env_id_for, which limits it to
            #     the known set and calls Railway.die! itself on a miss.
            unless sha.is_a?(String) && sha.match?(/\A[0-9a-f]{7,40}\z/)
                Railway.die!("--sha must be a 7-40 char lowercase hex git SHA " \
                    "(got #{sha.inspect}).")
            end
            Railway.env_id_for(env)  # dies with "Unknown env: ..." if invalid.

            path = latest_snapshot_path(sha, env)

            yaml = git_capture(["git", "show", "#{sha}:#{path}"], "git show failed for #{sha}:#{path}")
            Railway.die!("git show failed for #{sha}:#{path}") if yaml.empty?

            snap = YAML.safe_load(yaml, permitted_classes: [Time, Symbol], aliases: false)
            restore_from(snap, env)
        end

        private

        # Returns the repo-root-relative path of the last (sorted) snapshot
        # for env at sha. Dies if there is none.
        #
        # `git ls-tree` does not apply wildcard matching to its path
        # arguments, so a `*-<env>.yaml` pathspec would never match a real
        # file. Instead, list the whole snapshot directory and filter by
        # suffix in Ruby.
        #
        # `--full-tree` makes both the path argument and the output
        # root-relative regardless of cwd. That matches the root-relative
        # `git show <sha>:<path>` form used by the caller.
        #
        # NOTE(review): the suffix uses --env exactly as typed, so
        # `--env prod` looks for `*-prod.yaml` rather than
        # `*-production.yaml`. Confirm against the snapshot naming.
        def latest_snapshot_path(sha, env)
            suffix = "-#{env}.yaml"
            listing = git_capture(
                ["git", "ls-tree", "-r", "--full-tree", "--name-only", sha, "--", SNAPSHOT_DIR],
                "git ls-tree failed for #{sha}",
            )
            entries = listing.lines.map(&:strip).reject(&:empty?)
                .select { |p| p.start_with?("#{SNAPSHOT_DIR}/") && p.end_with?(suffix) }
                .sort
            Railway.die!("No snapshot for env=#{env} at #{sha}.") if entries.empty?
            entries.last
        end

        # Runs `cmd` and returns its stdout. Dies with failure_message on a
        # non-zero exit.
        #   - Uses argv-array IO.popen (no shell), so arguments are never
        #     shell-parsed.
        #   - stderr is NOT merged into the captured output. A git warning
        #     would otherwise be read as a path or as YAML. It goes to the
        #     terminal instead.
        def git_capture(cmd, failure_message)
            output = IO.popen(cmd, &:read)
            Railway.die!(failure_message) unless $?.success?
            output.to_s
        end

        # Writes snap to a private temp file and runs RestoreCommand against it,
        # forwarding --yes/--non-interactive/--dry-run.
        #
        # Tempfile.create uses an unpredictable name (created 0600). It removes
        # the file when the block exits, including via Railway.die!'s exit.
        # This avoids the symlink/pre-creation hazard of a fixed
        # /tmp/...-<pid>.yaml path.
        def restore_from(snap, env)
            require "tempfile" # lazy: only rollback-commit needs it
            Tempfile.create(["railway-rollback-commit-", ".yaml"]) do |file|
                file.write(YAML.dump(snap))
                file.flush
                restore_argv = ["--env", env, "--snapshot", file.path]
                restore_argv << "--yes" if options[:yes]
                restore_argv << "--non-interactive" if options[:non_interactive]
                restore_argv << "--dry-run" if options[:dry_run]
                RestoreCommand.new(restore_argv).run
            end
        end
    end

    # promote — copy staging digests into production with prechecks.
    class PromoteCommand < BaseCommand
        # Raised by pin_and_verify / verify_serving_digest! when Railway does not
        # confirm a mutation, or when prod never converges on the pinned digest.
        class MutationError < StandardError; end

        # Re-reads a serviceInstance's current source.image, updatedAt and
        # latestDeployment (id/status/meta). It is used for:
        #   - the pre-mutation baseline,
        #   - the config-level recheck,
        #   - the serving-digest poll.
        SERVICE_INSTANCE_RECHECK_QUERY = <<~GQL
            query ServiceInstanceRecheck($serviceId: String!, $envId: String!) {
                serviceInstance(serviceId: $serviceId, environmentId: $envId) {
                    id
                    source { image }
                    updatedAt
                    latestDeployment { id status meta }
                }
            }
        GQL

        # Config-level recheck budget in pin_and_verify: up to RETRY_COUNT
        # queries, sleeping RETRY_DELAY_SEC between them.
        RETRY_COUNT     = 3
        RETRY_DELAY_SEC = 10

        # Serving-digest verification (bug #2): after the NEW deployment is
        # spawned via serviceInstanceDeployV2, poll latestDeployment until it
        # reaches SUCCESS and confirm its meta.imageDigest == the pinned digest.
        # The new deployment must pull + boot the image, which takes longer than
        # the config-level recheck, so this poll is wider than RETRY_COUNT.
        SERVING_RETRY_COUNT     = 30
        SERVING_RETRY_DELAY_SEC = 10
        # Latest-deployment statuses that verify_serving_digest! treats as
        # fatal, but only when observed on the newly spawned deployment.
        DEPLOY_FAILURE_STATUSES = %w[FAILED CRASHED REMOVED].freeze

        # Normalize a Railway deployment `meta` (sometimes a JSON string) and
        # extract its imageDigest (a `sha256:<hex>` value), or nil.
        def self.serving_digest_from_deployment(deployment)
            # Returns the non-empty digest String, or nil when the deployment,
            # its meta, or both digest sources are absent.
            return nil unless deployment.is_a?(Hash)

            meta = deployment["meta"]
            if meta.is_a?(String)
                # Only malformed JSON means "no digest". A blanket inline
                # `rescue` would also mask unrelated errors raised here.
                begin
                    meta = JSON.parse(meta)
                rescue JSON::ParserError
                    return nil
                end
            end
            return nil unless meta.is_a?(Hash)

            digest = meta["imageDigest"]
            # Fallback: some metas carry only a digest-pinned image ref
            # (`repo@sha256:<hex>`). Take the part after the "@".
            if digest.to_s.empty?
                image_ref = meta["image"].to_s
                digest = image_ref.split("@", 2).last if image_ref.include?("@sha256:")
            end
            return nil if digest.is_a?(String) && digest.empty?

            digest
        end

        # Pin + verify: confirms the boolean mutation result AND re-queries
        # serviceInstance to confirm BOTH source.image advanced to the new
        # digest AND updatedAt strictly advanced past the pre-mutation value.
        # 3 retries 10s apart absorb Railway's eventual consistency. THEN
        # verifies the NEW deployment spawned by serviceInstanceDeployV2 reaches
        # SUCCESS and SERVES the pinned digest (bug #2 — config advancing is not
        # enough; the running container must actually pull the pinned image).
        # `sleeper:` is injected for tests.
        #
        # `healthcheck_path:` — the SSOT-declared HTTP healthcheck path for this
        # service+env, or nil when the SSOT tracks NONE. When present, the pin
        # mutation RE-ASSERTS it alongside source.image so a prod instance whose
        # healthcheckPath silently went null (the aimock incident) self-heals to
        # the tracked value on the next promote. When nil, the plain
        # image-only mutation is used and the `healthcheckPath` key is OMITTED
        # entirely — we NEVER send `healthcheckPath: null` (which would actively
        # CLEAR it). This is the durable lever the SSOT-tracking change adds.
        def self.pin_and_verify(gql, service_id:, env_id:, image:, healthcheck_path: nil, sleeper: ->(n) { sleep(n) })
            # Upfront guard: this method's contract is to pin a DIGEST-form
            # image. Pre-fix, a stray tag ref would fall through to retries
            # (since expected_digest stayed nil and image_ok was always false)
            # and ultimately surface as a misleading "did not observe image
            # advance" error after 30s of futile waits.
            unless image.include?("@sha256:")
                raise ArgumentError,
                    "pin_and_verify requires an @sha256:-pinned image, got #{image.inspect}"
            end

            # Capture pre-update serviceInstance.updatedAt so we can gate on a
            # strict advance after the mutation (spec §7.2 P5). Image-equality
            # alone is insufficient — a no-op re-pin to the current value would
            # otherwise appear green. nil pre_ts means "no prior instance" and
            # is treated as a permanent advance below.
            pre_data = gql.query(SERVICE_INSTANCE_RECHECK_QUERY,
                serviceId: service_id, envId: env_id)
            pre_inst = pre_data && pre_data["serviceInstance"]
            pre_update_ts = pre_inst && pre_inst["updatedAt"]

            # Re-assert the SSOT healthcheckPath alongside source.image ONLY when
            # the SSOT declares one. A nil/empty path uses the image-only
            # mutation so the healthcheckPath key is OMITTED (never sent as
            # null, which would clear it) — a live-null service stays null.
            if healthcheck_path.nil? || healthcheck_path.to_s.empty?
                updated = gql.query(RestoreCommand::UPDATE_IMAGE_MUTATION,
                    serviceId: service_id, envId: env_id, image: image)
            else
                updated = gql.query(RestoreCommand::UPDATE_IMAGE_AND_HEALTHCHECK_MUTATION,
                    serviceId: service_id, envId: env_id, image: image,
                    healthcheckPath: healthcheck_path)
            end
            unless updated && updated["serviceInstanceUpdate"] == true
                raise MutationError,
                    "P5: serviceInstanceUpdate returned #{updated.inspect} (expected true) for #{service_id} -> #{image}"
            end

            # Spawn a NEW deployment that PULLS the just-updated source.image.
            # serviceInstanceDeployV2 returns the new deployment id (String!).
            # serviceInstanceRedeploy is deliberately NOT used: it replays the
            # existing deployment's snapshot (its OLD image), so the freshly
            # pinned source.image would never reach the running container
            # (bug #2 — config advanced but prod kept serving the stale digest).
            deployed = gql.query(RestoreCommand::DEPLOY_V2_MUTATION,
                serviceId: service_id, envId: env_id)
            new_deployment_id = deployed && deployed["serviceInstanceDeployV2"]
            unless new_deployment_id.is_a?(String) && !new_deployment_id.empty?
                raise MutationError,
                    "P5: serviceInstanceDeployV2 returned #{deployed.inspect} " \
                    "(expected a new deployment id String) for #{service_id} -> #{image}"
            end

            # The guard at the top guarantees an "@", so this is always the
            # `sha256:<hex>` part of the pinned ref.
            expected_digest = image.split("@", 2).last
            last_seen_image = nil
            last_seen_ts    = nil
            config_inst     = nil

            RETRY_COUNT.times do |i|
                data = gql.query(SERVICE_INSTANCE_RECHECK_QUERY,
                    serviceId: service_id, envId: env_id)
                inst = data && data["serviceInstance"]
                actual_image  = inst && inst.dig("source", "image")
                actual_ts     = inst && inst["updatedAt"]
                actual_digest = actual_image && actual_image.include?("@") ? actual_image.split("@", 2).last : nil
                last_seen_image = actual_image
                last_seen_ts    = actual_ts

                image_ok = expected_digest && actual_digest == expected_digest
                # Non-vacuous timestamp gate: a non-nil observed updatedAt is
                # ALWAYS required. The comparison parses both values as ISO8601
                # instances. A plain string comparison misorders timestamps
                # that differ only in fractional-second precision or UTC
                # offset (e.g. "...:00.500Z" sorts before "...:00Z").
                ts_ok    = updated_at_advanced?(actual_ts, pre_update_ts)
                if image_ok && ts_ok
                    config_inst = inst
                    break
                end

                sleeper.call(RETRY_DELAY_SEC) if i < RETRY_COUNT - 1
            end

            unless config_inst
                raise MutationError,
                    "P5: re-query did not observe image advance to #{image} AND updatedAt > #{pre_update_ts.inspect} " \
                    "after #{RETRY_COUNT} retries (last seen image=#{last_seen_image.inspect}, " \
                    "updatedAt=#{last_seen_ts.inspect}). Refusing to declare promote successful."
            end

            # Bug #2 SERVING-digest gate: config (source.image) advancing is NOT
            # proof the running container actually pulled the pinned image.
            # Poll latestDeployment until the NEW deployment (new_deployment_id)
            # reaches SUCCESS, then assert its meta.imageDigest == the pinned
            # digest. Fail loud if it crashes, never converges, or serves a
            # different digest — promote must NOT report success while prod
            # serves a stale image.
            verify_serving_digest!(gql,
                service_id: service_id, env_id: env_id, image: image,
                expected_digest: expected_digest, new_deployment_id: new_deployment_id,
                sleeper: sleeper)

            config_inst
        end

        # True when actual_ts (the re-queried serviceInstance.updatedAt) is
        # strictly later than pre_ts (the pre-mutation value).
        #   - A nil actual_ts never counts as an advance.
        #   - A nil pre_ts (no prior instance) counts as an advance for any
        #     observed value.
        # Both values are parsed with Time.iso8601. If either fails to parse,
        # this falls back to the previous lexical String comparison.
        def self.updated_at_advanced?(actual_ts, pre_ts)
            return false if actual_ts.nil?
            return true if pre_ts.nil?

            Time.iso8601(actual_ts.to_s) > Time.iso8601(pre_ts.to_s)
        rescue ArgumentError
            actual_ts.to_s > pre_ts.to_s
        end

        # Poll the production serviceInstance.latestDeployment until it reaches
        # SUCCESS and SERVES the pinned digest. Raises MutationError on a failed
        # deploy status, on a SUCCESS deployment serving the wrong digest, or if
        # the new deployment never converges within the retry budget.
        def self.verify_serving_digest!(gql, service_id:, env_id:, image:,
                                        expected_digest:, new_deployment_id:,
                                        sleeper: ->(n) { sleep(n) })
            # Parameters:
            #   gql               - GraphQL client; only #query(document, **vars) is used.
            #   service_id/env_id - the serviceInstance whose latestDeployment is polled.
            #   image             - the pinned image ref; used only in error messages.
            #   expected_digest   - the digest the deployment must report serving.
            #   new_deployment_id - id returned by serviceInstanceDeployV2. A
            #                       terminal status or a wrong digest is fatal
            #                       only when seen on THIS deployment.
            #   sleeper           - delay callable (injected for tests).
            # Returns the serviceInstance hash from the first poll whose latest
            # deployment is SUCCESS and serves expected_digest.
            # Budget: up to SERVING_RETRY_COUNT polls, sleeping
            # SERVING_RETRY_DELAY_SEC between them (not after the last).
            #
            # NOTE(review): the success path does not compare deploy["id"] with
            # new_deployment_id. An older deployment that already serves
            # expected_digest (e.g. re-pinning the current digest) satisfies
            # the gate before the new deployment finishes.

            # Kept only for the final non-convergence error message.
            last_status = nil
            last_serving = nil

            SERVING_RETRY_COUNT.times do |i|
                data = gql.query(SERVICE_INSTANCE_RECHECK_QUERY,
                    serviceId: service_id, envId: env_id)
                inst   = data && data["serviceInstance"]
                deploy = inst && inst["latestDeployment"]
                status = deploy && deploy["status"]
                last_status = status

                # A failed/crashed/removed latest deployment can never serve the
                # pin — fail loud immediately rather than burn the full budget.
                # But only hard-fail when the terminal deployment is the NEW one
                # we triggered. Right after DeployV2 spawns new_deployment_id,
                # latestDeployment may still briefly point to the OLD deployment,
                # whose status flips to REMOVED/terminal as it's superseded
                # (Railway eventual consistency). An ungated raise here would
                # FALSELY abort a valid promote, misreporting the old
                # deployment's teardown as the new deployment failing. If it's a
                # stale old deployment, keep polling — the loop will see the new
                # deployment become latest and converge.
                if DEPLOY_FAILURE_STATUSES.include?(status) &&
                        deploy && deploy["id"] == new_deployment_id
                    raise MutationError,
                        "P5: new deployment #{new_deployment_id} for #{service_id} -> #{image} reached " \
                        "terminal status #{status.inspect}. Refusing to declare promote successful — " \
                        "prod is NOT serving the pinned digest."
                end

                if status == "SUCCESS"
                    serving = serving_digest_from_deployment(deploy)
                    last_serving = serving
                    if serving == expected_digest
                        return inst
                    end
                    # SUCCESS but serving a different digest: only acceptable if
                    # this is still the OLD deployment (Railway eventual
                    # consistency — the new deployment id hasn't become
                    # latestDeployment yet). Keep polling. If the NEW deployment
                    # itself succeeded with the wrong digest, that's a hard fail.
                    if deploy && deploy["id"] == new_deployment_id
                        raise MutationError,
                            "P5: new deployment #{new_deployment_id} for #{service_id} succeeded but SERVES " \
                            "#{serving.inspect}, expected #{expected_digest.inspect}. Refusing to declare " \
                            "promote successful — prod is serving a stale image."
                    end
                end

                # No sleep after the final attempt; fall through to the raise.
                sleeper.call(SERVING_RETRY_DELAY_SEC) if i < SERVING_RETRY_COUNT - 1
            end

            # Budget exhausted without a SUCCESS deployment serving the pin.
            raise MutationError,
                "P5: prod did not converge to SERVING the pinned digest #{expected_digest.inspect} for " \
                "#{service_id} -> #{image} after #{SERVING_RETRY_COUNT} retries " \
                "(last deploy status=#{last_status.inspect}, last serving digest=#{last_serving.inspect}). " \
                "Refusing to declare promote successful — prod may be serving a stale image."
        end

        def default_options
            # Promote-specific defaults layered over BaseCommand's. Because
            # they are merged last, these keys override any same-named ones
            # from super.
            promote_defaults = {
                confirm_divergence:    false,
                require_staging_green: true,  # on by default per spec §7.2 P3
                service:               nil,   # optional positional SERVICE: promote just that one
                digest:                nil,   # --digest REF override (single-service only)
            }
            super().merge(promote_defaults)
        end

        def parser
            # Builds the promote option parser. Registration order below is
            # also the order options appear in `--help` output.
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway promote [SERVICE] [--digest REF] [--yes] [--non-interactive] [--dry-run]

                      Promote staging snapshot to production.

                      SERVICE (optional positional): restrict the promote to a single
                        staging service by name. Without it, the entire staging fleet
                        is promoted (interactive operator use).
                      --digest REF: pin SERVICE's prod image to REF instead of the
                        digest resolved from staging's :latest tag. Only valid with a
                        positional SERVICE; errors otherwise.

                      MOVES: pins prod image to the staging digest (resolved to
                             @sha256: at promote time) and redeploys.
                      VERIFY-REFUSE: P1 staging digest exists in GHCR, P2 latest
                             staging deployment succeeded (+ in-flight race),
                             P3 staging live-green probe, P6 startCommand/
                             healthcheckPath/image-shape parity, service-set
                             parity, critical env-key parity.
                      ADVISORY (report-only, never blocks): P6 region/replicas/
                             restartPolicy divergence, concurrency-key presence,
                             missing expected prod custom domains.
                      IGNORE: env var KEY set, env var VALUES, env-scoped URLs,
                             volumes.

                      Exit 0 on clean promotion, 1 on refuse/findings, 2 on error.
                BANNER
                o.on("--confirm-divergence", "Proceed past WARN findings (region/replicas/etc.)") { options[:confirm_divergence] = true }
                # --require-staging-green / --no-require-staging-green both write
                # the same key. The last one on the command line wins.
                o.on("--require-staging-green", "Require live staging probe green at promote time (default)") { options[:require_staging_green] = true }
                o.on("--no-require-staging-green", "Skip live staging probe (NOT recommended)") { options[:require_staging_green] = false }
                # Only recorded here. Whether a positional SERVICE accompanies
                # it is checked later, in #run.
                o.on("--digest REF", "Pin SERVICE's prod image to REF (requires positional SERVICE).") { |v| options[:digest] = v }
                o.on("--yes") { options[:yes] = true }
                o.on("--non-interactive") { options[:non_interactive] = true }
                o.on("--dry-run") { options[:dry_run] = true }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        # Entry point for `bin/railway promote [<service>] [flags]`.
        #
        # Parses flags, validates the optional positional service name,
        # captures both environment snapshots, narrows the per-service views
        # when a service was named, and returns run_with_preflight_only's
        # result.
        def run
            parser.parse!(argv)

            # Whatever OptionParser leaves in argv is positional. The supported
            # shape is `promote <svc> [--digest REF]`: at most one positional.
            Railway.die!("promote: too many positional args #{argv.inspect}; expected at most one service name.") if argv.length > 1

            positional = argv.first
            options[:service] = positional unless positional.nil? || positional.empty?
            target = options[:service]

            # --digest is only meaningful for a single named service. Without
            # one, the whole fleet would be pinned to one (likely wrong)
            # digest — fail fast instead.
            if target.nil? && options[:digest]
                Railway.die!("promote: --digest requires a positional service argument " \
                             "(e.g. `bin/railway promote <service> --digest <ref>`). " \
                             "Refusing to promote the fleet with a single digest.")
            end

            # The positional must be a service known to the SSOT
            # (railway-envs.generated.json). Unknown names hard-fail with the
            # valid set listed so the operator can self-correct; falling
            # through would silently fleet-promote (the original bug).
            if target && !STAGING_SERVICES.include?(target)
                Railway.die!("promote: unknown service #{target.inspect}. " \
                             "Valid staging services: #{STAGING_SERVICES.join(', ')}.")
            end

            capture_snapshots

            # Keep handles on the FULL snapshots before any narrowing.
            # PER-SERVICE checks (P1..P3/P6, critical env keys,
            # execute_promotion) read the target_* views, narrowed below when
            # a service was named. FLEET-SCOPED invariants
            # (check_expected_prod_domains, check_service_set_parity) read the
            # fleet_* views and must always see the un-narrowed snapshots.
            # Narrowing those too breaks the single-service workflow in two
            # ways:
            #
            #   (1) check_expected_prod_domains compares the fleet-wide
            #       EXPECTED_DOMAINS with the custom_domains of
            #       `prod["services"]`. A narrowed prod holds one service's
            #       domains at most, so the other public prod hosts look
            #       "missing" → spurious WARN → run_with_preflight_only
            #       refuses (the workflow does not pass --confirm-divergence).
            #   (2) check_service_set_parity asserts no env has services the
            #       other lacks. On a narrowed pair it is tautological when
            #       both hold the same single name, and only fires by
            #       accident when the target is absent from prod.
            #
            # The fleet_*/target_* accessor names make the distinction
            # explicit; two earlier regressions came from a fleet-scoped
            # check reading the wrong ivar.
            @full_staging_snapshot = @staging_snapshot
            @full_prod_snapshot    = @prod_snapshot
            narrow_snapshots_to_single_service!(target) if target

            run_with_preflight_only
        end

        # Restrict the per-service snapshot views (@staging_snapshot,
        # @prod_snapshot) to the single service `name`. The @full_* fleet
        # views are not touched here.
        #
        # Staging MUST contain the service: `run` already validated the name
        # against the SSOT, so absence means Railway returned no instance, and
        # we die!. Prod absence is tolerated. Prod narrows to an empty service
        # list, so the fleet-scoped service-set-parity REFUSE (fleet_staging
        # vs fleet_prod) becomes the user-facing error. Otherwise
        # find_service would return nil and execute_promotion would skip the
        # service with no mutation, reporting a silent rc=0 "success".
        def narrow_snapshots_to_single_service!(name)
            named = ->(snapshot) { (snapshot["services"] || []).select { |entry| entry["name"] == name } }

            staging_services = named.call(@staging_snapshot)
            if staging_services.empty?
                Railway.die!("promote: service #{name.inspect} not present in staging snapshot " \
                             "(SSOT lists it but Railway env returned no instance).")
            end
            @staging_snapshot = @staging_snapshot.merge("services" => staging_services)
            @prod_snapshot = @prod_snapshot.merge("services" => named.call(@prod_snapshot))
        end

        # Build the staging and production snapshots (dry-run
        # SnapshotCommand) unless they are already present.
        #
        # Test seam: tests can pre-seed @staging_snapshot / @prod_snapshot,
        # and a snapshot that is already set is never rebuilt. Returns
        # @prod_snapshot.
        def capture_snapshots
            unless @staging_snapshot
                @staging_snapshot = SnapshotCommand.new(["--env", "staging", "--dry-run"]).build_snapshot(STAGING_ENV_ID)
            end
            unless @prod_snapshot
                @prod_snapshot = SnapshotCommand.new(["--env", "production", "--dry-run"]).build_snapshot(PRODUCTION_ENV_ID)
            end
            @prod_snapshot
        end

        # ── Snapshot accessors ─────────────────────────────────────────────
        #
        # Each environment has two snapshot views:
        #
        #   fleet_staging / fleet_prod — the FULL, un-narrowed snapshot
        #     (every service in the env). Use these for FLEET-SHAPE
        #     invariants (service-set parity, expected prod domain set).
        #     Their verdict must not depend on whether this run is
        #     full-fleet or single-service.
        #   target_staging / target_prod — the snapshot scoped to what this
        #     promote run targets. It is narrowed to one service when a
        #     positional service was given, and identical to the fleet view
        #     otherwise. Use these for PER-SERVICE checks (P1..P3/P6,
        #     critical env-key parity, execute_promotion,
        #     resolved_prod_image).
        #
        # fleet_* falls back to @staging_snapshot / @prod_snapshot when
        # @full_* is unset. `run` is what sets @full_*. Legacy tests seed
        # @staging_snapshot/@prod_snapshot and call run_with_preflight_only
        # directly; in that full-fleet case the two views are the same
        # object.
        #
        # All four accessors are private to this class. The bare `public`
        # after them restores default visibility for the methods that follow.
        private def fleet_staging
            @full_staging_snapshot || @staging_snapshot
        end

        private def fleet_prod
            @full_prod_snapshot || @prod_snapshot
        end

        private def target_staging
            @staging_snapshot
        end

        private def target_prod
            @prod_snapshot
        end

        public

        # Run every promote precondition and, if none blocks, perform the
        # promote.
        #
        # Findings are plain strings classified by prefix: "REFUSE…",
        # "WARN…" or "ADVISORY…". A finding with any other prefix is
        # collected, but it is never printed or acted on here.
        #
        # Returns 1 in two cases: any REFUSE finding exists, or WARN findings
        # exist without options[:confirm_divergence]. Otherwise it prints any
        # staging drift, calls Railway.confirm_destructive! for production,
        # and returns whatever execute_promotion returns.
        def run_with_preflight_only
            findings = []

            # PER-SERVICE checks over the target_* views. Order matters: P1
            # fills @promote_refs (read by P2's race check) and @staging_drift
            # (read by emit_staging_drift_warnings below).
            findings.concat(check_p1_ghcr_digests(target_staging))
            findings.concat(check_p2_staging_deployments(target_staging))
            findings.concat(check_p3_staging_live_green(target_staging))
            findings.concat(check_p6_parity(target_staging, target_prod))
            # FLEET-SCOPED invariant: it sees the FULL fleet, so its verdict is
            # the same for full-fleet and single-service runs (see `run` for
            # the fleet_* rationale). options[:service] is passed as `target:`.
            findings.concat(check_service_set_parity(fleet_staging, fleet_prod, target: options[:service]))
            # PER-SERVICE: critical env-key parity over the target views.
            findings.concat(check_critical_env_key_parity(target_staging, target_prod))
            # FLEET-SCOPED: the expected prod domain set spans every public
            # prod host, so it needs the un-narrowed prod snapshot.
            findings.concat(check_expected_prod_domains(fleet_prod))

            # U5 (spec §5): env + service-ref + resource preflight (Stage 2),
            # all over the target views.
            #   §5.2   service-ref assertion — REFUSE.
            #   §5.3.2 prod-specific-key assertion — REFUSE; values are NEVER
            #          copied.
            #   §5.4   resource/concurrency divergence — WARN; detect-only.
            # The replicate-class env write (§5.3.1) is opt-in per service via
            # the SSOT `replicateKeys` field. That field is empty for every
            # service today, so it fires for nothing.
            findings.concat(check_service_refs(target_staging, target_prod))
            findings.concat(assert_prod_specific_keys(target_staging, target_prod,
                prod_specific_keys: ssot_prod_specific_keys(target_staging)))
            findings.concat(replicate_env_keys(target_staging, target_prod,
                replicate_keys: ssot_replicate_keys(target_staging)))
            findings.concat(check_resource_divergence(target_staging, target_prod))

            # Mandatory parity NOTE, printed on every run: env var VALUES are
            # never compared, only the set of keys.
            puts "NOTE: env var VALUES are not compared between staging and prod " \
                 "(intentional — staging and prod hold different secrets/URLs). " \
                 "Only the set of keys is compared."

            # Three-tier disposition (2026-06-22 prod↔staging comparison policy):
            #   REFUSE   — functional-contract violation; blocks unconditionally.
            #   WARN     — blocks unless --confirm-divergence.
            #   ADVISORY — operational/env-specific divergence. It is reported
            #              but NEVER blocks (region/replicas/restartPolicy,
            #              missing expected prod domains, concurrency-key
            #              presence).
            refuses   = findings.select { |f| f.start_with?("REFUSE") }
            warns     = findings.select { |f| f.start_with?("WARN") }
            advisories = findings.select { |f| f.start_with?("ADVISORY") }

            # Advisories are printed first and never affect the outcome.
            advisories.each { |f| puts f } unless advisories.empty?

            unless refuses.empty?
                refuses.each { |f| puts f }
                puts "Promote refused due to #{refuses.size} REFUSE finding(s)."
                return 1
            end

            unless warns.empty?
                warns.each { |f| puts f }
                unless options[:confirm_divergence]
                    puts "Promote refused: #{warns.size} WARN finding(s). " \
                         "Re-run with --confirm-divergence after inspecting."
                    return 1
                end
                puts "[--confirm-divergence set] proceeding past #{warns.size} WARN finding(s)."
            end

            # Surface staging drift LOUDLY (stdout block + marker line) right
            # before the promote proceeds. Drift is NOT a refuse: promote pins
            # staging's RUNNING digest regardless. Someone must still know that
            # what shipped to prod is not the current :latest.
            emit_staging_drift_warnings

            Railway.confirm_destructive!(
                env_label: "production",
                action: "promote",
                non_interactive: options[:non_interactive],
                yes: options[:yes],
            )

            # Pins the per-service target views; its return value is this
            # method's return value.
            execute_promotion(target_staging, target_prod)
        end

        # Work out the DIGEST-form image ref that prod should be pinned to for
        # one staging service.
        #
        # Deploy model: STAGING follows a mutable `:latest` tag, and PROD
        # runs an immutable `@sha256:<digest>`. Pinning prod to a tag would
        # break that invariant, and verify-railway-image-refs PROD_SHAPE would
        # later REFUSE it.
        #
        # Contract: pin prod to what staging is RUNNING right now. That is
        # the digest on staging's newest SUCCESS deployment
        # (staging_running_digest), NOT whatever `:latest` resolves to in
        # GHCR at this moment. `:latest` may have moved since staging last
        # deployed, to a digest staging never validated. Drift between the
        # two is reported separately by detect_staging_drift and never
        # changes the pin.
        #
        # Resolution order:
        #   1. --digest override for the positional service → the override,
        #      verbatim (`run` guarantees --digest implies a positional).
        #   2. neither `image` nor `image_tag` recorded (or empty) → nil.
        #   3. image already `@sha256:`-pinned → returned unchanged.
        #   4. otherwise `<registry>/<org>/<name>@<running digest>`, with
        #      the registry defaulting to ghcr.io. Returns nil when the
        #      running digest is unknown.
        #
        # nil means "no safe pin": callers must REFUSE rather than pin a tag.
        def resolved_prod_image(svc)
            operator_override = options[:digest] && options[:service] && svc["name"] == options[:service]
            return options[:digest] if operator_override

            staging_ref = svc["image"] || svc["image_tag"]
            return nil if staging_ref.nil? || staging_ref.empty?
            return staging_ref if staging_ref.include?("@sha256:")

            running_digest = staging_running_digest(svc)
            return nil if running_digest.nil?

            # Drop the tag (e.g. `:latest`) and re-attach the running digest.
            ref_parts = ghcr.parse_image_ref(staging_ref)
            host = ref_parts[:registry] || "ghcr.io"
            "#{host}/#{ref_parts[:org]}/#{ref_parts[:name]}@#{running_digest}"
        end

        # The digest the staging deployment is ACTUALLY RUNNING right now.
        #
        # Mechanism mirrored from showcase/harness/src/probes/drivers/
        # image-drift.ts. Railway keeps tag-only refs in
        # `serviceInstance.source.image` (e.g. `…:latest`), so the snapshot's
        # `image`/`digest` fields cannot tell what is running. The running
        # digest lives on the newest SUCCESS deployment's `meta.imageDigest`
        # (a `sha256:…` value); a digest-pinned `meta.image` is the fallback.
        #
        # Returns the `sha256:<hex>` string. Returns nil (callers REFUSE) when
        # any of these holds:
        #   - there is no service_id;
        #   - there is no SUCCESS deployment;
        #   - meta is not a Hash, even after JSON parsing;
        #   - there is no usable digest. A non-String or empty imageDigest
        #     counts as absent rather than raising.
        #
        # Memoized per service_id in @staging_running_digests (nil results
        # included), so repeat lookups for one service do not re-query
        # Railway.
        def staging_running_digest(svc)
            sid = svc["service_id"]
            return nil if sid.nil?
            @staging_running_digests ||= {}
            return @staging_running_digests[sid] if @staging_running_digests.key?(sid)

            deployments = fetch_latest_staging_deployments(sid)
            # fetch_latest_staging_deployments returns newest-first, so this
            # is the newest SUCCESS deployment.
            latest = (deployments || []).find { |d| d["status"] == "SUCCESS" }
            digest = nil
            if latest
                meta = latest["meta"]
                if meta.is_a?(String)
                    begin
                        meta = JSON.parse(meta)
                    rescue JSON::ParserError
                        # Not JSON: meta stays a String and yields no digest.
                    end
                end
                if meta.is_a?(Hash)
                    # Prefer meta.imageDigest (the resolved running digest).
                    # Fall back to a digest embedded in meta.image, but only
                    # when meta.image is digest-pinned (rare for staging).
                    digest = meta["imageDigest"]
                    digest = nil unless digest.is_a?(String) && !digest.empty?
                    if digest.nil? && meta["image"].to_s.include?("@sha256:")
                        digest = meta["image"].to_s.split("@", 2).last
                    end
                end
            end
            @staging_running_digests[sid] = digest
        end

        # P1 — every staging image about to be promoted must resolve to a
        # digest-pinned ref that exists in GHCR. It checks the EXACT ref that
        # will be pinned to prod (resolved_prod_image), never the mutable tag.
        #
        # Side effects (all reset at the start of every call):
        #   @promote_refs — service_name => verified digest-pinned ref.
        #     execute_promotion pins exactly this ref. Staging `:latest` is
        #     mutable, so resolving it again later could return a digest P1
        #     never verified (TOCTOU).
        #   @staging_drift — { name:, running:, latest: } records for
        #     services whose staging RUNNING digest differs from the current
        #     GHCR `:latest` (filled by detect_staging_drift).
        #   @staging_running_digests — staging_running_digest's memo. It is
        #     cleared so this preflight pins staging's CURRENT running
        #     digests.
        #
        # Returns an Array of "REFUSE: P1 …" strings (empty = pass). Any
        # manifest_exists result other than :exists / :missing / :auth_failed
        # counts as unverified and is REFUSEd.
        def check_p1_ghcr_digests(staging)
            findings = []
            # RESET (not memoize): a reused PromoteCommand instance running a
            # second preflight against a different snapshot must not carry
            # stale A-era refs, drift records or running digests into a
            # B-era promote.
            @promote_refs = {}
            @staging_drift = []
            @staging_running_digests = {}
            (staging["services"] || []).each do |svc|
                image = svc["image"] || svc["image_tag"]
                if image.nil? || image.empty?
                    # Loud on purpose. A silent skip here used to surface later
                    # as a misleading "internal error" from execute_promotion.
                    findings << "REFUSE: P1 (#{svc['name']}): no image recorded in staging snapshot; " \
                                "cannot promote."
                    next
                end

                # Per-service rescue over StandardError, not just GHCR errors:
                # one service's failure (e.g. an ArgumentError from
                # parse_image_ref) must not discard findings for the others.
                begin
                    resolved = resolved_prod_image(svc)
                    if resolved.nil?
                        findings << "REFUSE: P1 (#{svc['name']}): cannot resolve #{image} to a GHCR digest; " \
                                    "refusing to pin prod to a mutable tag."
                        next
                    end

                    manifest_status = ghcr.manifest_exists(resolved)
                    case manifest_status
                    when :exists
                        # Record the verified ref so execute_promotion pins
                        # exactly what P1 verified.
                        @promote_refs[svc["name"]] = resolved
                        # Diagnostic only: records when staging's RUNNING
                        # digest is no longer what `:latest` points to. The
                        # promote still proceeds with the running digest.
                        detect_staging_drift(svc, resolved)
                    when :missing
                        findings << "REFUSE: P1 (#{svc['name']}): #{resolved} not found in GHCR " \
                                    "(digest may have been garbage-collected; staging build may not have pushed)."
                    when :auth_failed
                        findings << "REFUSE: P1 (#{svc['name']}): GHCR auth failed for #{resolved}. " \
                                    "Set GHCR_TOKEN (local: 'gh auth token') or GITHUB_TOKEN (CI: workflow token with packages:read)."
                    else
                        # Fail closed. Without this branch an unrecognized
                        # result produced no finding AND no @promote_refs
                        # entry, so the service slipped through P1 unverified.
                        findings << "REFUSE: P1 (#{svc['name']}): unexpected GHCR manifest check result " \
                                    "#{manifest_status.inspect} for #{resolved}; refusing to pin an unverified ref."
                    end
                rescue StandardError => e
                    findings << "REFUSE: P1 (#{svc['name']}): unexpected #{e.class}: #{e.message}"
                end
            end
            findings
        end

        # Compare the digest we are about to pin (`resolved`, i.e. staging's
        # RUNNING digest) with what the staging `:latest` tag points to in
        # GHCR RIGHT NOW.
        #
        # When they differ, staging is NOT serving the current `:latest`.
        # The difference is appended to @staging_drift for
        # emit_staging_drift_warnings to print. No finding is added: the
        # promote PROCEEDS with the running digest (that is the contract), so
        # drift is a visible warning, not a refuse.
        #
        # Best-effort. If resolving `:latest` raises, a warning goes to
        # stderr via Kernel#warn and the method returns. That warning is NOT
        # a "WARN" finding and never blocks the promote, but it tells the
        # operator the check failed rather than implying "no drift".
        #
        # NOTE(review): the equality check assumes ghcr.resolve_digest
        # returns the bare `sha256:…` form, the same shape as running_digest
        # (the part after `@`). Confirm against the GHCR client.
        def detect_staging_drift(svc, resolved)
            # --digest override: the operator deliberately pinned an explicit
            # digest for this single service (see the --digest branch of
            # resolved_prod_image). Here `resolved` is that CHOSEN digest, not
            # staging's running digest, so "drift from :latest" is
            # meaningless and would be misleading. The condition matches the
            # override skip in check_p2_staging_deployments.
            return if options[:digest] && options[:service] && svc["name"] == options[:service]

            tag_image = svc["image"] || svc["image_tag"]
            # Only meaningful when staging tracks a mutable tag (the normal
            # case). A digest-pinned staging image has no "current :latest"
            # to drift from.
            return if tag_image.nil? || tag_image.include?("@sha256:")

            running_digest = resolved.include?("@") ? resolved.split("@", 2).last : nil
            return if running_digest.nil?

            begin
                current_latest = ghcr.resolve_digest(tag_image)
            rescue StandardError => e
                # Fail loud on stderr, but do not block: the pin already uses
                # staging's verified RUNNING digest.
                warn "WARN: staging drift check failed for #{svc['name']}: #{e.message} — " \
                     "could not compare against current :latest"
                return
            end
            return if current_latest.nil?
            return if current_latest == running_digest

            (@staging_drift ||= []) << {
                name:    svc["name"],
                running: running_digest,
                latest:  current_latest,
            }
        end

        # Shorten a digest for human-readable output. Drops one leading
        # `sha256:` prefix and keeps the first 12 characters; nil or "" gives
        # "(none)".
        #
        # delete_prefix strips only at the very start of the string. The
        # previous `/^sha256:/` anchor also matched after any embedded
        # newline.
        def short_digest(d)
            return "(none)" if d.nil? || d.empty?
            d.delete_prefix("sha256:")[0, 12]
        end

        # Print the LOUD staging-drift report to stdout. It is a bordered
        # block followed by one machine-readable `STAGING_DRIFT_MARKER:`
        # line, covering every record P1 collected in @staging_drift.
        #
        # Called from run_with_preflight_only after preflight and before the
        # destructive-confirm, so it is the last thing an operator sees before
        # a drifted promote proceeds. Does nothing when there is no drift
        # (staging == :latest, the common case).
        #
        # CI contract for the marker line: promote-fleet.sh greps stdout for
        # `STAGING_DRIFT_MARKER:`, aggregates markers across its fleet loop,
        # and writes ONE `staging_drift=` to $GITHUB_OUTPUT. Per-child
        # appends would collide, because GitHub keeps only the last value.
        # The workflow notify job then folds it into the Slack payload.
        def emit_staging_drift_warnings
            records = @staging_drift || []
            return if records.empty?

            rule = "=" * 72
            puts rule, "⚠️  STAGING DRIFT — staging is NOT serving the current :latest image", rule
            records.each do |rec|
                puts "  #{rec[:name]}: staging RUNNING #{short_digest(rec[:running])} " \
                     "!= current :latest #{short_digest(rec[:latest])}"
            end
            puts "",
                 "  Promote PROCEEDS pinning prod to staging's RUNNING digest (what you",
                 "  see in staging). Someone should investigate WHY staging is behind",
                 "  :latest (a newer build pushed :latest after staging last deployed,",
                 "  a failed/in-flight staging deploy, or a manual pin).",
                 rule

            summary = records.map do |rec|
                "#{rec[:name]}(running=#{short_digest(rec[:running])},latest=#{short_digest(rec[:latest])})"
            end
            puts "STAGING_DRIFT_MARKER: #{summary.join(', ')}"
        end

        # P2 — for each staging service being promoted, the newest staging
        # deployment must be SUCCESS, and its running digest must equal the
        # digest P1 verified. A mismatch means a newer build is in flight, so
        # we REFUSE rather than race it.
        #
        # Services with no service_id, or with neither `image` nor
        # `image_tag`, are skipped (P1 already REFUSEs the no-image case).
        # The `image_tag` fallback matches P1 and resolved_prod_image, so a
        # service P1 resolved from `image_tag` is not silently exempt here.
        #
        # Returns an Array of finding strings:
        #   - REFUSE when there are no deployments, the newest one is not
        #     SUCCESS, or there is a digest race;
        #   - WARN when deployment meta is not a Hash even after JSON parsing.
        def check_p2_staging_deployments(staging)
            findings = []
            (staging["services"] || []).each do |svc|
                next if svc["service_id"].nil? || (svc["image"] || svc["image_tag"]).nil?
                deployments = fetch_latest_staging_deployments(svc["service_id"])
                latest = deployments.first
                if latest.nil?
                    findings << "REFUSE: P2 (#{svc['name']}): no staging deployments found."
                    next
                end
                status = latest["status"]
                if status != "SUCCESS"
                    findings << "REFUSE: P2 (#{svc['name']}): latest staging deployment status is #{status}, not SUCCESS."
                    next
                end
                # Railway's Deployment.meta is a JSON scalar that may arrive as
                # a String. Parse it. A String that is not valid JSON stays a
                # String and reaches the WARN branch below.
                meta = latest["meta"]
                if meta.is_a?(String)
                    begin
                        meta = JSON.parse(meta)
                    rescue JSON::ParserError
                        # Keep the raw String; the WARN branch reports it.
                    end
                end
                if meta.is_a?(Hash)
                    # The running digest lives in meta.imageDigest (`sha256:…`).
                    # Staging refs are tag-form (`…:latest`), so meta.image
                    # normally has no `@sha256:`. Same extraction as
                    # staging_running_digest:
                    #   - prefer imageDigest;
                    #   - fall back to meta.image only when it is digest-pinned;
                    #   - treat a non-String or empty value as absent.
                    # The result is in bare `sha256:…` form, comparable with
                    # promote_digest.
                    deployed_digest = meta["imageDigest"]
                    deployed_digest = nil unless deployed_digest.is_a?(String) && !deployed_digest.empty?
                    if deployed_digest.nil? && meta["image"].to_s.include?("@sha256:")
                        deployed_digest = meta["image"].to_s.split("@", 2).last
                    end
                    # --digest override: the operator deliberately pinned an
                    # explicit digest for this single service (see the
                    # --digest branch of resolved_prod_image). @promote_refs
                    # then holds that chosen digest, which legitimately
                    # differs from staging's running digest. Comparing would
                    # emit a spurious REFUSE for exactly the case --digest
                    # exists to support, so the race comparison is skipped.
                    digest_override =
                        options[:digest] && options[:service] && svc["name"] == options[:service]
                    # Compare against the digest P1 resolved and verified
                    # (@promote_refs), NOT svc["digest"]. Tag-form staging
                    # images have no svc["digest"], which would make this
                    # check dead code. A missing entry means P1 already
                    # REFUSEd this service, so the comparison is skipped.
                    promote_ref = (@promote_refs || {})[svc["name"]]
                    promote_digest = promote_ref && promote_ref.include?("@sha256:") ? promote_ref.split("@", 2).last : nil
                    if !digest_override && promote_digest && deployed_digest && deployed_digest != promote_digest
                        findings << "REFUSE: P2 (#{svc['name']}): in-flight race — latest staging deployment is " \
                                    "#{deployed_digest} but P1 resolved #{promote_digest}. Re-snapshot and retry."
                    end
                else
                    findings << "WARN: P2 (#{svc['name']}): in-flight race check skipped — " \
                                "deployment meta is #{meta.class}, expected Hash."
                end
            end
            findings
        end

        # Fetch the staging deployments for `service_id`, newest first.
        #
        # Railway's deployments query has no explicit order, so we sort
        # client-side on createdAt, comparing parsed ISO 8601 instants rather
        # than raw strings. String order breaks with mixed fractional-second
        # precision (it ranks "…:00Z" as newer than "…:00.5Z") and with mixed
        # UTC offsets.
        #
        # A nil or unparseable createdAt sorts after every parseable one
        # (treated as oldest). Among themselves, such entries are ordered by
        # their raw string, as before. Edges without a node are dropped.
        def fetch_latest_staging_deployments(service_id)
            data = gql.query(RollbackCommand::DEPLOYMENTS_QUERY,
                serviceId: service_id, envId: STAGING_ENV_ID)
            nodes = (data.dig("deployments", "edges") || []).filter_map { |e| e["node"] }
            nodes.sort_by { |n| staging_deployment_sort_key(n["createdAt"]) }.reverse
        end

        # Ascending sort key for a deployment's createdAt:
        #   [1, <instant as Rational>] for a parseable ISO 8601 timestamp;
        #   [0, <raw string>] for nil or unparseable values, so they always
        #   rank older than any parseable timestamp.
        private def staging_deployment_sort_key(created_at)
            return [0, ""] if created_at.nil?
            [1, Time.iso8601(created_at.to_s).to_r]
        rescue ArgumentError
            [0, created_at.to_s]
        end

        # P3 — live re-probe of staging at promote time.
        #
        # The live probe is authoritative; CI history is not, because
        # showcase_deploy.yml runs with cancel-in-progress. The check is on by
        # default. --no-require-staging-green turns it off, and the skip is
        # announced on stdout.
        #
        # Probes the distinct, non-nil service names in `staging` with a
        # single run_staging_probe call. Returns [] when skipped, when there
        # is nothing to probe, or when the probe is green. Otherwise returns
        # one REFUSE finding carrying the probe's summary.
        def check_p3_staging_live_green(staging)
            if options[:require_staging_green]
                names = (staging["services"] || []).map { |svc| svc["name"] }.reject(&:nil?).uniq
                return [] if names.empty?

                probe = run_staging_probe(services: names)
                probe[:ok] ? [] : ["REFUSE: P3: staging is not green for #{names.join(', ')}: #{probe[:summary]}"]
            else
                puts "P3 SKIPPED (--no-require-staging-green set; staging is NOT being live-re-probed)."
                []
            end
        end

        # Shell out to Workstream A's parameterized probe entrypoint
        # (scripts/verify-deploy.ts, run via `npx --yes tsx`).
        #
        # Contract: exit 0 = green, non-zero = red. stdout+stderr is the
        # human summary, and its last 10 lines are returned.
        #
        # Returns { ok: Boolean, summary: String }. ok is false in three
        # cases: the probe script is missing, the launch raises, or the child
        # exits non-zero (or is killed by a signal).
        #
        # Environment: IO.popen MERGES `child_env` into the inherited process
        # environment; it does not start the child from an empty one (that
        # would need `unsetenv_others: true`). The explicit hash documents
        # what the probe depends on:
        #   - RAILWAY_TOKEN, because the probe enumerates Railway state over
        #     GraphQL;
        #   - GHCR_TOKEN / GITHUB_TOKEN, for its GHCR cross-check;
        #   - PATH and HOME.
        # It also keeps those variables explicit should child-env
        # sanitizing be added later. Unset (nil) entries are dropped by
        # `compact`.
        def run_staging_probe(services:)
            probe_bin = File.expand_path("../scripts/verify-deploy.ts", __dir__)
            unless File.exist?(probe_bin)
                return { ok: false, summary: "verify-deploy.ts not found at #{probe_bin} — Workstream A dependency missing." }
            end
            services_arg = services.join(",")
            child_env = {
                "RAILWAY_TOKEN" => ENV["RAILWAY_TOKEN"],
                "GHCR_TOKEN"    => ENV["GHCR_TOKEN"],
                "GITHUB_TOKEN"  => ENV["GITHUB_TOKEN"],
                "PATH"          => ENV["PATH"],
                "HOME"          => ENV["HOME"],
            }.compact
            # `tsx` (workspace dev dependency) runs the TS entrypoint
            # directly. The argv-array form spawns without a shell, and stderr
            # is folded into the captured stdout. Any spawn-time failure (a
            # missing `npx` raises Errno::ENOENT, a StandardError) becomes a
            # clean not-ok result instead of a stack trace out of P3.
            begin
                output = IO.popen(child_env, ["npx", "--yes", "tsx", probe_bin,
                    "--env", "staging",
                    "--services", services_arg,
                    err: [:child, :out]]) { |io| io.read }
            rescue StandardError => e
                return { ok: false, summary: "staging probe failed to launch: #{e.class}: #{e.message}" }
            end
            # exitstatus is nil when the child was killed by a signal, which
            # counts as not green.
            ok = $?.exitstatus == 0
            { ok: ok, summary: output.lines.last(10).join.strip }
        end

        # P6 — staging/prod parity matrix, evaluated per service present in both.
        #   REFUSE:   startCommand, healthcheckPath, and image shape (staging must
        #             be a mutable :tag ref, prod an immutable @sha256: pin).
        #   ADVISORY: region, replicas, restartPolicy (reported, never blocking).
        #   IGNORE:   env-var KEY set (env-specific by default; the critical-key
        #             whitelist in check_critical_env_key_parity is the only
        #             env-key gate) and env-var VALUES (printed as NOTE every
        #             run; see run_with_preflight_only).
        # Services absent from prod are skipped here; check_service_set_parity
        # reports them. Returns an Array of finding strings in check order.
        def check_p6_parity(staging, prod)
            quoted = :inspect.to_proc
            bare   = ->(value) { "#{value}" }
            # [snapshot field, label used in the finding, value formatter]
            refuse_checks = [
                ["start_command",    "startCommand",    quoted],
                ["healthcheck_path", "healthcheckPath", quoted],
            ]
            advisory_checks = [
                ["region",         "region",        quoted], # operational placement; intentional per-env
                ["replicas",       "replicas",      bare],   # capacity scaling; intentional per-env
                ["restart_policy", "restartPolicy", quoted], # operational policy, not artifact behavior
            ]

            (staging["services"] || []).each_with_object([]) do |svc, out|
                counterpart = Railway.find_service(prod, svc["name"])
                next unless counterpart
                label = svc["name"]

                compare = lambda do |tier, check|
                    field, display, fmt = check
                    ours = svc[field]
                    theirs = counterpart[field]
                    next if ours == theirs
                    out << "#{tier}: P6 (#{label}): #{display} divergence " \
                           "(staging=#{fmt.call(ours)} prod=#{fmt.call(theirs)})"
                end

                refuse_checks.each { |check| compare.call("REFUSE", check) }

                shape_staging = image_shape(svc["image"])
                shape_prod    = image_shape(counterpart["image"])
                unless shape_staging == :tag && shape_prod == :digest
                    out << "REFUSE: P6 (#{label}): image shape wrong " \
                           "(staging=#{shape_staging} expected=tag; " \
                           "prod=#{shape_prod} expected=digest)"
                end

                advisory_checks.each { |check| compare.call("ADVISORY", check) }
            end
        end

        # Classify an image ref:
        #   :missing — nil or empty;
        #   :digest  — contains an immutable "@sha256:" pin;
        #   :tag     — has a non-blank ":tag" on its final path component;
        #   :other   — anything else (untagged ref, blank tag).
        #
        # Only a ':' AFTER the last '/' separates a tag. An earlier ':' is a
        # registry port, so "localhost:5000/img" is untagged (:other). Splitting
        # on the last ':' of the whole string would misreport it as :tag.
        def image_shape(ref)
            return :missing if ref.nil? || ref.empty?
            return :digest  if ref.include?("@sha256:")
            slash = ref.rindex("/")
            last_component = slash ? ref[(slash + 1)..] : ref
            colon = last_component.rindex(":")
            return :other if colon.nil?
            tag = last_component[(colon + 1)..]
            tag.match?(/\A\s*\z/) ? :other : :tag
        end

        # Service-set parity between the staging and prod snapshots.
        #
        # Full-fleet promote (target nil): any service present in only one env
        # is a REFUSE, in either direction.
        #
        # Single-service promote (target given): only the TARGET's own
        # asymmetry counts, in both directions.
        #   - target in staging but not prod: narrowed prod would silently skip
        #     the absent target with rc=0 and no mutation.
        #   - target in prod but not staging: the mirror case.
        # Other single-env services (e.g. the staging-only harness-workers pool
        # worker, or a deprecated prod-only service) are not this promote's
        # concern. The 12 starter-<slug> demos live in BOTH envs under the SSOT
        # (S2), so they satisfy parity on their own and need no exemption.
        def check_service_set_parity(staging, prod, target: nil)
            names_of = ->(snap) { (snap["services"] || []).map { |svc| svc["name"] }.sort }
            staging_names = names_of.call(staging)
            prod_names    = names_of.call(prod)

            only_in_staging = staging_names - prod_names
            only_in_prod    = prod_names - staging_names

            if target
                only_in_staging &= [target]
                only_in_prod    &= [target]
            end

            [
                ["staging", "prod", only_in_staging],
                ["prod", "staging", only_in_prod],
            ].filter_map do |here, there, names|
                "REFUSE: services in #{here} not in #{there}: #{names.join(', ')}" unless names.empty?
            end
        end

        # Staging-gated critical env-key parity. For each service present in
        # both snapshots, REFUSE when a CRITICAL_ENV_KEYS member that staging
        # carries is absent from prod's env_keys (a real, fixable divergence).
        # Keys staging itself lacks are never demanded of prod: infra/operator
        # tokens (RAILWAY_TOKEN, GHCR_TOKEN, SHARED_SECRET, OPS_TRIGGER_TOKEN,
        # GITHUB_APP_PRIVATE_KEY) live in neither env's container env_keys.
        def check_critical_env_key_parity(staging, prod)
            (staging["services"] || []).filter_map do |svc|
                counterpart = Railway.find_service(prod, svc["name"])
                next unless counterpart
                staged_critical = CRITICAL_ENV_KEYS & (svc["env_keys"] || [])
                absent = staged_critical - (counterpart["env_keys"] || [])
                next if absent.empty?
                "REFUSE: #{svc['name']}: critical env keys missing in prod: #{absent.join(', ')}"
            end
        end

        # ── U5 (spec §5): env + service-ref + resource preflight (Stage 2) ──
        #
        # Env-var keys that tune RESOURCE/scaling (concurrency) rather than a
        # functional contract. A presence divergence between staging and prod
        # is DETECT-and-WARN only, never REFUSE (§5.4). Kept as a literal list;
        # append new knobs here as they appear.
        CONCURRENCY_ENV_KEYS = ["BROWSER_POOL_SIZE"].freeze

        # Look up the SSOT (generated.json closure) record whose "name" equals
        # `name`. Returns nil when there is no such record or no "services" list.
        def ssot_service(name)
            records = Railway::SSOT_DATA["services"] || []
            records.find { |record| record["name"] == name }
        end

        # Deduplicated union (first-seen order) of the SSOT `replicateKeys`
        # declared for the services in `staging` (§5.3.1). EMPTY for every
        # service today: the replicate-class env-write is opt-in and copies
        # nothing until a key is explicitly declared.
        def ssot_replicate_keys(staging)
            ssot_key_union(staging, "replicateKeys")
        end

        # Deduplicated union of the SSOT `prodSpecificKeys` declared for the
        # services in `staging` (§5.3.2). These are ASSERTED present in prod,
        # NEVER copied. EMPTY for every service today.
        def ssot_prod_specific_keys(staging)
            ssot_key_union(staging, "prodSpecificKeys")
        end

        # Flatten SSOT list `field` across every service in `staging`. A
        # service with no SSOT record, or a record lacking `field`,
        # contributes nothing. Duplicates are removed.
        def ssot_key_union(staging, field)
            declared = (staging["services"] || []).flat_map do |svc|
                record = ssot_service(svc["name"]) || {}
                record[field] || []
            end
            declared.uniq
        end

        # Env-LOCAL host that `target_name` serves in `env` ("prod" or
        # "staging"), read from the SSOT record's `domains` map. Returns nil
        # when the service or its domain for that env is unknown.
        def ssot_target_host(target_name, env)
            ssot_service(target_name)&.dig("domains", env)
        end

        # Fetch one (service, env) variable VALUE transiently; it is never
        # persisted. Returns nil when the response has no variables map, or the
        # variable is absent, nil, or empty (e.g. sealed/unreadable). Callers
        # MUST treat nil as "not present", never as a value to copy.
        def read_service_var(service_id, env_id, name)
            response = gql.query(SERVICE_VARIABLES_QUERY,
                projectId: PROJECT_ID, serviceId: service_id, envId: env_id)
            variables = response && response["variables"]
            case variables
            when Hash
                value = variables[name]
                value unless value.nil? || value.to_s.empty?
            end
        end

        # §5.2 service-ref assertion (REFUSE). For each promote target carrying
        # `serviceRefs` in the SSOT, assert that its PROD env var (e.g.
        # OPENAI_BASE_URL / ANTHROPIC_BASE_URL) points at the env-LOCAL target
        # host (prod->prod aimock). A prod ref resolving to the STAGING target
        # host (or to a foreign host) is the `ms-agent-dotnet` cross-env leak
        # class => REFUSE. This is an ASSERTION; it NEVER writes or copies.
        #
        # The value is compared by exact, case-insensitive HOST equality (see
        # service_ref_host), not by substring. A substring test would accept
        # e.g. "https://staging-aimock.up.railway.app" for the prod host
        # "aimock.up.railway.app", which is exactly the leak this gate must
        # catch. A value whose host cannot be parsed also REFUSEs.
        def check_service_refs(staging, prod)
            findings = []
            (staging["services"] || []).each do |svc|
                name = svc["name"]
                pmatch = Railway.find_service(prod, name)
                next unless pmatch
                refs = (ssot_service(name) || {})["serviceRefs"] || []
                refs.each do |ref|
                    key    = ref["key"]
                    target = ref["target"]
                    next if key.nil? || target.nil?
                    prod_host = ssot_target_host(target, "prod")
                    if prod_host.nil?
                        findings << "REFUSE: §5.2 (#{name}): serviceRef #{key} -> #{target} has no prod host in SSOT"
                        next
                    end
                    actual = read_service_var(pmatch["service_id"], PRODUCTION_ENV_ID, key)
                    if actual.nil?
                        findings << "REFUSE: §5.2 (#{name}): prod #{key} is unset/unreadable " \
                                    "(expected to point at #{target}'s prod host #{prod_host})"
                        next
                    end
                    # Compare HOSTS exactly. The env var may be a full URL and
                    # the SSOT value a bare host; both are normalized first.
                    actual_host = service_ref_host(actual)
                    unless actual_host && actual_host == service_ref_host(prod_host)
                        findings << "REFUSE: §5.2 (#{name}): prod #{key}=#{actual.inspect} does NOT point at " \
                                    "#{target}'s env-LOCAL prod host #{prod_host.inspect} " \
                                    "(cross-env service-ref leak — the ms-agent-dotnet class)"
                    end
                end
            end
            findings
        end

        # Extract the lowercase host from a service-ref value. The value may be
        # a full URL ("https://host:443/v1") or a bare "host[:port][/path]";
        # the bare form is parsed as a network-path reference ("//host...").
        # Returns nil for blank or unparseable input.
        def service_ref_host(value)
            text = value.to_s.strip
            return nil if text.empty?
            text = "//#{text}" unless text.include?("://")
            host = URI.parse(text).host
            (host.nil? || host.empty?) ? nil : host.downcase
        rescue URI::InvalidURIError
            nil
        end

        # §5.3.1 replicate-class env-write (NEW capability). For declared
        # replicate keys ONLY, copy staging's VALUE -> prod via the
        # variable*Upsert family (VARIABLE_UPSERT_MUTATION — NOT
        # serviceInstanceUpdate(source.image)), verified by re-read. The default
        # replicate set is EMPTY — the mechanism is opt-in per key and fires for
        # nothing today (R-A: replicating a misclassified prod-specific key
        # would clobber prod).
        #
        # staging, prod    — snapshot hashes with a "services" list; services
        #                    absent from prod are skipped.
        # replicate_keys:  — key names to replicate; nil/empty => no-op.
        # Returns an Array of finding strings:
        #   "replicate §5.3.1 ..." — informational (in sync / dry-run / upserted OK);
        #   "WARN: ..."            — staging has no readable value (skipped);
        #   "REFUSE: ..."          — the post-upsert re-read did not match.
        # Honors options[:dry_run] (reports only, issues no mutation). Errors
        # raised by gql.query are not rescued here and propagate to the caller.
        def replicate_env_keys(staging, prod, replicate_keys: [])
            findings = []
            keys = (replicate_keys || []).uniq
            return findings if keys.empty?
            (staging["services"] || []).each do |svc|
                name = svc["name"]
                pmatch = Railway.find_service(prod, name)
                next unless pmatch
                keys.each do |key|
                    # nil means unset, empty, or sealed/unreadable — never a value to copy.
                    staging_val = read_service_var(svc["service_id"], STAGING_ENV_ID, key)
                    if staging_val.nil?
                        findings << "WARN: §5.3.1 (#{name}): replicate key #{key} is unset/unreadable in staging; skipping"
                        next
                    end
                    prod_val = read_service_var(pmatch["service_id"], PRODUCTION_ENV_ID, key)
                    # Skip the write entirely when prod already matches.
                    if prod_val == staging_val
                        findings << "replicate §5.3.1 (#{name}): #{key} already in sync"
                        next
                    end
                    if options[:dry_run]
                        findings << "replicate §5.3.1 (#{name}): [dry-run] would upsert #{key} (staging->prod)"
                        next
                    end
                    # The only mutation in this method.
                    gql.query(VARIABLE_UPSERT_MUTATION,
                        serviceId: pmatch["service_id"], envId: PRODUCTION_ENV_ID,
                        name: key, value: staging_val)
                    # Verify-by-re-read: the upsert must be observable on prod.
                    reread = read_service_var(pmatch["service_id"], PRODUCTION_ENV_ID, key)
                    if reread == staging_val
                        findings << "replicate §5.3.1 (#{name}): upserted #{key} (staging->prod), re-read OK"
                    else
                        findings << "REFUSE: §5.3.1 (#{name}): re-read after upsert of #{key} " \
                                    "did not observe staging value (got #{reread.inspect})"
                    end
                end
            end
            findings
        end

        # §5.3.2/§5.3.3 prod-specific key ASSERTION (never a copy). For every
        # service present in both snapshots, REFUSE when a declared
        # prod-specific key reads as absent in prod, i.e. read_service_var
        # returns nil (unset, empty, or sealed/unreadable).
        #
        # Only PRESENCE is checked. The value is neither validated against an
        # expected prod shape nor compared with staging. Such a key (e.g. a
        # domain like NEXT_PUBLIC_POCKETBASE_URL) must never be written from
        # staging: copying staging's value would point prod at staging's
        # datastore (R-A, the highest-blast capability in this PR). This method
        # issues NO mutations by construction.
        def assert_prod_specific_keys(staging, prod, prod_specific_keys: [])
            wanted = (prod_specific_keys || []).uniq
            return [] if wanted.empty?

            (staging["services"] || []).each_with_object([]) do |svc, out|
                label = svc["name"]
                counterpart = Railway.find_service(prod, label)
                next unless counterpart
                wanted.each do |key|
                    next unless read_service_var(counterpart["service_id"], PRODUCTION_ENV_ID, key).nil?
                    out << "REFUSE: §5.3.2 (#{label}): prod-specific key #{key} is missing in prod " \
                           "(assert-only; NEVER copied from staging)"
                end
            end
        end

        # §5.4 concurrency DETECT-and-WARN. For each service present in both
        # snapshots, report (ADVISORY, never REFUSE) any CONCURRENCY_ENV_KEYS
        # member (e.g. BROWSER_POOL_SIZE) whose PRESENCE in env_keys differs
        # between staging and prod. This joins the ADVISORY-tier divergences
        # check_p6_parity already reports (region/replicas/restartPolicy).
        # Detect only; nothing is ever auto-set.
        #
        # NOTE: a limitOverride (CPU/memory cap) WARN was specified here but is
        # NOT implementable.
        #   - Railway's GraphQL schema exposes no readable limit field on
        #     ServiceInstance (verified by introspection 2026-06).
        #   - The only related types are the write-only
        #     `serviceInstanceLimitsUpdate` mutation, taking
        #     ServiceInstanceLimitsUpdateInput{memoryGB,vCPUs}, and the
        #     ServiceInstanceLimit type, an opaque scalar with no readable fields.
        #   - build_snapshot therefore cannot capture the cap, so a comparison
        #     would always be nil==nil and never fire. It was dead code, masked
        #     by a hand-injected test fixture, and was dropped.
        # Re-add it only if Railway exposes a readable limit field on
        # ServiceInstance: wire it into SERVICE_INSTANCE_QUERY and the snapshot
        # hash, then restore the WARN here.
        def check_resource_divergence(staging, prod)
            (staging["services"] || []).each_with_object([]) do |svc, out|
                name = svc["name"]
                counterpart = Railway.find_service(prod, name)
                next unless counterpart

                in_staging = svc["env_keys"] || []
                in_prod    = counterpart["env_keys"] || []
                CONCURRENCY_ENV_KEYS.each do |knob|
                    staging_has = in_staging.include?(knob)
                    prod_has    = in_prod.include?(knob)
                    next if staging_has == prod_has
                    out << "ADVISORY: §5.4 (#{name}): concurrency env key #{knob} presence diverges " \
                           "(staging=#{staging_has} prod=#{prod_has}) " \
                           "[detect-only]"
                end
            end
        end

        # ADVISORY check that production serves every custom domain listed for
        # it in EXPECTED_DOMAINS. Returns [] when nothing is missing, otherwise
        # a single-element Array naming the missing domains.
        def check_expected_prod_domains(prod)
            wanted = EXPECTED_DOMAINS[PRODUCTION_ENV_ID] || []
            served = (prod["services"] || []).flat_map { |svc| svc["custom_domains"] || [] }.uniq.sort
            absent = wanted - served
            # A missing public host is a useful operational signal, but it does
            # not affect whether the promoted digest runs correctly: ADVISORY.
            absent.empty? ? [] : ["ADVISORY: production missing expected custom domains: #{absent.join(', ')}"]
        end

        # Pin every staging service that also exists in prod to the exact digest
        # ref P1 resolved and verified (@promote_refs). Each pin also re-asserts
        # the SSOT prod healthcheckPath.
        #
        # Returns:
        #   0 — success, or after a dry-run listing;
        #   1 — some prod-matched service lacks a digest-shaped @promote_refs
        #       entry (no mutations issued);
        #   1 — a pin failed mid-loop. Production may then be partially
        #       promoted; a PARTIAL PROMOTION report on stderr names the
        #       services already pinned.
        # Raises RuntimeError if preflight never populated @promote_refs.
        def execute_promotion(staging, prod)
            # Hard guard: @promote_refs is populated by check_p1_ghcr_digests.
            # If it is nil, preflight did not run — refusing to silently pin
            # nothing (which a memoize default `||= {}` would have done).
            raise "internal error: execute_promotion invoked without preflight (@promote_refs nil)" if @promote_refs.nil?

            # Pre-validation pass: every prod-matched staging service MUST
            # have a digest-shaped @promote_refs entry BEFORE we pin anything.
            # Pre-fix, a missing entry was detected lazily mid-loop, which
            # could leave production partially-promoted (some services pinned
            # to new digest, others still on prior digest). Fail fast here.
            missing = []
            (staging["services"] || []).each do |svc|
                next unless Railway.find_service(prod, svc["name"])
                ref = @promote_refs[svc["name"]]
                if ref.nil? || !ref.include?("@sha256:")
                    missing << "#{svc['name']}=#{ref.inspect}"
                end
            end
            unless missing.empty?
                warn "REFUSE: promote: P1-verified digest ref missing or non-digest for " \
                     "#{missing.join(', ')} (internal error — preflight did not capture these services). " \
                     "Refusing to pin (no mutations issued)."
                return 1
            end

            # Names pinned so far; reported if a later pin fails.
            already_pinned = []
            (staging["services"] || []).each do |svc|
                pmatch = Railway.find_service(prod, svc["name"])
                next unless pmatch
                # Use the EXACT ref P1 resolved+verified. Re-resolving here
                # would risk TOCTOU on staging's mutable `:latest`.
                image = @promote_refs[svc["name"]]
                if options[:dry_run]
                    puts "[dry-run] promote #{svc['name']} -> #{image}"
                    next
                end
                begin
                    # Re-assert the SSOT-tracked prod healthcheckPath on every
                    # promote (the durable fix: a prod instance whose path
                    # silently went null self-heals). Read it from the generated
                    # SSOT record; nil when the service tracks none (live-null) —
                    # pin_and_verify then OMITS the field rather than clearing it.
                    ssot_healthcheck = (ssot_service(svc["name"]) || {})
                        .dig("healthcheckPath", "prod")
                    PromoteCommand.pin_and_verify(gql,
                        service_id: pmatch["service_id"],
                        env_id: PRODUCTION_ENV_ID,
                        image: image,
                        healthcheck_path: ssot_healthcheck)
                    puts "promoted #{svc['name']} -> #{image}"
                    already_pinned << svc["name"]
                # Broadened rescue: a transient GraphQL or network error
                # mid-loop must NOT crash the script (which would lose the
                # PARTIAL-PROMOTION report — its entire reason for existing).
                rescue PromoteCommand::MutationError, Railway::GraphQL::Error, StandardError => e
                    warn "PARTIAL PROMOTION: already pinned #{already_pinned.inspect}; " \
                         "FAILED on #{svc['name']}: #{e.class}: #{e.message}. " \
                         "Production is in a mixed state — note that serviceInstanceUpdate " \
                         "runs BEFORE serviceInstanceDeployV2, so #{svc['name']}'s prod " \
                         "source.image may already be partially advanced on Railway's side. " \
                         "Run 'bin/railway rollback-commit' against the prior snapshot to revert, " \
                         "or re-run promote to retry."
                    return 1
                end
            end
            0
        end
    end

    # pin — pin a service to a specific image digest.
    #
    # Accepts either a digest ref (used verbatim) or a tag ref. A tag ref is
    # resolved to its content digest via GHCR and rewritten as
    # "<repository>@<digest>". Confirmation (Railway.confirm_destructive!)
    # runs before the --dry-run check, so dry runs are also gated.
    class PinCommand < BaseCommand
        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway pin --env ENV --service NAME --image REF [--yes] [--dry-run]

                      Pin a service to a specific image (tag or @sha256:... digest).
                      If a tag is given, resolves it via GHCR first.
                BANNER
                o.on("--env ENV") { |v| options[:env] = v }
                o.on("--service NAME") { |v| options[:service] = v }
                o.on("--image REF") { |v| options[:image] = v }
                o.on("--yes") { options[:yes] = true }
                o.on("--non-interactive") { options[:non_interactive] = true }
                o.on("--dry-run") { options[:dry_run] = true }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        def run
            parser.parse!(argv)
            %i[env service image].each do |k|
                Railway.die!("--#{k} required") unless options[k]
            end

            env_id = Railway.env_id_for(options[:env])
            image = options[:image]

            unless image.include?("@sha256:")
                digest = ghcr.resolve_digest(image)
                Railway.die!("Could not resolve digest for #{image}.") unless digest
                # Replace the mutable tag with the immutable digest resolved above.
                image = "#{strip_tag(image)}@#{digest}"
            end

            Railway.confirm_destructive!(
                env_label: options[:env],
                action: "pin",
                non_interactive: options[:non_interactive],
                yes: options[:yes],
            )

            service_id = RollbackCommand.new([]).resolve_service_id(env_id, options[:service])

            if options[:dry_run]
                puts "[dry-run] would pin #{options[:service]} -> #{image}"
                return 0
            end

            RestoreCommand.pin_and_redeploy(gql,
                service_id: service_id, env_id: env_id, image: image)
            puts "pinned #{options[:service]} -> #{image}"
            0
        end

        private

        # Return `ref` without its trailing ":tag". Only a ':' in the final
        # path component separates a tag. An earlier ':' is a registry port,
        # so "localhost:5000/img" is returned unchanged rather than being
        # truncated to "localhost". A ref with no tag is returned as-is.
        def strip_tag(ref)
            slash = ref.rindex("/") || -1
            colon = ref.rindex(":")
            colon && colon > slash ? ref[0...colon] : ref
        end
    end

    # env-diff — compare two environments; exit 0 when they agree, 1 on drift.
    class EnvDiffCommand < BaseCommand
        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway env-diff ENV_A ENV_B

                      Compare image digests, startCommand, env-var key sets, and
                      custom domains between two envs. Exits 0 if equal, 1 if
                      drift, 2 on error.
                BANNER
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        def run
            parser.parse!(argv)
            Railway.die!("two env args required") if argv.length < 2

            a, b = argv.first(2)
            # Both env ids are resolved before either snapshot is built.
            env_ids = [a, b].map { |label| Railway.env_id_for(label) }
            snap_a, snap_b = [a, b].zip(env_ids).map do |label, env_id|
                SnapshotCommand.new(["--env", label, "--dry-run"]).build_snapshot(env_id)
            end

            findings = diff_services(snap_a, snap_b, a, b)
            findings.each { |line| puts line }
            if findings.empty?
                puts "OK: #{a} and #{b} agree."
                0
            else
                puts "DRIFT: #{findings.size} finding(s)."
                1
            end
        end

        # Pure comparison of two already-built snapshots labelled `a` and `b`.
        # Returns an Array of human-readable drift lines (empty == agreement),
        # covering service presence, image digest, startCommand, env-var KEY
        # sets and custom domains, for every service name in either snapshot.
        def diff_services(snap_a, snap_b, a, b)
            everyone = (snap_a["services"] || []) + (snap_b["services"] || [])
            names = everyone.map { |svc| svc["name"] }.uniq.sort

            names.each_with_object([]) do |name, drift|
                left  = Railway.find_service(snap_a, name)
                right = Railway.find_service(snap_b, name)
                if left.nil?
                    drift << "service #{name}: missing in #{a}"
                    next
                end
                if right.nil?
                    drift << "service #{name}: missing in #{b}"
                    next
                end

                unless left["digest"] == right["digest"]
                    drift << "service #{name}: digest #{left['digest']} != #{right['digest']}"
                end
                unless left["start_command"] == right["start_command"]
                    drift << "service #{name}: startCommand differs"
                end

                # Set-valued fields: one line per direction that has a gap.
                [["env_keys", "env keys"], ["custom_domains", "custom domains"]].each do |field, label|
                    values_a = left[field] || []
                    values_b = right[field] || []
                    only_a = values_a - values_b
                    only_b = values_b - values_a
                    drift << "service #{name}: #{label} missing in #{b}: #{only_a.join(', ')}" unless only_a.empty?
                    drift << "service #{name}: #{label} missing in #{a}: #{only_b.join(', ')}" unless only_b.empty?
                end
            end
        end
    end

    # resolve-digest — print the GHCR content digest for an image reference.
    class ResolveDigestCommand < BaseCommand
        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway resolve-digest IMAGE_REF

                      Resolve a tag like 'ghcr.io/copilotkit/showcase-shell:latest' to its
                      Docker-Content-Digest. Prints sha256:... on stdout. Exits 2 if absent.
                BANNER
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        # Resolve the first positional ref and print its digest. Aborts via
        # Railway.die! when no ref is given or GHCR cannot resolve it.
        def run
            parser.parse!(argv)
            Railway.die!("image ref required") if argv.empty?

            ref = argv.first
            digest = ghcr.resolve_digest(ref)
            Railway.die!("Could not resolve digest for #{ref}.") unless digest
            puts digest
            0
        end
    end

    # lint-prod — CI gate: every production service must be digest-pinned.
    #
    # A service is "pinned" when its snapshot image contains "@sha256:".
    # Anything else, including an empty image, is "mutable-tag". Exits 1 when
    # any service is unpinned, unless --exit-zero (advisory mode).
    class LintProdCommand < BaseCommand
        def initialize(argv)
            super
            @exit_zero = false
            @format = "text"
        end

        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway lint-prod [--exit-zero] [--format text|json]

                      Fails (exit 1) if any production service is NOT pinned to
                      an immutable image digest (must be ghcr.io/...@sha256:...).
                      Intended as a CI gate on every PR touching showcase/.

                      --exit-zero      Always exit 0 even on findings (advisory mode).
                                       Findings still print to stdout.
                      --format FORMAT  Output format: 'text' (default) or 'json'.
                                       JSON shape:
                                         {services:[{name,source,status}],
                                          findings:N, timestamp:"ISO8601"}
                                       'source' is the raw Source.image / image-ref
                                       string. 'status' is 'pinned' or 'mutable-tag'.
                BANNER
                o.on("--exit-zero", "Advisory: exit 0 even when findings exist") { @exit_zero = true }
                o.on("--format FORMAT", %w[text json], "Output format (text|json)") { |v| @format = v }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        def run
            parser.parse!(argv)

            snapshot = SnapshotCommand.new(["--env", "production", "--dry-run"])
                .build_snapshot(PRODUCTION_ENV_ID)

            services = (snapshot["services"] || []).map do |svc|
                source = svc["image"].to_s
                # An empty string never contains the marker, so it is also
                # classified "mutable-tag".
                status = source.include?("@sha256:") ? "pinned" : "mutable-tag"
                { "name" => svc["name"], "source" => source, "status" => status }
            end
            unpinned = services.reject { |entry| entry["status"] == "pinned" }

            if @format == "json"
                puts JSON.generate({
                    "services"  => services,
                    "findings"  => unpinned.size,
                    "timestamp" => Time.now.utc.iso8601,
                })
                return 0 if unpinned.empty? || @exit_zero
                return 1
            end

            if unpinned.empty?
                puts "OK: all production services digest-pinned."
                return 0
            end

            unpinned.each do |entry|
                message =
                    if entry["source"].empty?
                        "#{entry['name']}: no image set"
                    else
                        "#{entry['name']}: not digest-pinned (image=#{entry['source']})"
                    end
                puts message
            end
            puts "DRIFT: #{unpinned.size} production service(s) not digest-pinned."
            return 1 unless @exit_zero

            puts "(advisory mode: --exit-zero set; exiting 0)"
            0
        end
    end

    # reconcile-prod — detect prod columns that have drifted STALE vs a green
    # staging (Lever 1 of the promote-reliability hardening plan).
    #
    # The showcase deploy model: staging tracks a mutable `:latest` tag and is
    # continuously rebuilt; prod is pinned to an immutable `@sha256:` digest and
    # only advances on an explicit promote. So prod can silently fall BEHIND a
    # green staging — a dead/stale prod column that today is only noticed by
    # eyeballing it. This gate compares, per prod-eligible service, the prod
    # SERVING digest against the staging RUNNING digest and fails the run
    # (exit 1) when any service is stale, so drift is alerted on automatically.
    #
    # For each prod-eligible (`probe.prod == true`, the same set
    # resolve-promote-targets.sh promotes) service:
    #   prod serving digest    = the `@sha256:` digest prod is pinned to
    #                            (LintProd path —
    #                            SnapshotCommand.build_snapshot(prod)).
    #   staging running digest = meta.imageDigest of staging's latest SUCCESS
    #                            deployment (reuses
    #                            PromoteCommand#staging_running_digest).
    #
    # Classification:
    #   green  — prod == staging (in sync).
    #   stale  — prod != staging AND staging running digest IS resolvable
    #            (prod has drifted behind a green staging — what we alert on).
    #   gray   — staging running digest not resolvable (no SUCCESS deploy / no
    #            imageDigest), OR prod has no resolvable digest for the service
    #            (no snapshot entry yet, or its image is not @sha256:-pinned).
    #            Informational, NOT stale — we cannot prove drift.
    #
    # Exit codes: 0 = no stale, 1 = at least one stale, 2 = hard error.
    # Read-only — performs NO promotes / mutations.
    class ReconcileProdCommand < BaseCommand
        STATUS_GREEN = "green"
        STATUS_STALE = "stale"
        STATUS_GRAY  = "gray"

        def initialize(argv)
            super
            @json = false
        end

        def parser
            OptionParser.new do |o|
                o.banner = <<~BANNER
                    Usage: bin/railway reconcile-prod [--json]

                      Detect production services whose serving digest has drifted
                      STALE vs a green staging. For every prod-eligible service
                      (probe.prod == true) compares the prod SERVING digest
                      (immutable @sha256:) against the staging RUNNING digest
                      (staging's latest SUCCESS deployment). Classifies each:

                        green   prod digest == staging running digest (in sync)
                        stale   prod != staging AND staging IS resolvable
                                (prod drifted behind a green staging)
                        gray    staging running digest not resolvable, or the
                                service has no prod snapshot entry yet
                                (informational — NOT stale)

                      Exits 1 iff any service is STALE. Read-only: NO promotes.

                      --json   Emit machine-readable JSON instead of a table:
                                 {services:[{name,service_id,prod,staging,status}],
                                  green:N, stale:N, gray:N, timestamp:"ISO8601"}
                BANNER
                o.on("--json", "Emit machine-readable JSON") { @json = true }
                o.on("-h", "--help") { puts o; exit 0 }
            end
        end

        # Entry point: parse flags, classify every prod-eligible service, then
        # emit either a JSON document (--json) or a human-readable table.
        # Returns the exit code from #run_classification (1 iff any stale).
        def run
            parser.parse!(argv)
            rows = classify_all

            if @json
                counts = status_counts(rows)
                puts JSON.generate(
                    "services"  => rows,
                    "green"     => counts[STATUS_GREEN],
                    "stale"     => counts[STATUS_STALE],
                    "gray"      => counts[STATUS_GRAY],
                    "timestamp" => Time.now.utc.iso8601,
                )
            else
                print_table(rows)
            end

            run_classification(rows)
        end

        # Classify every prod-eligible service. Returns an array of
        # { "name", "service_id", "prod", "staging", "status" } hashes, sorted
        # by name. `prod`/`staging` are the `sha256:...` digests (or nil).
        def classify_all
            prod_by_sid = prod_digests_by_service_id
            eligible_services.map do |elig|
                sid = elig["service_id"]
                prod_digest = prod_by_sid[sid]
                staging_digest = staging_running_digest_for(elig)
                {
                    "name"       => elig["name"],
                    "service_id" => sid,
                    "prod"       => prod_digest,
                    "staging"    => staging_digest,
                    "status"     => classify(prod_digest, staging_digest),
                }
            end.sort_by { |r| r["name"].to_s }
        end

        # green  — prod present AND staging present AND equal.
        # stale  — prod present AND staging present AND different.
        # gray   — staging unresolvable, OR prod absent (can't prove drift).
        def classify(prod_digest, staging_digest)
            return STATUS_GRAY if staging_digest.nil? || staging_digest.empty?
            return STATUS_GRAY if prod_digest.nil? || prod_digest.empty?
            prod_digest == staging_digest ? STATUS_GREEN : STATUS_STALE
        end

        # Exit-code contract: 1 iff any service is stale, else 0.
        def run_classification(rows)
            rows.any? { |r| r["status"] == STATUS_STALE } ? 1 : 0
        end

        private

        # Row count per status. Every status key is present (0 when no row has
        # it), so callers can index directly without nil checks.
        def status_counts(rows)
            tallied = rows.map { |r| r["status"] }.tally
            [STATUS_GREEN, STATUS_STALE, STATUS_GRAY].to_h { |s| [s, tallied.fetch(s, 0)] }
        end

        # prod serving digest per service_id, from the prod snapshot (the same
        # LintProd path). Prefer the parsed `digest` field; fall back to parsing
        # `@sha256:` out of the image ref. Services without a service_id or
        # without any resolvable digest are omitted (they classify as gray).
        def prod_digests_by_service_id
            services = build_prod_snapshot["services"] || []
            services.each_with_object({}) do |svc, acc|
                sid = svc["service_id"]
                next if sid.nil?
                digest = svc["digest"]
                if digest.nil? || digest.empty?
                    image = svc["image"].to_s
                    # "repo@sha256:abc" -> "sha256:abc"
                    digest = image.split("@", 2).last if image.include?("@sha256:")
                end
                acc[sid] = digest unless digest.nil? || digest.empty?
            end
        end

        # The prod snapshot — same source LintProdCommand reads. Isolated into a
        # method so tests can inject a fixture.
        def build_prod_snapshot
            SnapshotCommand.new(["--env", "production", "--dry-run"])
                .build_snapshot(PRODUCTION_ENV_ID)
        end

        # The prod-eligible service set: every SSOT service with
        # `probe.prod == true` (the set resolve-promote-targets.sh promotes).
        # Each element is { "name" =>, "service_id" => } — service_id is the
        # SSOT `serviceId`, assumed to equal the snapshot `service_id` (same
        # project service node across envs), which is how prod ⇆ staging rows
        # are correlated.
        def eligible_services
            SSOT_DATA.fetch("services")
                .select { |s| s.dig("probe", "prod") == true }
                .map { |s| { "name" => s.fetch("name"), "service_id" => s["serviceId"] } }
        end

        # The staging RUNNING digest for a prod-eligible service. Delegates to
        # PromoteCommand#staging_running_digest (private, hence `send`) so the
        # comparator and the promote agree on "what staging is serving".
        def staging_running_digest_for(svc)
            promote_helper.send(:staging_running_digest, svc)
        end

        # A memoized PromoteCommand used purely as a library for its
        # staging-digest resolution. It is handed our gql client so we don't
        # double-auth. NEVER runs a promote — only its digest resolver is called.
        def promote_helper
            @promote_helper ||= PromoteCommand.new(["--non-interactive"]).tap do |helper|
                helper.instance_variable_set(:@gql, gql)
            end
        end

        # First 12 hex chars of a digest (without the "sha256:" prefix), or
        # "(none)" for a missing digest.
        def short(d)
            return "(none)" if d.nil? || d.to_s.empty?
            d.to_s.delete_prefix("sha256:")[0, 12]
        end

        # Human-readable report: one aligned row per service, then a summary
        # line and either an OK line or the list of stale service names.
        def print_table(rows)
            name_w = ([4] + rows.map { |r| r["name"].to_s.length }).max
            row_fmt = "%-#{name_w}s  %-6s  %-12s  %-12s"
            puts format(row_fmt, "NAME", "STATUS", "PROD", "STAGING")
            rows.each do |r|
                puts format(row_fmt, r["name"], r["status"], short(r["prod"]), short(r["staging"]))
            end

            counts = status_counts(rows)
            stale_names = rows.select { |r| r["status"] == STATUS_STALE }.map { |r| r["name"] }
            puts ""
            puts "Summary: #{counts[STATUS_GREEN]} green, #{counts[STATUS_STALE]} stale, " \
                 "#{counts[STATUS_GRAY]} gray (#{rows.size} prod-eligible)."
            if stale_names.empty?
                puts "OK: no production service is stale vs staging."
            else
                puts "STALE: prod has drifted behind green staging for: #{stale_names.join(', ')}."
            end
        end
    end

    # ── Dispatcher ─────────────────────────────────────────────────────────────

    # Subcommand name → command class. Railway.run looks the first CLI
    # argument up here; any name not listed is rejected as unknown (exit 2).
    # Keep in sync with the subcommand list printed by Railway.usage.
    SUBCOMMANDS = [
        ["snapshot",        SnapshotCommand],
        ["restore",         RestoreCommand],
        ["rollback",        RollbackCommand],
        ["rollback-commit", RollbackCommitCommand],
        ["promote",         PromoteCommand],
        ["pin",             PinCommand],
        ["env-diff",        EnvDiffCommand],
        ["resolve-digest",  ResolveDigestCommand],
        ["lint-prod",       LintProdCommand],
        ["reconcile-prod",  ReconcileProdCommand],
    ].to_h.freeze

    # Top-level help text. Printed to stdout for no args / -h / --help / help,
    # and to stderr after an unknown subcommand. Lists every entry in
    # SUBCOMMANDS plus the global --version flag handled by Railway.run.
    def self.usage
        <<~USAGE
            bin/railway — showcase Railway operations (Ruby, stdlib-only)

            Subcommands:
              snapshot          Capture an env's services + config into a YAML snapshot.
              restore           Restore an env to a snapshot (force-redeploy).
              rollback          Roll a single service back one deploy.
              rollback-commit   Restore an env to the snapshot committed at a given SHA.
              promote           Promote staging digests to production with prechecks.
              pin               Pin a service to a specific image digest.
              env-diff          Diff two envs; exit 1 if drift.
              resolve-digest    Resolve an image tag to its GHCR digest.
              lint-prod         CI gate: fail if any prod service is not digest-pinned.
              reconcile-prod    Drift gate: fail if any prod service is stale vs staging.

            Run any subcommand with --help for full flag list.
            Run `bin/railway --version` to print the tool version.

            Auth: RAILWAY_TOKEN env var (or ~/.railway/config.json).
            Exit codes: 0 clean, 1 drift/findings, 2 error.
        USAGE
    end

    # Dispatch a command line to its subcommand and return a process exit code.
    #
    # argv is NOT mutated: the subcommand name is destructured into a local and
    # only the remaining arguments are handed to the command class, so callers
    # (tests, or code passing ARGV) keep their array intact.
    #
    # Returns 0 for help/--version, 2 for an unknown subcommand or for any
    # GraphQL / GHCR / other StandardError escaping the command; otherwise
    # whatever the command class's `.call` returns.
    def self.run(argv)
        if argv.empty? || %w[-h --help help].include?(argv.first)
            puts usage
            return 0
        end

        if argv.first == "--version"
            puts "railway #{VERSION}"
            return 0
        end

        cmd, *rest = argv
        klass = SUBCOMMANDS[cmd]
        if klass.nil?
            warn "Unknown subcommand: #{cmd}"
            warn usage
            return 2
        end

        klass.call(rest)
    rescue GraphQL::Error => e
        warn "graphql error: #{e.message}"
        2
    rescue GHCR::Error => e
        warn "ghcr error: #{e.message}"
        2
    rescue StandardError => e
        warn "error: #{e.class}: #{e.message}"
        warn e.backtrace.first(5).join("\n") if ENV["RAILWAY_DEBUG"]
        2
    end
end

# String#rsplit_colon — split a string at its LAST ':' into [head, tail].
# Returns [self, nil] when there is no ':' at all. Splitting on the last colon
# keeps a registry port in the head when a tag follows
# ("host:5000/img:tag" -> ["host:5000/img", "tag"]). It does NOT recognise a
# port-only ref: "host:5000/img" splits at the port ("host", "5000/img").
class String
    def rsplit_colon
        head, colon, tail = rpartition(":")
        colon.empty? ? [self, nil] : [head, tail]
    end
end

# Script entry point: dispatch ARGV and exit with the returned code, but only
# when this file is executed directly. Requiring it (e.g. from tests) defines
# Railway without running any subcommand.
exit(Railway.run(ARGV)) if $PROGRAM_NAME == __FILE__
