>>>pipeline_explain.txt
=== Customer
ID:          default_catalog.default_database.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   _ingest_time
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[PROCTIME()])
  LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `_ingest_time` AS PROCTIME(),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
)
WITH (
  'format' = 'json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== DistinctCustomerWoutTs
ID:          default_catalog.default_database.DistinctCustomerWoutTs
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   _ingest_time
Row count:   ~2e6
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[$4])
  LogicalFilter(condition=[=($5, 1)])
    LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[$4], $f5=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
      LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[$4])
        LogicalFilter(condition=[OR(AND(IS NULL($5), IS NULL($6)), <>($1, $5), AND(IS NULL($1), IS NOT NULL($5)), AND(IS NULL($5), IS NOT NULL($1)), <>($2, $6))])
          LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[$4], $f5=[LAG($1, 1) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST)], $f6=[LAG($2, 1) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST)])
            LogicalFilter(condition=[>=($3, $5)])
              LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], _ingest_time=[$4], $f5=[MAX($3) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST)])
                LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `DistinctCustomerWoutTs`
AS
SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`
FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `customerid` ORDER BY `lastUpdated` DESC NULLS LAST) AS `$f5`
  FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`
    FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, LAG(`email`, 1) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time`) AS `$f5`, LAG(`name`, 1) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time`) AS `$f6`
      FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, MAX(`lastUpdated`) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time` RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `$f5`
        FROM `default_catalog`.`default_database`.`Customer`) AS `t`
      WHERE `lastUpdated` >= `$f5`) AS `t1`
    WHERE `$f5` IS NULL AND `$f6` IS NULL OR `email` <> `$f5` OR `email` IS NULL AND `$f5` IS NOT NULL OR `$f5` IS NULL AND `email` IS NOT NULL OR `name` <> `$f6`) AS `t3`) AS `t4`
WHERE `$f5` = 1
=== DistinctOrderFieldsWoutTs
ID:          default_catalog.default_database.DistinctOrderFieldsWoutTs
Type:        state
Stage:       flink
Primary key: id
Timestamp:   _ingest_time
Row count:   ~2e6
---
Schema:
 - id: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - customerid: BIGINT NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.OrderFields
Annotations:
 - stream-root: Orders
Plan:
LogicalProject(id=[$0], time=[$1], customerid=[$2], _ingest_time=[$3])
  LogicalFilter(condition=[=($4, 1)])
    LogicalProject(id=[$0], time=[$1], customerid=[$2], _ingest_time=[$3], $f4=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
      LogicalProject(id=[$0], time=[$1], customerid=[$2], _ingest_time=[$3])
        LogicalFilter(condition=[OR(IS NULL($4), <>($2, $4))])
          LogicalProject(id=[$0], time=[$1], customerid=[$2], _ingest_time=[$3], $f4=[LAG($2, 1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST)])
            LogicalFilter(condition=[>=($1, $4)])
              LogicalProject(id=[$0], time=[$1], customerid=[$2], _ingest_time=[$3], $f4=[MAX($1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST)])
                LogicalTableScan(table=[[default_catalog, default_database, OrderFields]])
SQL:
CREATE VIEW `DistinctOrderFieldsWoutTs`
AS
SELECT `id`, `time`, `customerid`, `_ingest_time`
FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `time` DESC NULLS LAST) AS `$f4`
  FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`
    FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, LAG(`customerid`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f4`
      FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, MAX(`time`) OVER (PARTITION BY `id` ORDER BY `_ingest_time` RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `$f4`
        FROM `default_catalog`.`default_database`.`OrderFields`) AS `t`
      WHERE `time` >= `$f4`) AS `t1`
    WHERE `$f4` IS NULL OR `customerid` <> `$f4`) AS `t3`) AS `t4`
WHERE `$f4` = 1
=== DistinctOrderWithTs
ID:          default_catalog.default_database.DistinctOrderWithTs
Type:        state
Stage:       flink
Primary key: id
Timestamp:   _ingest_time
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - entries: RecordType:peek_no_expand(BIGINT NOT NULL productid, BIGINT NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Orders
Annotations:
 - mostRecentDistinct: true
 - stream-root: Orders
Plan:
LogicalProject(id=[$0], customerid=[$1], time=[$2], entries=[$3], _ingest_time=[$4])
  LogicalFilter(condition=[=($5, 1)])
    LogicalProject(id=[$0], customerid=[$1], time=[$2], entries=[$3], _ingest_time=[$4], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST)])
      LogicalTableScan(table=[[default_catalog, default_database, Orders]])
SQL:
CREATE VIEW `DistinctOrderWithTs`
AS
SELECT `id`, `customerid`, `time`, `entries`, `_ingest_time`
FROM (SELECT `id`, `customerid`, `time`, `entries`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `_ingest_time` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`Orders`) AS `t`
WHERE `__sqrlinternal_rownum` = 1
=== OrderFields
ID:          default_catalog.default_database.OrderFields
Type:        stream
Stage:       flink
Primary key: id, time
Timestamp:   _ingest_time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - customerid: BIGINT NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Orders
Annotations:
 - stream-root: Orders
Plan:
LogicalProject(id=[$0], time=[$2], customerid=[$1], _ingest_time=[$4])
  LogicalTableScan(table=[[default_catalog, default_database, Orders]])
SQL:
CREATE VIEW `OrderFields` AS  SELECT id, `time`, customerid, _ingest_time FROM Orders;

=== Orders
ID:          default_catalog.default_database.Orders
Type:        stream
Stage:       flink
Primary key: id, time
Timestamp:   _ingest_time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - entries: RecordType:peek_no_expand(BIGINT NOT NULL productid, BIGINT NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
 - _ingest_time: TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Orders__base
Annotations:
 - features: DENORMALIZE (feature)
 - stream-root: Orders
Plan:
LogicalProject(id=[$0], customerid=[$1], time=[$2], entries=[$3], _ingest_time=[PROCTIME()])
  LogicalTableScan(table=[[default_catalog, default_database, Orders]])
SQL:
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Orders` (
  `_ingest_time` AS PROCTIME(),
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Orders__schema`
>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Orders` (
  `_ingest_time` AS `PROCTIME`(),
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Orders__schema`;
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `_ingest_time` AS `PROCTIME`(),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
)
WITH (
  'format' = 'json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
CREATE VIEW `OrderFields`
AS
SELECT `id`, `time`, `customerid`, `_ingest_time`
FROM `Orders`;
CREATE VIEW `DistinctOrderWithTs`
AS
SELECT `id`, `customerid`, `time`, `entries`, `_ingest_time`
FROM (SELECT `id`, `customerid`, `time`, `entries`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `_ingest_time` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`Orders`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `DistinctOrderFieldsWoutTs`
AS
SELECT `id`, `time`, `customerid`, `_ingest_time`
FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `time` DESC NULLS LAST) AS `$f4`
  FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`
    FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, LAG(`customerid`, 1) OVER (PARTITION BY `id` ORDER BY `_ingest_time`) AS `$f4`
      FROM (SELECT `id`, `time`, `customerid`, `_ingest_time`, MAX(`time`) OVER (PARTITION BY `id` ORDER BY `_ingest_time` RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `$f4`
        FROM `default_catalog`.`default_database`.`OrderFields`) AS `t`
      WHERE `time` >= `$f4`) AS `t1`
    WHERE `$f4` IS NULL OR `customerid` <> `$f4`) AS `t3`) AS `t4`
WHERE `$f4` = 1;
CREATE VIEW `DistinctCustomerWoutTs`
AS
SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`
FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, ROW_NUMBER() OVER (PARTITION BY `customerid` ORDER BY `lastUpdated` DESC NULLS LAST) AS `$f5`
  FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`
    FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, LAG(`email`, 1) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time`) AS `$f5`, LAG(`name`, 1) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time`) AS `$f6`
      FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `_ingest_time`, MAX(`lastUpdated`) OVER (PARTITION BY `customerid` ORDER BY `_ingest_time` RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `$f5`
        FROM `default_catalog`.`default_database`.`Customer`) AS `t`
      WHERE `lastUpdated` >= `$f5`) AS `t1`
    WHERE `$f5` IS NULL AND `$f6` IS NULL OR `email` <> `$f5` OR `email` IS NULL AND `$f5` IS NOT NULL OR `$f5` IS NULL AND `email` IS NOT NULL OR `name` <> `$f6`) AS `t3`) AS `t4`
WHERE `$f5` = 1;
CREATE TABLE `Customer_1` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Customer_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `DistinctCustomerWoutTs_2` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = '_ingest_time',
  'table-name' = 'DistinctCustomerWoutTs_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `DistinctOrderFieldsWoutTs_3` (
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `customerid` BIGINT NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = '_ingest_time',
  'table-name' = 'DistinctOrderFieldsWoutTs_3',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `DistinctOrderWithTs_4` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = '_ingest_time',
  'table-name' = 'DistinctOrderWithTs_4',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `OrderFields_5` (
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `customerid` BIGINT NOT NULL,
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'OrderFields_5',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `Orders_6` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Orders_6',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Customer_1`
SELECT *
 FROM `default_catalog`.`default_database`.`Customer`
;
INSERT INTO `default_catalog`.`default_database`.`DistinctCustomerWoutTs_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`DistinctCustomerWoutTs`
 ;
 INSERT INTO `default_catalog`.`default_database`.`DistinctOrderFieldsWoutTs_3`
  SELECT *
   FROM `default_catalog`.`default_database`.`DistinctOrderFieldsWoutTs`
  ;
  INSERT INTO `default_catalog`.`default_database`.`DistinctOrderWithTs_4`
   SELECT `id`, `customerid`, `time`, `to_jsonb`(`entries`) AS `entries`, `_ingest_time`
    FROM `default_catalog`.`default_database`.`Orders`
   ;
   INSERT INTO `default_catalog`.`default_database`.`OrderFields_5`
    SELECT *
     FROM `default_catalog`.`default_database`.`OrderFields`
    ;
    INSERT INTO `default_catalog`.`default_database`.`Orders_6`
     SELECT `id`, `customerid`, `time`, `to_jsonb`(`entries`) AS `entries`, `_ingest_time`
      FROM `default_catalog`.`default_database`.`Orders`
     ;
     END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "Customer_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Customer_1\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\",\"lastUpdated\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid",
        "lastUpdated"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "DistinctCustomerWoutTs_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"DistinctCustomerWoutTs_2\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "DistinctOrderFieldsWoutTs_3",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"DistinctOrderFieldsWoutTs_3\" (\"id\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"customerid\" BIGINT NOT NULL, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"id\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "id"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "DistinctOrderWithTs_4",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"DistinctOrderWithTs_4\" (\"id\" BIGINT NOT NULL, \"customerid\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"entries\" JSONB, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"id\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "id"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "OrderFields_5",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"OrderFields_5\" (\"id\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"customerid\" BIGINT NOT NULL, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"id\",\"time\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "id",
        "time"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Orders_6",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Orders_6\" (\"id\" BIGINT NOT NULL, \"customerid\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"entries\" JSONB, \"_ingest_time\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"id\",\"time\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "id",
        "time"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Customer",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Customer\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"_ingest_time\") AS SELECT *\nFROM \"Customer_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "DistinctCustomerWoutTs",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"DistinctCustomerWoutTs\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"_ingest_time\") AS SELECT *\nFROM \"DistinctCustomerWoutTs_2\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "DistinctOrderFieldsWoutTs",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"DistinctOrderFieldsWoutTs\"(\"id\", \"time\", \"customerid\", \"_ingest_time\") AS SELECT *\nFROM \"DistinctOrderFieldsWoutTs_3\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "DistinctOrderWithTs",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"DistinctOrderWithTs\"(\"id\", \"customerid\", \"time\", \"entries\", \"_ingest_time\") AS SELECT *\nFROM \"DistinctOrderWithTs_4\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "OrderFields",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"OrderFields\"(\"id\", \"time\", \"customerid\", \"_ingest_time\") AS SELECT *\nFROM \"OrderFields_5\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "Orders",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Orders\"(\"id\", \"customerid\", \"time\", \"entries\", \"_ingest_time\") AS SELECT *\nFROM \"Orders_6\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "_ingest_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Customer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "DistinctCustomerWoutTs",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"DistinctCustomerWoutTs_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "DistinctOrderFieldsWoutTs",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"DistinctOrderFieldsWoutTs_3\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "DistinctOrderWithTs",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"DistinctOrderWithTs_4\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "OrderFields",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"OrderFields_5\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Orders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Orders_6\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Customer($limit: Int = 10, $offset: Int = 0) {\nCustomer(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\n}\n\n}",
            "queryName" : "Customer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Customer{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetDistinctCustomerWoutTs",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query DistinctCustomerWoutTs($limit: Int = 10, $offset: Int = 0) {\nDistinctCustomerWoutTs(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\n}\n\n}",
            "queryName" : "DistinctCustomerWoutTs",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/DistinctCustomerWoutTs{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetDistinctOrderFieldsWoutTs",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query DistinctOrderFieldsWoutTs($limit: Int = 10, $offset: Int = 0) {\nDistinctOrderFieldsWoutTs(limit: $limit, offset: $offset) {\nid\ntime\ncustomerid\n}\n\n}",
            "queryName" : "DistinctOrderFieldsWoutTs",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/DistinctOrderFieldsWoutTs{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetDistinctOrderWithTs",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query DistinctOrderWithTs($limit: Int = 10, $offset: Int = 0) {\nDistinctOrderWithTs(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nentries {\nproductid\nquantity\nunit_price\ndiscount\n}\n}\n\n}",
            "queryName" : "DistinctOrderWithTs",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/DistinctOrderWithTs{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrderFields",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query OrderFields($limit: Int = 10, $offset: Int = 0) {\nOrderFields(limit: $limit, offset: $offset) {\nid\ntime\ncustomerid\n}\n\n}",
            "queryName" : "OrderFields",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/OrderFields{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Orders($limit: Int = 10, $offset: Int = 0) {\nOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nentries {\nproductid\nquantity\nunit_price\ndiscount\n}\n}\n\n}",
            "queryName" : "Orders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Orders{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type Customer {\n  customerid: Long!\n  email: String\n  name: String!\n  lastUpdated: Long!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ntype DistinctCustomerWoutTs {\n  customerid: Long!\n  email: String\n  name: String!\n  lastUpdated: Long!\n}\n\ntype DistinctOrderFieldsWoutTs {\n  id: Long!\n  time: DateTime!\n  customerid: Long!\n}\n\ntype DistinctOrderWithTs {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  entries: [DistinctOrderWithTs_entriesOutput]!\n}\n\ntype DistinctOrderWithTs_entriesOutput {\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n  discount: Float\n}\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype OrderFields {\n  id: Long!\n  time: DateTime!\n  customerid: Long!\n}\n\ntype Orders {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  entries: [Orders_entriesOutput]!\n}\n\ntype Orders_entriesOutput {\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n  discount: Float\n}\n\ntype Query {\n  Customer(limit: Int = 10, offset: Int = 0): [Customer!]\n  DistinctCustomerWoutTs(limit: Int = 10, offset: Int = 0): [DistinctCustomerWoutTs!]\n  DistinctOrderFieldsWoutTs(limit: Int = 10, offset: Int = 0): [DistinctOrderFieldsWoutTs!]\n  DistinctOrderWithTs(limit: Int = 10, offset: Int = 0): [DistinctOrderWithTs!]\n  OrderFields(limit: Int = 10, offset: Int = 0): [OrderFields!]\n  Orders(limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
