>>>inferred_schema.graphqls
"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

type DistinctOrders {
  id: Long!
  customerid: Long!
  time: DateTime!
  productid: Long!
  quantity: Long!
  unit_price: Float!
}

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

type Orders {
  id: Long!
  customerid: Long!
  time: DateTime!
  productid: Long!
  quantity: Long!
  unit_price: Float!
}

type Query {
  DistinctOrders(limit: Int = 10, offset: Int = 0): [DistinctOrders!]
  Orders(limit: Int = 10, offset: Int = 0): [Orders!]
}

enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

enum _RestMethodType {
  NONE
  GET
  POST
}

directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== DistinctOrders
ID:          default_catalog.default_database.DistinctOrders
Type:        state
Stage:       flink
Primary key: id
Timestamp:   _ingest_time
Row count:   ~2e6
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - productid: BIGINT NOT NULL
 - quantity: BIGINT NOT NULL
 - unit_price: DOUBLE NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Orders
Annotations:
 - stream-root: Orders

=== Orders
ID:          default_catalog.default_database.Orders
Type:        stream
Stage:       flink
Primary key: id, customerid, time
Timestamp:   _ingest_time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - productid: BIGINT NOT NULL
 - quantity: BIGINT NOT NULL
 - unit_price: DOUBLE NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Orders__base
Annotations:
 - stream-root: Orders

>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `productid` BIGINT NOT NULL,
  `quantity` BIGINT NOT NULL,
  `unit_price` DOUBLE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/orders.jsonl'
);
CREATE TABLE `Orders` (
  `_ingest_time` AS `PROCTIME`(),
  PRIMARY KEY (`id`, `customerid`, `time`) NOT ENFORCED
)
WITH (
  'source.monitor-interval' = '10000'
)
LIKE `Orders__schema`;
CREATE VIEW `DistinctOrders`
AS
SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `unit_price`, `_ingest_time`
FROM (SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `unit_price`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `time` DESC NULLS LAST) AS `$f7`
  FROM (SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `unit_price`, `_ingest_time`
    FROM (SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `unit_price`, `_ingest_time`, LAG(`customerid`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f7`, LAG(`productid`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f8`, LAG(`quantity`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f9`, LAG(`unit_price`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f10`
      FROM (SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `unit_price`, `_ingest_time`, MAX(`time`) OVER (PARTITION BY `id` ORDER BY `_ingest_time` RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `$f7`
        FROM `default_catalog`.`default_database`.`Orders`) AS `t`
      WHERE `time` >= `$f7`) AS `t1`
    WHERE `$f7` IS NULL AND `$f8` IS NULL AND `$f9` IS NULL AND `$f10` IS NULL OR `customerid` <> `$f7` OR `productid` <> `$f8` OR `quantity` <> `$f9` OR `unit_price` <> `$f10`) AS `t3`) AS `t4`
WHERE `$f7` = 1;
CREATE TABLE `DistinctOrders_1` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `productid` BIGINT NOT NULL,
  `quantity` BIGINT NOT NULL,
  `unit_price` DOUBLE NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = '_ingest_time',
  'table-name' = 'DistinctOrders',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `Orders_2` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `productid` BIGINT NOT NULL,
  `quantity` BIGINT NOT NULL,
  `unit_price` DOUBLE NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`, `customerid`, `time`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Orders',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`DistinctOrders_1`
SELECT *
 FROM `default_catalog`.`default_database`.`DistinctOrders`
;
INSERT INTO `default_catalog`.`default_database`.`Orders_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`Orders`
 ;
 END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "DistinctOrders" ("id" BIGINT NOT NULL, "customerid" BIGINT NOT NULL, "time" TIMESTAMP WITH TIME ZONE NOT NULL, "productid" BIGINT NOT NULL, "quantity" BIGINT NOT NULL, "unit_price" DOUBLE PRECISION NOT NULL, "_ingest_time" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("id"));
CREATE TABLE IF NOT EXISTS "Orders" ("id" BIGINT NOT NULL, "customerid" BIGINT NOT NULL, "time" TIMESTAMP WITH TIME ZONE NOT NULL, "productid" BIGINT NOT NULL, "quantity" BIGINT NOT NULL, "unit_price" DOUBLE PRECISION NOT NULL, "_ingest_time" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("id","customerid","time"))
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "DistinctOrders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"DistinctOrders\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Orders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Orders\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetDistinctOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query DistinctOrders($limit: Int = 10, $offset: Int = 0) {\nDistinctOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nproductid\nquantity\nunit_price\n}\n\n}",
            "queryName" : "DistinctOrders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/DistinctOrders{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Orders($limit: Int = 10, $offset: Int = 0) {\nOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nproductid\nquantity\nunit_price\n}\n\n}",
            "queryName" : "Orders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Orders{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ntype DistinctOrders {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n}\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Orders {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n}\n\ntype Query {\n  DistinctOrders(limit: Int = 10, offset: Int = 0): [DistinctOrders!]\n  Orders(limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
