>>>pipeline_explain.txt
=== Customer
ID:          default_catalog.default_database.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($3, 0), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== CustomerByEmail
ID:          default_catalog.default_database.CustomerByEmail
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
 - parameters: email, id
 - base-table: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
  LogicalFilter(condition=[AND(=(?0, $1), OR(IS NULL(?1), =(?1, $0)))])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomerByEmail` AS  SELECT * FROM Customer WHERE ?      = email AND (?   IS NULL OR ?   = customerid);

=== CustomerByIds
ID:          default_catalog.default_database.CustomerByIds
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
 - parameters: ids
 - base-table: Customer
Plan:
LogicalSort(fetch=[100])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
    LogicalFilter(condition=[array_contains(CAST(?0):BIGINT NOT NULL ARRAY, $0)])
      LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomerByIds` AS  SELECT * FROM Customer WHERE array_contains(CAST(?    AS BIGINT ARRAY), customerid) LIMIT 100;

=== CustomerByNothing
ID:          default_catalog.default_database.CustomerByNothing
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
  LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomerByNothing` AS  SELECT * FROM Customer;

=== CustomersByTime
ID:          default_catalog.default_database.CustomersByTime
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
 - parameters: customerid, fromTime
 - base-table: Customer
Plan:
LogicalSort(sort0=[$4], dir0=[ASC-nulls-first])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
    LogicalFilter(condition=[AND(=($0, ?0), >=(+($4, 14400000:INTERVAL HOUR), ?1))])
      LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomersByTime` AS 
    SELECT * FROM Customer WHERE customerid = ?           AND `timestamp` + INTERVAL '4' HOUR >= ?        
    ORDER BY `timestamp` ASC;

=== SelectCustomer
ID:          default_catalog.default_database.SelectCustomer
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.CustomerByEmail
Annotations:
 - features: TABLE_FUNCTION_SCAN (feature)
 - stream-root: Customer
 - parameters: id
 - base-table: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
  LogicalTableFunctionScan(invocation=[CustomerByEmail('john@doe.com', ?0)], rowType=[RecordType(BIGINT customerid, VARCHAR(2147483647) email, VARCHAR(2147483647) name, BIGINT lastUpdated, TIMESTAMP_LTZ(3) *ROWTIME* timestamp)], elementType=[class [Ljava.lang.Object;])
SQL:
CREATE VIEW `SelectCustomer` AS  SELECT * FROM TABLE(CustomerByEmail(email => 'john@doe.com', id => ?  ));

>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), TIMESTAMP '1970-01-01 00:00:00.000'),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
CREATE VIEW `CustomerByNothing`
AS
SELECT *
FROM `Customer`;
CREATE TABLE `Customer_1` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Customer_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `CustomerByNothing_2` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-CustomerByNothing'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Customer_1`
SELECT *
 FROM `default_catalog`.`default_database`.`Customer`
;
INSERT INTO `default_catalog`.`default_database`.`CustomerByNothing_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`CustomerByNothing`
 ;
 END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "kafka-CustomerByNothing",
      "tableName" : "CustomerByNothing_2",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "Customer_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Customer_1\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT NOT NULL, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\",\"lastUpdated\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid",
        "lastUpdated"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Customer",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Customer\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"timestamp\") AS SELECT *\nFROM \"Customer_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "Customer_1_btree_c0c4",
      "type" : "INDEX",
      "sql" : "CREATE INDEX IF NOT EXISTS \"Customer_1_btree_c0c4\" ON \"Customer_1\" USING btree (\"customerid\",\"timestamp\")"
    },
    {
      "name" : "Customer_1_btree_c1c0",
      "type" : "INDEX",
      "sql" : "CREATE INDEX IF NOT EXISTS \"Customer_1_btree_c1c0\" ON \"Customer_1\" USING btree (\"email\",\"customerid\")"
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Customer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerByEmail",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "email"
              },
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "id"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"\nWHERE $1 = \"email\" AND ($2 IS NULL OR $2 = \"customerid\")",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "email",
                  "sqlType" : "VARCHAR"
                },
                {
                  "type" : "arg",
                  "path" : "id",
                  "sqlType" : "BIGINT"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerByIds",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "ids"
              },
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"\nWHERE (\"customerid\" = ANY (CAST($1 AS BIGINT ARRAY)))\nFETCH NEXT 100 ROWS ONLY",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "ids",
                  "sqlType" : "ARRAY"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomersByTime",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "customerid"
              },
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "fromTime"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"\nWHERE \"customerid\" = $1 AND \"timestamp\" + INTERVAL '4' HOUR >= $2\nORDER BY \"timestamp\" NULLS FIRST",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "customerid",
                  "sqlType" : "BIGINT"
                },
                {
                  "type" : "arg",
                  "path" : "fromTime",
                  "sqlType" : "TIMESTAMP"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SelectCustomer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "id"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"\nWHERE 'john@doe.com' = \"email\" AND ($1 IS NULL OR $1 = \"customerid\")",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "id",
                  "sqlType" : "BIGINT"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [
        {
          "type" : "kafka",
          "fieldName" : "CustomerByNothing",
          "topic" : "kafka-CustomerByNothing",
          "sinkConfig" : { },
          "equalityConditions" : { }
        }
      ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Customer($limit: Int = 10, $offset: Int = 0) {\nCustomer(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "Customer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Customer{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomerByEmail",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "id" : {
                  "type" : "integer"
                },
                "email" : {
                  "type" : "string"
                }
              },
              "required" : [
                "email"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerByEmail($email: String!, $id: Long, $limit: Int = 10, $offset: Int = 0) {\nCustomerByEmail(email: $email, id: $id, limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "CustomerByEmail",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerByEmail{?offset,limit,id,email}"
        },
        {
          "function" : {
            "name" : "GetCustomerByIds",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "ids" : {
                  "type" : "array",
                  "items" : {
                    "type" : "integer"
                  }
                }
              },
              "required" : [
                "ids"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerByIds($ids: [Long]!, $limit: Int = 100, $offset: Int = 0) {\nCustomerByIds(ids: $ids, limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "CustomerByIds",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerByIds{?offset,limit,ids}"
        },
        {
          "function" : {
            "name" : "GetCustomersByTime",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "customerid" : {
                  "type" : "integer"
                },
                "fromTime" : {
                  "type" : "string"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "customerid",
                "fromTime"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomersByTime($customerid: Long!, $fromTime: DateTime!, $limit: Int = 10, $offset: Int = 0) {\nCustomersByTime(customerid: $customerid, fromTime: $fromTime, limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "CustomersByTime",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomersByTime{?offset,customerid,fromTime,limit}"
        },
        {
          "function" : {
            "name" : "GetSelectCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "id" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SelectCustomer($id: Long, $limit: Int = 10, $offset: Int = 0) {\nSelectCustomer(id: $id, limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "SelectCustomer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SelectCustomer{?offset,limit,id}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type Customer {\n  customerid: Long!\n  email: String!\n  name: String!\n  lastUpdated: Long!\n  timestamp: DateTime!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  Customer(limit: Int = 10, offset: Int = 0): [Customer!]\n  CustomerByEmail(email: String!, id: Long, limit: Int = 10, offset: Int = 0): [Customer!]\n  CustomerByIds(ids: [Long]!, limit: Int = 100, offset: Int = 0): [Customer!]\n  CustomersByTime(customerid: Long!, fromTime: DateTime!, limit: Int = 10, offset: Int = 0): [Customer!]\n  SelectCustomer(id: Long, limit: Int = 10, offset: Int = 0): [Customer!]\n}\n\ntype Subscription {\n  CustomerByNothing: Customer\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
