>>>pipeline_explain.txt
=== CustomerExport
ID:          CustomerExport
Type:        export
Stage:       flink
Connector:   kafka
---
Inputs:
 - default_catalog.default_database.CustomerSubset

=== Customer
ID:          default_catalog.default_database.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($3, 0), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== CustomerSubset
ID:          default_catalog.default_database.CustomerSubset
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], name=[$2], timestamp=[$4])
  LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomerSubset` AS  SELECT customerid, name, `timestamp` FROM Customer;

=== DistinctCustomer
ID:          default_catalog.default_database.DistinctCustomer
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   timestamp
Row count:   ~2e7
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - mostRecentDistinct: true
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
  LogicalFilter(condition=[=($5, 1)])
    LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST)])
      LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `DistinctCustomer`
AS
SELECT `customerid`, `email`, `name`, `lastUpdated`, `timestamp`
FROM (SELECT `customerid`, `email`, `name`, `lastUpdated`, `timestamp`, ROW_NUMBER() OVER (PARTITION BY `customerid` ORDER BY `timestamp` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`Customer`) AS `t`
WHERE `__sqrlinternal_rownum` = 1
=== JoinStream
ID:          default_catalog.default_database.JoinStream
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
Inputs:
 - default_catalog.default_database.Customer
 - default_catalog.default_database.Orders
Plan:
LogicalProject(id=[$0], name=[$6])
  LogicalJoin(condition=[=($4, $1)], joinType=[inner])
    LogicalTableScan(table=[[default_catalog, default_database, Orders]])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `JoinStream` AS  SELECT o.id, c.name FROM Orders o JOIN Customer c on c.customerid = o.customerid;

=== Orders
ID:          default_catalog.default_database.Orders
Type:        stream
Stage:       flink
Primary key: id, time
Timestamp:   time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - entries: RecordType:peek_no_expand(BIGINT NOT NULL productid, BIGINT NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
Inputs:
 - default_catalog.default_database.Orders__base
Annotations:
 - features: DENORMALIZE (feature)
 - stream-root: Orders
Plan:
LogicalWatermarkAssigner(rowtime=[time], watermark=[-($2, 1:INTERVAL SECOND)])
  LogicalTableScan(table=[[default_catalog, default_database, Orders]])
SQL:
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Orders__schema`
=== DistinctStream
ID:          log.DistinctStream
Type:        export
Stage:       flink
---
Inputs:
 - default_catalog.default_database.DistinctCustomer

=== JoinStream
ID:          log.JoinStream
Type:        export
Stage:       flink
---
Inputs:
 - default_catalog.default_database.JoinStream

>>>flink-sql-no-functions.sql
-- Column template for the `Customer` source: four NOT NULL physical columns.
-- Declared on the 'datagen' connector; `Customer` reuses it through LIKE.
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid`  BIGINT NOT NULL,
  `email`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name`        VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
) WITH (
  'connector' = 'datagen'
);
-- `Customer` source table read through the filesystem connector. Physical
-- columns come from `Customer__schema` (LIKE clause below); this statement adds
-- the derived event-time column, the declared key and the watermark.
CREATE TABLE `Customer` (
  -- Event time derived from `lastUpdated`. The epoch fallback is cast to
  -- TIMESTAMP(3) WITH LOCAL TIME ZONE so both COALESCE arguments share the
  -- type that the downstream sinks declare for their `timestamp` column.
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  -- Watermark trails the event time by one millisecond.
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
-- Column template for the `Orders` source; `Orders` reuses it through LIKE.
-- `entries` is an array of line-item rows; only `discount` is nullable.
CREATE TEMPORARY TABLE `Orders__schema` (
  `id`         BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time`       TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries`    ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
) WITH (
  'connector' = 'datagen'
);
-- `Orders` source table read through the filesystem connector. All columns are
-- inherited from `Orders__schema`; this statement only adds the declared key
-- and a watermark on the existing `time` column (one millisecond delay).
CREATE TABLE `Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
) WITH (
  'connector'               = 'filesystem',
  'format'                  = 'flexible-json',
  'path'                    = 'file:/mock',
  'source.monitor-interval' = '10 sec'
) LIKE `Orders__schema`;
-- Inner join of orders to customers on `customerid`, exposing the order id
-- together with the matching customer's name.
CREATE VIEW `JoinStream` AS
SELECT
  `o`.`id`,
  `c`.`name`
FROM `Orders` AS `o`
INNER JOIN `Customer` AS `c`
  ON `c`.`customerid` = `o`.`customerid`;
-- One row per `customerid`: rows are ranked within each customer by
-- `timestamp` descending and only the first-ranked row is kept.
CREATE VIEW `DistinctCustomer` AS
SELECT
  `customerid`,
  `email`,
  `name`,
  `lastUpdated`,
  `timestamp`
FROM (
  SELECT
    `customerid`,
    `email`,
    `name`,
    `lastUpdated`,
    `timestamp`,
    ROW_NUMBER() OVER (PARTITION BY `customerid` ORDER BY `timestamp` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`Customer`
) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Three-column projection of `Customer`: id, name and event time.
CREATE VIEW `CustomerSubset` AS
SELECT
  `customerid`,
  `name`,
  `timestamp`
FROM `Customer`;
-- Kafka table bound to topic 'kafka-mutation-CustomerExport'.
-- `timestamp` is mapped to the connector's 'timestamp' metadata key and
-- carries a zero-delay watermark.
CREATE TABLE `CustomerExport` (
  `customerid` BIGINT,
  `name`       STRING,
  `timestamp`  TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.0' SECOND
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'kafka-mutation-CustomerExport',
  'format'                       = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.group.id'          = '${KAFKA_GROUP_ID}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.compression.type'  = 'zstd'
);
-- 'jdbc-sqrl' sink for the Postgres table "Customer_1", keyed on
-- (`customerid`, `lastUpdated`) with on-conflict action IGNORE.
CREATE TABLE `Customer_1` (
  `customerid`  BIGINT NOT NULL,
  `email`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name`        VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp`   TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}',
  'password'                = '${POSTGRES_PASSWORD}',
  'table-name'              = 'Customer_1',
  'sink.on-conflict.action' = 'IGNORE'
);
-- 'jdbc-sqrl' sink for the Postgres table "CustomerSubset_2". The declared key
-- is the `__pk_hash` column, which the INSERT in the statement set fills with
-- `hash_columns` over the three data columns.
CREATE TABLE `CustomerSubset_2` (
  `customerid` BIGINT NOT NULL,
  `name`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `timestamp`  TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `__pk_hash`  CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}',
  'password'                = '${POSTGRES_PASSWORD}',
  'table-name'              = 'CustomerSubset_2',
  'sink.on-conflict.action' = 'IGNORE'
);
-- 'upsert-kafka' sink on topic 'kafka-DistinctStream', keyed on `customerid`.
-- 'value.fields-include' = 'ALL' is set, and key and value both use the
-- 'flexible-json' format.
CREATE TABLE `DistinctStream_3` (
  `customerid`  BIGINT NOT NULL,
  `email`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name`        VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp`   TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`) NOT ENFORCED
) WITH (
  'connector'                    = 'upsert-kafka',
  'topic'                        = 'kafka-DistinctStream',
  'key.format'                   = 'flexible-json',
  'value.format'                 = 'flexible-json',
  'value.fields-include'         = 'ALL',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.group.id'          = '${KAFKA_GROUP_ID}',
  'properties.compression.type'  = 'zstd'
);
-- 'jdbc-sqrl' sink for the Postgres table "DistinctCustomer_4", keyed on
-- `customerid`. Unlike the other JDBC sinks in this script, its on-conflict
-- action is 'TIMESTAMP' with `timestamp` named as the timestamp column.
CREATE TABLE `DistinctCustomer_4` (
  `customerid`  BIGINT NOT NULL,
  `email`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name`        VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp`   TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`) NOT ENFORCED
) WITH (
  'connector'                         = 'jdbc-sqrl',
  'driver'                            = 'org.postgresql.Driver',
  'url'                               = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                          = '${POSTGRES_USERNAME}',
  'password'                          = '${POSTGRES_PASSWORD}',
  'table-name'                        = 'DistinctCustomer_4',
  'sink.on-conflict.action'           = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'timestamp'
);
-- Plain 'kafka' sink on topic 'kafka-JoinStream'; no key is declared.
CREATE TABLE `JoinStream_5` (
  `id`   BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'kafka-JoinStream',
  'format'                       = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.group.id'          = '${KAFKA_GROUP_ID}',
  'properties.compression.type'  = 'zstd'
);
-- 'jdbc-sqrl' sink for the Postgres table "JoinStream_6". The declared key is
-- the `__pk_hash` column, which the INSERT in the statement set fills with
-- `hash_columns` over `id` and `name`.
CREATE TABLE `JoinStream_6` (
  `id`        BIGINT NOT NULL,
  `name`      VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}',
  'password'                = '${POSTGRES_PASSWORD}',
  'table-name'              = 'JoinStream_6',
  'sink.on-conflict.action' = 'IGNORE'
);
-- 'jdbc-sqrl' sink for the Postgres table "Orders_7", keyed on (`id`, `time`).
-- `entries` is declared as a RAW FlinkJsonType value; the INSERT in the
-- statement set produces it with `to_jsonb`(`entries`).
CREATE TABLE `Orders_7` (
  `id`         BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time`       TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries`    RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}',
  'password'                = '${POSTGRES_PASSWORD}',
  'table-name'              = 'Orders_7',
  'sink.on-conflict.action' = 'IGNORE'
);
-- Statement set that feeds every sink table declared in this script.
-- Each INSERT lists its source columns explicitly (instead of SELECT *) so the
-- mapping onto the sink schema does not silently depend on the column order of
-- the source table or view.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Customer_1`
SELECT `customerid`, `email`, `name`, `lastUpdated`, `timestamp`
FROM `default_catalog`.`default_database`.`Customer`
;
INSERT INTO `default_catalog`.`default_database`.`CustomerExport`
SELECT `customerid`, `name`, `timestamp`
FROM `default_catalog`.`default_database`.`CustomerSubset`
;
-- `__pk_hash` is computed over all data columns and serves as the sink key.
INSERT INTO `default_catalog`.`default_database`.`CustomerSubset_2`
SELECT `customerid`, `name`, `timestamp`, `hash_columns`(`customerid`, `name`, `timestamp`) AS `__pk_hash`
FROM `default_catalog`.`default_database`.`CustomerSubset`
;
INSERT INTO `default_catalog`.`default_database`.`DistinctStream_3`
SELECT `customerid`, `email`, `name`, `lastUpdated`, `timestamp`
FROM `default_catalog`.`default_database`.`DistinctCustomer`
;
-- Reads the raw `Customer` rows rather than the `DistinctCustomer` view.
-- NOTE(review): presumably the sink's 'TIMESTAMP' on-conflict action keeps the
-- latest row per `customerid`; confirm against the jdbc-sqrl connector docs.
INSERT INTO `default_catalog`.`default_database`.`DistinctCustomer_4`
SELECT `customerid`, `email`, `name`, `lastUpdated`, `timestamp`
FROM `default_catalog`.`default_database`.`Customer`
;
INSERT INTO `default_catalog`.`default_database`.`JoinStream_5`
SELECT `id`, `name`
FROM `default_catalog`.`default_database`.`JoinStream`
;
-- `__pk_hash` is computed over all data columns and serves as the sink key.
INSERT INTO `default_catalog`.`default_database`.`JoinStream_6`
SELECT `id`, `name`, `hash_columns`(`id`, `name`) AS `__pk_hash`
FROM `default_catalog`.`default_database`.`JoinStream`
;
-- `entries` is converted with `to_jsonb` to match the sink's RAW column.
INSERT INTO `default_catalog`.`default_database`.`Orders_7`
SELECT `id`, `customerid`, `time`, `to_jsonb`(`entries`) AS `entries`
FROM `default_catalog`.`default_database`.`Orders`
;
END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "kafka-DistinctStream",
      "tableName" : "DistinctStream_3",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [
        "customerid"
      ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "kafka-JoinStream",
      "tableName" : "JoinStream_5",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "kafka-mutation-CustomerExport",
      "tableName" : "CustomerExport",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "Customer_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Customer_1\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT NOT NULL, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\",\"lastUpdated\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid",
        "lastUpdated"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CustomerSubset_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CustomerSubset_2\" (\"customerid\" BIGINT NOT NULL, \"name\" TEXT NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "DistinctCustomer_4",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"DistinctCustomer_4\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT NOT NULL, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "JoinStream_6",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"JoinStream_6\" (\"id\" BIGINT NOT NULL, \"name\" TEXT NOT NULL, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Orders_7",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Orders_7\" (\"id\" BIGINT NOT NULL, \"customerid\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"entries\" JSONB, PRIMARY KEY (\"id\",\"time\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "id",
        "time"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Customer",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Customer\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"timestamp\") AS SELECT *\nFROM \"Customer_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "CustomerSubset",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CustomerSubset\"(\"customerid\", \"name\", \"timestamp\") AS SELECT \"customerid\", \"name\", \"timestamp\"\nFROM \"CustomerSubset_2\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "DistinctCustomer",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"DistinctCustomer\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"timestamp\") AS SELECT *\nFROM \"DistinctCustomer_4\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "JoinStream",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"JoinStream\"(\"id\", \"name\") AS SELECT \"id\", \"name\"\nFROM \"JoinStream_6\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "Orders",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Orders\"(\"id\", \"customerid\", \"time\", \"entries\") AS SELECT *\nFROM \"Orders_7\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Customer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerSubset",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"customerid\", \"name\", \"timestamp\"\nFROM \"CustomerSubset_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "DistinctCustomer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"DistinctCustomer_4\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "JoinStream",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"id\", \"name\"\nFROM \"JoinStream_6\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Orders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Orders_7\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "CustomerExport",
          "returnList" : false,
          "topic" : "kafka-mutation-CustomerExport",
          "keyColumns" : [ ],
          "computedColumns" : {
            "timestamp" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Customer($limit: Int = 10, $offset: Int = 0) {\nCustomer(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "Customer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Customer{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomerSubset",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerSubset($limit: Int = 10, $offset: Int = 0) {\nCustomerSubset(limit: $limit, offset: $offset) {\ncustomerid\nname\ntimestamp\n}\n\n}",
            "queryName" : "CustomerSubset",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerSubset{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetDistinctCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query DistinctCustomer($limit: Int = 10, $offset: Int = 0) {\nDistinctCustomer(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "DistinctCustomer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/DistinctCustomer{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetJoinStream",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query JoinStream($limit: Int = 10, $offset: Int = 0) {\nJoinStream(limit: $limit, offset: $offset) {\nid\nname\n}\n\n}",
            "queryName" : "JoinStream",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/JoinStream{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Orders($limit: Int = 10, $offset: Int = 0) {\nOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nentries {\nproductid\nquantity\nunit_price\ndiscount\n}\n}\n\n}",
            "queryName" : "Orders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Orders{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddCustomerExport",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "customerid" : {
                  "type" : "integer"
                },
                "name" : {
                  "type" : "string"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation CustomerExport($customerid: Long, $name: String) {\nCustomerExport(event: { customerid: $customerid, name: $name }) {\ncustomerid\nname\ntimestamp\n}\n\n}",
            "queryName" : "CustomerExport",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/CustomerExport"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type Customer {\n  customerid: Long!\n  email: String!\n  name: String!\n  lastUpdated: Long!\n  timestamp: DateTime!\n}\n\ninput CustomerExportInput {\n  customerid: Long\n  name: String\n}\n\ntype CustomerExportResultOutput {\n  customerid: Long\n  name: String\n  timestamp: DateTime\n}\n\ntype CustomerSubset {\n  customerid: Long!\n  name: String!\n  timestamp: DateTime!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ntype DistinctCustomer {\n  customerid: Long!\n  email: String!\n  name: String!\n  lastUpdated: Long!\n  timestamp: DateTime!\n}\n\n\"A JSON scalar\"\nscalar JSON\n\ntype JoinStream {\n  id: Long!\n  name: String!\n}\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  CustomerExport(event: CustomerExportInput!): CustomerExportResultOutput!\n}\n\ntype Orders {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  entries: [Orders_entriesOutput]!\n}\n\ntype Orders_entriesOutput {\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n  discount: Float\n}\n\ntype Query {\n  Customer(limit: Int = 10, offset: Int = 0): [Customer!]\n  CustomerSubset(limit: Int = 10, offset: Int = 0): [CustomerSubset!]\n  DistinctCustomer(limit: Int = 10, offset: Int = 0): [DistinctCustomer!]\n  JoinStream(limit: Int = 10, offset: Int = 0): [JoinStream!]\n  Orders(limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
