>>>pipeline_explain.txt
=== CustomerAggregate1
ID:          default_catalog.default_database.CustomerAggregate1
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - updates: BIGINT NOT NULL
Inputs:
 - default_catalog.sources.CustomerAggregate
Plan:
LogicalProject(customerid=[$0], updates=[$1])
  LogicalTableScan(table=[[default_catalog, sources, CustomerAggregate]])
SQL:
CREATE VIEW `CustomerAggregate1` AS  SELECT customerid, num_updates * 1 AS updates FROM sources.CustomerAggregate;

=== CustomerAggregate2
ID:          default_catalog.default_database.CustomerAggregate2
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - updates: BIGINT NOT NULL
Inputs:
 - default_catalog.mysources.CustomerAggregate
Plan:
LogicalProject(customerid=[$0], updates=[*($1, 2)])
  LogicalTableScan(table=[[default_catalog, mysources, CustomerAggregate]])
SQL:
CREATE VIEW `CustomerAggregate2` AS  SELECT customerid, num_updates * 2 AS updates FROM mysources.CustomerAggregate;

=== Customer
ID:          default_catalog.mysources.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.mysources.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($3, 0), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, mysources, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== CustomerAggregate
ID:          default_catalog.mysources.CustomerAggregate
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - customerid: BIGINT NOT NULL
 - num_updates: BIGINT NOT NULL
Inputs:
 - default_catalog.mysources.Customer
Plan:
LogicalAggregate(group=[{0}], num_updates=[COUNT()])
  LogicalProject(customerid=[$0])
    LogicalTableScan(table=[[default_catalog, mysources, Customer]])
SQL:
CREATE VIEW `CustomerAggregate` AS  SELECT customerid, COUNT(*) AS num_updates FROM Customer GROUP BY customerid;

=== Customer
ID:          default_catalog.sources.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($3, 0), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, sources, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== CustomerAggregate
ID:          default_catalog.sources.CustomerAggregate
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - customerid: BIGINT NOT NULL
 - num_updates: BIGINT NOT NULL
Inputs:
 - default_catalog.sources.Customer
Plan:
LogicalAggregate(group=[{0}], num_updates=[COUNT()])
  LogicalProject(customerid=[$0])
    LogicalTableScan(table=[[default_catalog, sources, Customer]])
SQL:
CREATE VIEW `CustomerAggregate` AS  SELECT customerid, COUNT(*) AS num_updates FROM Customer GROUP BY customerid;

>>>flink-sql-no-functions.sql
CREATE DATABASE IF NOT EXISTS `sources`;
USE `sources`;
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), TIMESTAMP '1970-01-01 00:00:00.000'),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
CREATE VIEW `CustomerAggregate`
AS
SELECT `customerid`, COUNT(*) AS `num_updates`
FROM `Customer`
GROUP BY `customerid`;
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
CREATE DATABASE IF NOT EXISTS `mysources`;
USE `mysources`;
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), TIMESTAMP '1970-01-01 00:00:00.000'),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
CREATE VIEW `CustomerAggregate`
AS
SELECT `customerid`, COUNT(*) AS `num_updates`
FROM `Customer`
GROUP BY `customerid`;
USE CATALOG `default_catalog`;
USE `default_database`;
CREATE VIEW `CustomerAggregate1`
AS
SELECT `customerid`, `num_updates` * 1 AS `updates`
FROM `sources`.`CustomerAggregate`;
CREATE VIEW `CustomerAggregate2`
AS
SELECT `customerid`, `num_updates` * 2 AS `updates`
FROM `mysources`.`CustomerAggregate`;
CREATE TABLE `CustomerAggregate1_1` (
  `customerid` BIGINT NOT NULL,
  `updates` BIGINT NOT NULL,
  PRIMARY KEY (`customerid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'CustomerAggregate1_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `CustomerAggregate2_2` (
  `customerid` BIGINT NOT NULL,
  `updates` BIGINT NOT NULL,
  PRIMARY KEY (`customerid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'CustomerAggregate2_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`CustomerAggregate1_1`
SELECT *
 FROM `default_catalog`.`default_database`.`CustomerAggregate1`
;
INSERT INTO `default_catalog`.`default_database`.`CustomerAggregate2_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`CustomerAggregate2`
 ;
 END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "CustomerAggregate1_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CustomerAggregate1_1\" (\"customerid\" BIGINT NOT NULL, \"updates\" BIGINT NOT NULL, PRIMARY KEY (\"customerid\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "updates",
          "type" : "BIGINT",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CustomerAggregate2_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CustomerAggregate2_2\" (\"customerid\" BIGINT NOT NULL, \"updates\" BIGINT NOT NULL, PRIMARY KEY (\"customerid\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "updates",
          "type" : "BIGINT",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CustomerAggregate1",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CustomerAggregate1\"(\"customerid\", \"updates\") AS SELECT *\nFROM \"CustomerAggregate1_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "updates",
          "type" : "BIGINT",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "CustomerAggregate2",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CustomerAggregate2\"(\"customerid\", \"updates\") AS SELECT *\nFROM \"CustomerAggregate2_2\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "updates",
          "type" : "BIGINT",
          "nullable" : false
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerAggregate1",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"CustomerAggregate1_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerAggregate2",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"CustomerAggregate2_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomerAggregate1",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerAggregate1($limit: Int = 10, $offset: Int = 0) {\nCustomerAggregate1(limit: $limit, offset: $offset) {\ncustomerid\nupdates\n}\n\n}",
            "queryName" : "CustomerAggregate1",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerAggregate1{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomerAggregate2",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerAggregate2($limit: Int = 10, $offset: Int = 0) {\nCustomerAggregate2(limit: $limit, offset: $offset) {\ncustomerid\nupdates\n}\n\n}",
            "queryName" : "CustomerAggregate2",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerAggregate2{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type CustomerAggregate1 {\n  customerid: Long!\n  updates: Long!\n}\n\ntype CustomerAggregate2 {\n  customerid: Long!\n  updates: Long!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  CustomerAggregate1(limit: Int = 10, offset: Int = 0): [CustomerAggregate1!]\n  CustomerAggregate2(limit: Int = 10, offset: Int = 0): [CustomerAggregate2!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
