>>>pipeline_explain.txt
=== CustomerData
ID:          default_catalog.default_database.CustomerData
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   -
Row count:   ~9e7
---
Schema:
 - customerid: BIGINT NOT NULL
 - payload: RecordType:peek_no_expand(INTEGER a, INTEGER b, INTEGER c)
Inputs:
 - default_catalog.default_database.NestedCustomers
 - default_catalog.default_database.Orders
Annotations:
 - stream-root: Orders
Plan:
LogicalProject(customerid=[$1], payload=[$5])
  LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1, 2}])
    LogicalTableScan(table=[[default_catalog, default_database, Orders]])
    LogicalFilter(condition=[=($cor1.customerid, CAST($0):BIGINT NOT NULL)])
      LogicalSnapshot(period=[$cor1.time])
        LogicalTableScan(table=[[default_catalog, default_database, NestedCustomers]])
SQL:
CREATE VIEW `CustomerData` AS  SELECT o.customerid, c.payload
                  FROM Orders o JOIN NestedCustomers FOR SYSTEM_TIME AS OF o.`time` c ON o.customerid = c.customerid;

=== NestedCustomers
ID:          default_catalog.default_database.NestedCustomers
Type:        state
Stage:       flink
Primary key: customerid
Timestamp:   updatedTime
Row count:   ~1e8
---
Schema:
 - customerid: INTEGER NOT NULL
 - payload: RecordType:peek_no_expand(INTEGER a, INTEGER b, INTEGER c)
 - updatedTime: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.NestedCustomers__base
Annotations:
 - features: DENORMALIZE (feature)
Plan:
LogicalWatermarkAssigner(rowtime=[updatedTime], watermark=[-($2, 0:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], payload=[$1], updatedTime=[CAST($2):TIMESTAMP_LTZ(3) *ROWTIME*])
    LogicalTableScan(table=[[default_catalog, default_database, NestedCustomers, metadata=[timestamp]]])
SQL:
CREATE TABLE `NestedCustomers` (
  `customerid` INTEGER NOT NULL,
  `payload` ROW< `a` INTEGER, `b` INTEGER, `c` INTEGER >,
  `updatedTime` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`customerid`) NOT ENFORCED,
  WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-NestedCustomers',
  'value.fields-include' = 'ALL',
  'value.format' = 'flexible-json'
)
=== Orders
ID:          default_catalog.default_database.Orders
Type:        stream
Stage:       flink
Primary key: id, time
Timestamp:   time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - entries: RecordType:peek_no_expand(BIGINT NOT NULL productid, BIGINT NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
Inputs:
 - default_catalog.default_database.Orders__base
Annotations:
 - features: DENORMALIZE (feature)
 - stream-root: Orders
Plan:
LogicalWatermarkAssigner(rowtime=[time], watermark=[-($2, 1:INTERVAL SECOND)])
  LogicalTableScan(table=[[default_catalog, default_database, Orders]])
SQL:
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Orders__schema`
>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Orders__schema`;
CREATE TABLE `NestedCustomers` (
  `customerid` INTEGER,
  `payload` ROW< `a` INTEGER, `b` INTEGER, `c` INTEGER >,
  `updatedTime` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`customerid`) NOT ENFORCED,
  WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-NestedCustomers',
  'value.fields-include' = 'ALL',
  'value.format' = 'flexible-json'
);
CREATE VIEW `CustomerData`
AS
SELECT `o`.`customerid`, `c`.`payload`
FROM `Orders` AS `o`
 INNER JOIN `NestedCustomers` FOR SYSTEM_TIME AS OF `o`.`time` AS `c` ON `o`.`customerid` = `c`.`customerid`;
CREATE TABLE `CustomerData_1` (
  `customerid` BIGINT NOT NULL,
  `payload` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'CustomerData_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `NestedCustomers_2` (
  `customerid` INTEGER NOT NULL,
  `payload` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  `updatedTime` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  PRIMARY KEY (`customerid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'updatedTime',
  'table-name' = 'NestedCustomers_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `Orders_3` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  PRIMARY KEY (`id`, `time`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Orders_3',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`CustomerData_1`
SELECT `customerid`, `to_jsonb`(`payload`) AS `payload`, `hash_columns`(`customerid`, `payload`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`CustomerData`
;
INSERT INTO `default_catalog`.`default_database`.`NestedCustomers_2`
 SELECT `customerid`, `to_jsonb`(`payload`) AS `payload`, `updatedTime`
  FROM `default_catalog`.`default_database`.`NestedCustomers`
 ;
 INSERT INTO `default_catalog`.`default_database`.`Orders_3`
  SELECT `id`, `customerid`, `time`, `to_jsonb`(`entries`) AS `entries`
   FROM `default_catalog`.`default_database`.`Orders`
  ;
  END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "kafka-mutation-NestedCustomers",
      "tableName" : "NestedCustomers",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [
        "customerid"
      ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "CustomerData_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CustomerData_1\" (\"customerid\" BIGINT NOT NULL, \"payload\" JSONB, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "payload",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "NestedCustomers_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"NestedCustomers_2\" (\"customerid\" INTEGER NOT NULL, \"payload\" JSONB, \"updatedTime\" TIMESTAMP WITH TIME ZONE, PRIMARY KEY (\"customerid\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "INTEGER",
          "nullable" : false
        },
        {
          "name" : "payload",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "updatedTime",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "customerid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Orders_3",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Orders_3\" (\"id\" BIGINT NOT NULL, \"customerid\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"entries\" JSONB, PRIMARY KEY (\"id\",\"time\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "id",
        "time"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CustomerData",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CustomerData\"(\"customerid\", \"payload\") AS SELECT \"customerid\", \"payload\"\nFROM \"CustomerData_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "payload",
          "type" : "JSONB",
          "nullable" : true
        }
      ]
    },
    {
      "name" : "NestedCustomers",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"NestedCustomers\"(\"customerid\", \"payload\", \"updatedTime\") AS SELECT *\nFROM \"NestedCustomers_2\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "INTEGER",
          "nullable" : false
        },
        {
          "name" : "payload",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "updatedTime",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        }
      ]
    },
    {
      "name" : "Orders",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Orders\"(\"id\", \"customerid\", \"time\", \"entries\") AS SELECT *\nFROM \"Orders_3\"",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "entries",
          "type" : "JSONB",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerData",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"customerid\", \"payload\"\nFROM \"CustomerData_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "NestedCustomers",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"NestedCustomers_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Orders",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Orders_3\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "NestedCustomers",
          "returnList" : false,
          "topic" : "kafka-mutation-NestedCustomers",
          "keyColumns" : [
            "customerid"
          ],
          "computedColumns" : {
            "updatedTime" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomerData",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerData($limit: Int = 10, $offset: Int = 0) {\nCustomerData(limit: $limit, offset: $offset) {\ncustomerid\npayload {\na\nb\nc\n}\n}\n\n}",
            "queryName" : "CustomerData",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerData{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetNestedCustomers",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query NestedCustomers($limit: Int = 10, $offset: Int = 0) {\nNestedCustomers(limit: $limit, offset: $offset) {\ncustomerid\npayload {\na\nb\nc\n}\nupdatedTime\n}\n\n}",
            "queryName" : "NestedCustomers",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/NestedCustomers{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetOrders",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Orders($limit: Int = 10, $offset: Int = 0) {\nOrders(limit: $limit, offset: $offset) {\nid\ncustomerid\ntime\nentries {\nproductid\nquantity\nunit_price\ndiscount\n}\n}\n\n}",
            "queryName" : "Orders",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Orders{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddNestedCustomers",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "payload" : {
                  "type" : "object",
                  "properties" : {
                    "a" : {
                      "type" : "integer"
                    },
                    "b" : {
                      "type" : "integer"
                    },
                    "c" : {
                      "type" : "integer"
                    }
                  },
                  "required" : [ ]
                },
                "customerid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "customerid"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation NestedCustomers($customerid: Int!, $payload: NestedCustomers_payloadInput) {\nNestedCustomers(event: { customerid: $customerid, payload: $payload }) {\ncustomerid\npayload {\na\nb\nc\n}\nupdatedTime\n}\n\n}",
            "queryName" : "NestedCustomers",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "NONE",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/NestedCustomers"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type CustomerData {\n  customerid: Long!\n  payload: CustomerData_payloadOutput\n}\n\ntype CustomerData_payloadOutput {\n  a: Int\n  b: Int\n  c: Int\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  NestedCustomers(event: NestedCustomersInput!): NestedCustomersResultOutput!\n}\n\ntype NestedCustomers {\n  customerid: Int!\n  payload: NestedCustomers_payloadOutput\n  updatedTime: DateTime\n}\n\ninput NestedCustomersInput {\n  customerid: Int!\n  payload: NestedCustomers_payloadInput\n}\n\ntype NestedCustomersResultOutput {\n  customerid: Int!\n  payload: NestedCustomersResult_payloadOutput\n  updatedTime: DateTime\n}\n\ntype NestedCustomersResult_payloadOutput {\n  a: Int\n  b: Int\n  c: Int\n}\n\ninput NestedCustomers_payloadInput {\n  a: Int\n  b: Int\n  c: Int\n}\n\ntype NestedCustomers_payloadOutput {\n  a: Int\n  b: Int\n  c: Int\n}\n\ntype Orders {\n  id: Long!\n  customerid: Long!\n  time: DateTime!\n  entries: [Orders_entriesOutput]!\n}\n\ntype Orders_entriesOutput {\n  productid: Long!\n  quantity: Long!\n  unit_price: Float!\n  discount: Float\n}\n\ntype Query {\n  CustomerData(limit: Int = 10, offset: Int = 0): [CustomerData!]\n  NestedCustomers(limit: Int = 10, offset: Int = 0): [NestedCustomers!]\n  Orders(limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
