>>>inferred_schema.graphqls
# Custom scalar declarations. Of these, only `Long` is referenced by the
# object types declared in this schema; the others are declared but unused here.
"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

# Element type of the list returned by Query.OrderCount. All three fields are
# non-null.
type OrderCount {
  id: Long!
  number: Long!
  volume: Int!
}

# Element type of the list returned by Query.Orders.
# `items` is a required list, but its elements are declared nullable.
# NOTE(review): the Flink schema in this snapshot declares the array elements
# NOT NULL, so `[Orders_itemsOutput!]!` may be the tighter type - confirm with
# the schema generator before changing.
type Orders {
  id: Long!
  customerid: Long!
  time: String!
  items: [Orders_itemsOutput]!
}

# Element type of Orders.items. `discount` is the only nullable field.
type Orders_itemsOutput {
  productid: Int!
  quantity: Int!
  unit_price: Float!
  discount: Float
}

# Root query type. Both fields take `limit` (default 10) and `offset`
# (default 0) arguments and return a nullable list of non-null rows.
type Query {
  OrderCount(limit: Int = 10, offset: Int = 0): [OrderCount!]
  Orders(limit: Int = 10, offset: Int = 0): [Orders!]
}

# Allowed values for the `mcp` argument of the @api directive.
enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

# Allowed values for the `rest` argument of the @api directive.
enum _RestMethodType {
  NONE
  GET
  POST
}

# @api takes three optional arguments (`mcp`, `rest`, `uri`) and may be placed
# on queries, mutations, and field definitions.
directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== OrderCount
ID:          default_catalog.default_database.OrderCount
Type:        state
Stage:       flink
Primary key: id
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - id: BIGINT NOT NULL
 - number: BIGINT NOT NULL
 - volume: INTEGER NOT NULL
Inputs:
 - default_catalog.default_database.Orders

=== Orders
ID:          default_catalog.default_database.Orders
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   _source_time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - items: RecordType:peek_no_expand(INTEGER NOT NULL productid, INTEGER NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
 - _source_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.Orders__base
Annotations:
 - features: DENORMALIZE (feature)

>>>flink-sql-no-functions.sql
-- Column template for the order stream. The `Orders` table below is declared
-- LIKE this table and so takes its physical columns from here.
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  -- Non-null array of non-null rows; within a row only `discount` is nullable.
  `items` ROW(
    `productid` INTEGER NOT NULL,
    `quantity` INTEGER NOT NULL,
    `unit_price` DOUBLE NOT NULL,
    `discount` DOUBLE
  ) NOT NULL ARRAY NOT NULL
)
WITH ('connector' = 'datagen');
-- Kafka-backed order source. Physical columns are inherited from
-- `Orders__schema` through LIKE; this statement only adds `_source_time`,
-- read from the connector's 'timestamp' metadata, and uses it unmodified as
-- the watermark.
CREATE TABLE `Orders` (
  `_source_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `_source_time` AS `_source_time`
)
WITH (
  'connector' = 'kafka',
  'topic' = 'dummy-topic',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.group.id' = 'datasqrl-orders',
  'format' = 'avro',
  'avro.timestamp_mapping.legacy' = 'false'
)
LIKE `Orders__schema`;
-- Aggregates each order's unnested `items` array by order `id`:
-- `number` is the count of item rows and `volume` the sum of their quantities.
CREATE VIEW `OrderCount`
AS
SELECT
  `o`.`id`,
  COUNT(*) AS `number`,
  SUM(`i`.`quantity`) AS `volume`
FROM `Orders` AS `o`
CROSS JOIN UNNEST(`o`.`items`) AS `i`
GROUP BY `o`.`id`;
-- JDBC sink that writes to the Postgres table "OrderCount", keyed by `id`.
CREATE TABLE `OrderCount_1` (
  `id` BIGINT NOT NULL,
  `number` BIGINT NOT NULL,
  `volume` INTEGER NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'table-name' = 'OrderCount',
  'username' = '${POSTGRES_USERNAME}',
  'password' = '${POSTGRES_PASSWORD}'
);
-- JDBC sink that writes to the Postgres table "Orders". `items` is carried as
-- the FlinkJsonType RAW type and rows are keyed by `__pk_hash`.
CREATE TABLE `Orders_2` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `items` RAW(
    'com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType',
    'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='
  ),
  `_source_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'table-name' = 'Orders',
  'username' = '${POSTGRES_USERNAME}',
  'password' = '${POSTGRES_PASSWORD}',
  -- NOTE(review): presumably makes the sink skip rows whose key already
  -- exists - confirm against the jdbc-sqrl connector.
  'sink.on-conflict.action' = 'IGNORE'
);
-- Submits both sink writes together as one statement set.
EXECUTE STATEMENT SET BEGIN
-- Columns are named explicitly instead of SELECT * so the positional mapping
-- onto `OrderCount_1` (id, number, volume) cannot shift if the view's column
-- order ever changes.
INSERT INTO `default_catalog`.`default_database`.`OrderCount_1`
SELECT `id`, `number`, `volume`
 FROM `default_catalog`.`default_database`.`OrderCount`
;
-- `items` goes through to_jsonb to fit the sink's JSON column; `__pk_hash` is
-- filled by hash_columns over all five source columns and is the sink's key.
INSERT INTO `default_catalog`.`default_database`.`Orders_2`
 SELECT `id`, `customerid`, `time`, `to_jsonb`(`items`) AS `items`, `_source_time`, `hash_columns`(`id`, `customerid`, `time`, `items`, `_source_time`) AS `__pk_hash`
  FROM `default_catalog`.`default_database`.`Orders`
 ;
 END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
-- Postgres table behind the OrderCount query; one row per order id.
CREATE TABLE IF NOT EXISTS "OrderCount" (
  "id" BIGINT NOT NULL,
  "number" BIGINT NOT NULL,
  "volume" INTEGER NOT NULL,
  PRIMARY KEY ("id")
);
-- Postgres table behind the Orders query. Rows are keyed by "__pk_hash";
-- "items" holds the order's line items as JSONB.
-- The statement is terminated with ';' so that further statements can be
-- appended to this script safely.
CREATE TABLE IF NOT EXISTS "Orders" (
  "id" BIGINT NOT NULL,
  "customerid" BIGINT NOT NULL,
  "time" TEXT NOT NULL,
  "items" JSONB,
  "_source_time" TIMESTAMP WITH TIME ZONE,
  "__pk_hash" TEXT,
  PRIMARY KEY ("__pk_hash")
);
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "OrderCount",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"OrderCount\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Orders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"id\", \"customerid\", \"time\", \"items\", \"_source_time\"\nFROM \"Orders\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetOrderCount",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query OrderCount($limit: Int = 10, $offset: Int = 0) {\nOrderCount(limit: $limit, offset: $offset) {\nid\nnumber\nvolume\n}\n\n}",
            "queryName" : "OrderCount",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/OrderCount{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Orders($limit: Int = 10, $offset: Int = 0) {\nOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nitems {\nproductid\nquantity\nunit_price\ndiscount\n}\n}\n\n}",
            "queryName" : "Orders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Orders{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype OrderCount {\n  id: Long!\n  number: Long!\n  volume: Int!\n}\n\ntype Orders {\n  id: Long!\n  customerid: Long!\n  time: String!\n  items: [Orders_itemsOutput]!\n}\n\ntype Orders_itemsOutput {\n  productid: Int!\n  quantity: Int!\n  unit_price: Float!\n  discount: Float\n}\n\ntype Query {\n  OrderCount(limit: Int = 10, offset: Int = 0): [OrderCount!]\n  Orders(limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
