>>>pipeline_explain.txt
=== OrderEntryJoin
ID:          default_catalog.default_database.OrderEntryJoin
Type:        stream
Stage:       postgres
Primary key: -
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id1: BIGINT NOT NULL
 - id2: BIGINT NOT NULL
 - multi: BIGINT NOT NULL
Inputs:
 - default_catalog.default_database._OrderEntries
Plan:
LogicalProject(id1=[$0], id2=[$6], multi=[*($4, $10)])
  LogicalJoin(condition=[=($1, $7)], joinType=[inner])
    LogicalTableScan(table=[[default_catalog, default_database, _OrderEntries]])
    LogicalTableScan(table=[[default_catalog, default_database, _OrderEntries]])
SQL:
CREATE VIEW `OrderEntryJoin` AS  SELECT o1.id AS id1, o2.id AS id2, o1.quantity * o2.quantity AS multi
                  FROM _OrderEntries o1 JOIN _OrderEntries o2 ON o1.customerid = o2.customerid;

=== _OrderEntries
ID:          default_catalog.default_database._OrderEntries
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - productid: BIGINT NOT NULL
 - quantity: BIGINT NOT NULL
 - discount: DOUBLE
Inputs:
 - default_catalog.default_database._Orders
Annotations:
 - stream-root: _Orders
Plan:
LogicalProject(id=[$0], customerid=[$1], time=[$2], productid=[$4], quantity=[$5], discount=[$7])
  LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
    LogicalTableScan(table=[[default_catalog, default_database, _Orders]])
    Uncollect
      LogicalProject(entries=[$cor1.entries])
        LogicalValues(tuples=[[{ 0 }]])
SQL:
CREATE VIEW `_OrderEntries` AS  SELECT o.id, o.customerid, o.`time`, e.productid, e.quantity, e.discount FROM _Orders o CROSS JOIN UNNEST(entries) e;

=== _Orders
ID:          default_catalog.default_database._Orders
Type:        stream
Stage:       flink
Primary key: id, time
Timestamp:   time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customerid: BIGINT NOT NULL
 - time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - entries: RecordType:peek_no_expand(BIGINT NOT NULL productid, BIGINT NOT NULL quantity, DOUBLE NOT NULL unit_price, DOUBLE discount) NOT NULL ARRAY NOT NULL
Inputs:
 - default_catalog.default_database._Orders__base
Annotations:
 - features: DENORMALIZE (feature)
 - stream-root: _Orders
Plan:
LogicalWatermarkAssigner(rowtime=[time], watermark=[-($2, 1:INTERVAL SECOND)])
  LogicalTableScan(table=[[default_catalog, default_database, _Orders]])
SQL:
CREATE TEMPORARY TABLE `_Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `_Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `_Orders__schema`
>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `_Orders__schema` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `_Orders` (
  PRIMARY KEY (`id`, `time`) NOT ENFORCED,
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `_Orders__schema`;
CREATE VIEW `_OrderEntries`
AS
SELECT `o`.`id`, `o`.`customerid`, `o`.`time`, `e`.`productid`, `e`.`quantity`, `e`.`discount`
FROM `_Orders` AS `o`
 CROSS JOIN UNNEST(`entries`) AS `e`;
CREATE VIEW `OrderEntryJoin`
AS
SELECT `o1`.`id` AS `id1`, `o2`.`id` AS `id2`, `o1`.`quantity` * `o2`.`quantity` AS `multi`
FROM `_OrderEntries` AS `o1`
 INNER JOIN `_OrderEntries` AS `o2` ON `o1`.`customerid` = `o2`.`customerid`;
CREATE TABLE `_OrderEntries_1` (
  `id` BIGINT NOT NULL,
  `customerid` BIGINT NOT NULL,
  `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `productid` BIGINT NOT NULL,
  `quantity` BIGINT NOT NULL,
  `discount` DOUBLE,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = '_OrderEntries_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`_OrderEntries_1`
SELECT `id`, `customerid`, `time`, `productid`, `quantity`, `discount`, `hash_columns`(`id`, `customerid`, `time`, `productid`, `quantity`, `discount`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`_OrderEntries`
;
END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "_OrderEntries_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"_OrderEntries_1\" (\"id\" BIGINT NOT NULL, \"customerid\" BIGINT NOT NULL, \"time\" TIMESTAMP WITH TIME ZONE NOT NULL, \"productid\" BIGINT NOT NULL, \"quantity\" BIGINT NOT NULL, \"discount\" DOUBLE PRECISION, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "id",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "productid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "quantity",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "discount",
          "type" : "DOUBLE PRECISION",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "OrderEntryJoin",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"OrderEntryJoin\"(\"id1\", \"id2\", \"multi\") AS SELECT \"t3\".\"id\" AS \"id1\", \"t4\".\"id\" AS \"id2\", \"t3\".\"quantity\" * \"t4\".\"quantity\" AS \"multi\"\nFROM (SELECT \"id\", \"customerid\", \"time\", \"productid\", \"quantity\", \"discount\"\n  FROM \"_OrderEntries_1\") AS \"t3\"\n INNER JOIN (SELECT \"id\", \"customerid\", \"time\", \"productid\", \"quantity\", \"discount\"\n  FROM \"_OrderEntries_1\") AS \"t4\" ON \"t3\".\"customerid\" = \"t4\".\"customerid\"",
      "fields" : [
        {
          "name" : "id1",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "id2",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "multi",
          "type" : "BIGINT",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "_OrderEntries_1_hash_c1",
      "type" : "INDEX",
      "sql" : "CREATE INDEX IF NOT EXISTS \"_OrderEntries_1_hash_c1\" ON \"_OrderEntries_1\" USING hash (\"customerid\")"
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "OrderEntryJoin",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"t\".\"id\" AS \"id1\", \"t0\".\"id\" AS \"id2\", \"t\".\"quantity\" * \"t0\".\"quantity\" AS \"multi\"\nFROM (SELECT \"id\", \"customerid\", \"time\", \"productid\", \"quantity\", \"discount\"\n  FROM \"_OrderEntries_1\") AS \"t\"\n INNER JOIN (SELECT \"id\", \"customerid\", \"time\", \"productid\", \"quantity\", \"discount\"\n  FROM \"_OrderEntries_1\") AS \"t0\" ON \"t\".\"customerid\" = \"t0\".\"customerid\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetOrderEntryJoin",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query OrderEntryJoin($limit: Int = 10, $offset: Int = 0) {\nOrderEntryJoin(limit: $limit, offset: $offset) {\nid1\nid2\nmulti\n}\n\n}",
            "queryName" : "OrderEntryJoin",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/OrderEntryJoin{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype OrderEntryJoin {\n  id1: Long!\n  id2: Long!\n  multi: Long!\n}\n\ntype Query {\n  OrderEntryJoin(limit: Int = 10, offset: Int = 0): [OrderEntryJoin!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
