>>>inferred_schema.graphqls
# Element type of the list returned by the `AppendStream` field on `Query`.
# Each record is a (k, v_sum) pair; both fields are non-null.
type AppendStream {
  k: String!
  v_sum: Int!
}

"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

# Element type of the list returned by the `InputAgg` field on `Query`.
# In the Flink plan of this snapshot, `InputAgg` is SUM(v) grouped by `k`,
# so `k` is the grouping key and `v_sum` the per-key total. Both are non-null.
type InputAgg {
  k: String!
  v_sum: Int!
}

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

# Root query type. Both fields are paginated with `limit` (default 10) and
# `offset` (default 0). The returned list may be null, but its elements may not.
type Query {
  AppendStream(limit: Int = 10, offset: Int = 0): [AppendStream!]
  InputAgg(limit: Int = 10, offset: Int = 0): [InputAgg!]
}

# Allowed values for the `mcp` argument of the `@api` directive.
enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

# Allowed values for the `rest` argument of the `@api` directive.
enum _RestMethodType {
  NONE
  GET
  POST
}

# Directive with three optional arguments (`mcp`, `rest`, `uri`). It may be
# placed on query operations, mutation operations, and field definitions.
directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== AppendStream
ID:          default_catalog.default_database.AppendStream
Type:        relation
Stage:       flink
Primary key: k, v_sum
Timestamp:   -
Row count:   ~3
---
Schema:
 - k: CHAR(1) CHARACTER SET "UTF-16LE" NOT NULL
 - v_sum: INTEGER NOT NULL

=== InputAgg
ID:          default_catalog.default_database.InputAgg
Type:        state
Stage:       flink
Primary key: k
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - k: CHAR(1) CHARACTER SET "UTF-16LE" NOT NULL
 - v_sum: INTEGER NOT NULL
Inputs:
 - default_catalog.default_database._InputValues

=== _InputValues
ID:          default_catalog.default_database._InputValues
Type:        state
Stage:       flink
Primary key: k
Timestamp:   -
Row count:   ~1e1
---
Schema:
 - k: CHAR(1) CHARACTER SET "UTF-16LE" NOT NULL
 - v: INTEGER NOT NULL

>>>flink-sql-no-functions.sql
-- Inline literal input: ten (k, v) rows. Keys a, b and c occur more than once,
-- and the row (a, 1) appears twice.
CREATE VIEW `_InputValues` AS
SELECT
  `k`,
  `v`
FROM (
  VALUES
    ROW('a', 1),
    ROW('b', 2),
    ROW('a', 1),
    ROW('c', 4),
    ROW('b', 3),
    ROW('a', 4),
    ROW('c', 1),
    ROW('b', 5),
    ROW('d', 2),
    ROW('e', 1)
) AS `src` (`k`, `v`);
-- Per-key total: sums v over all _InputValues rows that share the same k.
CREATE VIEW `InputAgg` AS
SELECT
  `k`,
  SUM(`v`) AS `v_sum`
FROM `_InputValues`
GROUP BY
  `k`;
-- Passes the InputAgg view through the to_append_stream table function and
-- selects k and v_sum from the result.
-- NOTE(review): to_append_stream is defined outside this script. Presumably it
-- turns the updating aggregate into an append-only stream - confirm against
-- its definition.
CREATE VIEW `AppendStream` AS
SELECT
  `k`,
  `v_sum`
FROM TABLE(
  `to_append_stream`(TABLE `InputAgg`)
);
-- JDBC sink table fed from the AppendStream view by the statement set in this
-- script. It writes to the database table named AppendStream. Connection
-- settings are supplied through the POSTGRES_* placeholders.
CREATE TABLE `AppendStream_1` (
  `k`     CHAR(1) CHARACTER SET `UTF-16LE` NOT NULL,
  `v_sum` INTEGER NOT NULL,
  PRIMARY KEY (`k`, `v_sum`) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc-sqrl',
  'url'        = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'table-name' = 'AppendStream',
  'username'   = '${POSTGRES_USERNAME}',
  'password'   = '${POSTGRES_PASSWORD}',
  'driver'     = 'org.postgresql.Driver'
);
-- JDBC sink table fed from the InputAgg view by the statement set in this
-- script. It writes to the database table named InputAgg, keyed on k alone.
-- Connection settings are supplied through the POSTGRES_* placeholders.
CREATE TABLE `InputAgg_2` (
  `k`     CHAR(1) CHARACTER SET `UTF-16LE` NOT NULL,
  `v_sum` INTEGER NOT NULL,
  PRIMARY KEY (`k`) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc-sqrl',
  'url'        = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'table-name' = 'InputAgg',
  'username'   = '${POSTGRES_USERNAME}',
  'password'   = '${POSTGRES_PASSWORD}',
  'driver'     = 'org.postgresql.Driver'
);
EXECUTE STATEMENT SET BEGIN
  -- Copy every row of the AppendStream view into its JDBC sink table.
  INSERT INTO `default_catalog`.`default_database`.`AppendStream_1`
  SELECT *
  FROM `default_catalog`.`default_database`.`AppendStream`;
  -- Copy every row of the InputAgg view into its JDBC sink table.
  INSERT INTO `default_catalog`.`default_database`.`InputAgg_2`
  SELECT *
  FROM `default_catalog`.`default_database`.`InputAgg`;
END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
-- Database tables that the Flink JDBC sinks write to (see the table-name
-- options in the Flink script). IF NOT EXISTS makes the script safe to re-run.
-- Every statement is terminated, so the script can be concatenated with others.
CREATE TABLE IF NOT EXISTS "AppendStream" ("k" TEXT NOT NULL, "v_sum" INTEGER NOT NULL, PRIMARY KEY ("k","v_sum"));
CREATE TABLE IF NOT EXISTS "InputAgg" ("k" TEXT NOT NULL, "v_sum" INTEGER NOT NULL, PRIMARY KEY ("k"));
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "AppendStream",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"AppendStream\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "InputAgg",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"InputAgg\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetAppendStream",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query AppendStream($limit: Int = 10, $offset: Int = 0) {\nAppendStream(limit: $limit, offset: $offset) {\nk\nv_sum\n}\n\n}",
            "queryName" : "AppendStream",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/AppendStream{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetInputAgg",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query InputAgg($limit: Int = 10, $offset: Int = 0) {\nInputAgg(limit: $limit, offset: $offset) {\nk\nv_sum\n}\n\n}",
            "queryName" : "InputAgg",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/InputAgg{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type AppendStream {\n  k: String!\n  v_sum: Int!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ntype InputAgg {\n  k: String!\n  v_sum: Int!\n}\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  AppendStream(limit: Int = 10, offset: Int = 0): [AppendStream!]\n  InputAgg(limit: Int = 10, offset: Int = 0): [InputAgg!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
