>>>pipeline_explain.txt
=== MachineReading
ID:          default_catalog.default_database.MachineReading
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~9e7
---
Schema:
 - sensorid: INTEGER
 - temperature: FLOAT
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
 - machineid: INTEGER
Inputs:
 - default_catalog.default_database._SensorReading
 - default_catalog.default_database._Sensors
Annotations:
 - stream-root: _SensorReading
Plan:
LogicalProject(sensorid=[$1], temperature=[$2], event_time=[$3], machineid=[$5])
  LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1, 3}])
    LogicalTableScan(table=[[default_catalog, default_database, _SensorReading]])
    LogicalFilter(condition=[=($cor1.sensorid, $0)])
      LogicalSnapshot(period=[$cor1.event_time])
        LogicalTableScan(table=[[default_catalog, default_database, _Sensors]])
SQL:
CREATE VIEW `MachineReading` AS  SELECT r.sensorid, r.temperature, r.event_time, s.machineid
                  FROM _SensorReading r JOIN _Sensors FOR SYSTEM_TIME AS OF r.event_time s ON r.sensorid = s.sensorid;

=== MachineTempByHour
ID:          default_catalog.default_database.MachineTempByHour
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   time_hour
Row count:   ~7e7
---
Schema:
 - machineid: INTEGER
 - time_hour: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - avg_temperature: FLOAT
Inputs:
 - default_catalog.default_database.MachineReading
Annotations:
 - features: STREAM_WINDOW_AGGREGATION (feature)
 - stream-root: _SensorReading
Plan:
LogicalProject(machineid=[$0], time_hour=[$3], avg_temperature=[$4])
  LogicalAggregate(group=[{0, 1, 2, 3}], avg_temperature=[AVG($4)])
    LogicalProject(machineid=[$3], window_start=[$4], window_end=[$5], time_hour=[$6], temperature=[$1])
      LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), DESCRIPTOR('event_time'), 3600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER sensorid, FLOAT temperature, TIMESTAMP_LTZ(3) *ROWTIME* event_time, INTEGER machineid, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)])
        LogicalProject(sensorid=[$0], temperature=[$1], event_time=[$2], machineid=[$3])
          LogicalTableScan(table=[[default_catalog, default_database, MachineReading]])
SQL:
CREATE VIEW `MachineTempByHour` AS  SELECT machineid, window_time as time_hour, AVG(temperature) AS avg_temperature
                     FROM TABLE(TUMBLE(TABLE MachineReading, DESCRIPTOR(event_time), INTERVAL '60' MINUTES))
                     GROUP BY machineid, window_start, window_end, window_time;

=== _SensorReading
ID:          default_catalog.default_database._SensorReading
Type:        stream
Stage:       flink
Primary key: event_id
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - event_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - sensorid: INTEGER
 - temperature: FLOAT
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database._SensorReading__base
Annotations:
 - stream-root: _SensorReading
Plan:
LogicalWatermarkAssigner(rowtime=[event_time], watermark=[-($3, 5000:INTERVAL SECOND)])
  LogicalProject(event_id=[$0], sensorid=[$1], temperature=[$2], event_time=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
    LogicalTableScan(table=[[default_catalog, default_database, _SensorReading, metadata=[timestamp]]])
SQL:
CREATE TABLE `_SensorReading` (
  `event_id` STRING NOT NULL,
  `sensorid` INTEGER,
  `temperature` FLOAT,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-_SensorReading'
)
=== _Sensors
ID:          default_catalog.default_database._Sensors
Type:        state
Stage:       flink
Primary key: sensorid
Timestamp:   updatedTime
Row count:   ~1e8
---
Schema:
 - sensorid: INTEGER NOT NULL
 - machineid: INTEGER
 - updatedTime: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database._Sensors__base
Plan:
LogicalWatermarkAssigner(rowtime=[updatedTime], watermark=[-($2, 0:INTERVAL SECOND)])
  LogicalProject(sensorid=[$0], machineid=[$1], updatedTime=[CAST($2):TIMESTAMP_LTZ(3) *ROWTIME*])
    LogicalTableScan(table=[[default_catalog, default_database, _Sensors, metadata=[timestamp]]])
SQL:
CREATE TABLE `_Sensors` (
  `sensorid` INTEGER NOT NULL,
  `machineid` INTEGER,
  `updatedTime` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`sensorid`) NOT ENFORCED,
  WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-_Sensors',
  'value.fields-include' = 'ALL',
  'value.format' = 'flexible-json'
)
>>>flink-sql-no-functions.sql
-- Sensor-to-machine assignment, read from the 'kafka-mutation-_Sensors' topic
-- through the upsert-kafka connector and keyed by `sensorid`.
-- `MachineReading` joins against this table FOR SYSTEM_TIME AS OF the reading's
-- event time, so `updatedTime` (taken from the Kafka record 'timestamp'
-- metadata) is the version time of each assignment.
-- `sensorid` is declared NOT NULL explicitly: it is the primary key, and the
-- pipeline schema and the GraphQL mutation input (`sensorid: Int!`) both treat
-- it as required.
CREATE TABLE `_Sensors` (
  `sensorid` INTEGER NOT NULL,
  `machineid` INTEGER,
  `updatedTime` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`sensorid`) NOT ENFORCED,
  -- Zero-delay watermark: the watermark equals `updatedTime` itself.
  WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-_Sensors',
  'value.fields-include' = 'ALL',
  'value.format' = 'flexible-json'
);
-- Raw sensor readings, read from the 'kafka-mutation-_SensorReading' topic
-- through the kafka connector.
-- `event_time` is not part of the message payload: it is taken from the Kafka
-- record 'timestamp' metadata and serves as the table's event-time attribute.
-- NOTE(review): the KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID placeholders are
-- presumably substituted at deployment time - confirm against the runner.
CREATE TABLE `_SensorReading` (
  `event_id` STRING NOT NULL,
  `sensorid` INTEGER,
  `temperature` FLOAT,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- The watermark trails `event_time` by 5 seconds.
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-_SensorReading'
);
-- Enriches every sensor reading with the machine its sensor was assigned to
-- as of the reading's `event_time` (temporal join against `_Sensors`).
-- The join is an inner join, so a reading without a matching sensor row
-- produces no output.
CREATE VIEW `MachineReading` AS
SELECT
  `reading`.`sensorid`,
  `reading`.`temperature`,
  `reading`.`event_time`,
  `sensor`.`machineid`
FROM `_SensorReading` AS `reading`
INNER JOIN `_Sensors` FOR SYSTEM_TIME AS OF `reading`.`event_time` AS `sensor`
  ON `reading`.`sensorid` = `sensor`.`sensorid`;
-- Average temperature per machine over 60-minute tumbling windows on
-- `event_time`. The window's `window_time` column is exposed as `time_hour`.
-- `window_start` and `window_end` take part in the grouping but are not
-- projected.
CREATE VIEW `MachineTempByHour` AS
SELECT
  `machineid`,
  `window_time` AS `time_hour`,
  AVG(`temperature`) AS `avg_temperature`
FROM TABLE(
  TUMBLE(
    TABLE `MachineReading`,
    DESCRIPTOR(`event_time`),
    INTERVAL '60' MINUTE
  )
)
GROUP BY
  `machineid`,
  `window_start`,
  `window_end`,
  `window_time`;
-- JDBC sink table for the `MachineReading` view, written to the Postgres table
-- 'MachineReading_1' through the jdbc-sqrl connector.
-- The columns mirror the view, plus `__pk_hash`, which serves as the primary
-- key and is filled in by the INSERT in the statement set at the end of this
-- script.
-- NOTE(review): 'sink.on-conflict.action' = 'IGNORE' presumably turns an insert
-- that collides on the primary key into a no-op - confirm against the
-- jdbc-sqrl connector.
CREATE TABLE `MachineReading_1` (
  `sensorid` INTEGER,
  `temperature` FLOAT,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `machineid` INTEGER,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'MachineReading_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- JDBC sink table for the `MachineTempByHour` view, written to the Postgres
-- table 'MachineTempByHour_2' through the jdbc-sqrl connector.
-- The columns mirror the view, plus `__pk_hash`, which serves as the primary
-- key and is filled in by the INSERT in the statement set at the end of this
-- script.
-- NOTE(review): 'sink.on-conflict.action' = 'IGNORE' presumably turns an insert
-- that collides on the primary key into a no-op - confirm against the
-- jdbc-sqrl connector.
CREATE TABLE `MachineTempByHour_2` (
  `machineid` INTEGER,
  `time_hour` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `avg_temperature` FLOAT,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'MachineTempByHour_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- Submits both sink inserts as a single statement set.
-- Each insert copies one view into its JDBC sink table and adds a `__pk_hash`
-- value computed by `hash_columns` over every column of that view.
-- NOTE(review): `hash_columns` is not created in this script - presumably it is
-- registered elsewhere, confirm before running this file on its own.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`MachineReading_1`
SELECT
  `sensorid`,
  `temperature`,
  `event_time`,
  `machineid`,
  `hash_columns`(`sensorid`, `temperature`, `event_time`, `machineid`) AS `__pk_hash`
FROM `default_catalog`.`default_database`.`MachineReading`
;
INSERT INTO `default_catalog`.`default_database`.`MachineTempByHour_2`
SELECT
  `machineid`,
  `time_hour`,
  `avg_temperature`,
  `hash_columns`(`machineid`, `time_hour`, `avg_temperature`) AS `__pk_hash`
FROM `default_catalog`.`default_database`.`MachineTempByHour`
;
END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "kafka-mutation-_SensorReading",
      "tableName" : "_SensorReading",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "kafka-mutation-_Sensors",
      "tableName" : "_Sensors",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [
        "sensorid"
      ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "MachineReading_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"MachineReading_1\" (\"sensorid\" INTEGER, \"temperature\" FLOAT, \"event_time\" TIMESTAMP WITH TIME ZONE, \"machineid\" INTEGER, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "sensorid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "temperature",
          "type" : "FLOAT",
          "nullable" : true
        },
        {
          "name" : "event_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "machineid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "MachineTempByHour_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"MachineTempByHour_2\" (\"machineid\" INTEGER, \"time_hour\" TIMESTAMP WITH TIME ZONE NOT NULL, \"avg_temperature\" FLOAT, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "machineid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "time_hour",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "avg_temperature",
          "type" : "FLOAT",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "MachineReading",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"MachineReading\"(\"sensorid\", \"temperature\", \"event_time\", \"machineid\") AS SELECT \"sensorid\", \"temperature\", \"event_time\", \"machineid\"\nFROM \"MachineReading_1\"",
      "fields" : [
        {
          "name" : "sensorid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "temperature",
          "type" : "FLOAT",
          "nullable" : true
        },
        {
          "name" : "event_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "machineid",
          "type" : "INTEGER",
          "nullable" : true
        }
      ]
    },
    {
      "name" : "MachineTempByHour",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"MachineTempByHour\"(\"machineid\", \"time_hour\", \"avg_temperature\") AS SELECT \"machineid\", \"time_hour\", \"avg_temperature\"\nFROM \"MachineTempByHour_2\"",
      "fields" : [
        {
          "name" : "machineid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "time_hour",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "avg_temperature",
          "type" : "FLOAT",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "MachineReading",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"sensorid\", \"temperature\", \"event_time\", \"machineid\"\nFROM \"MachineReading_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "MachineTempByHour",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"machineid\", \"time_hour\", \"avg_temperature\"\nFROM \"MachineTempByHour_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "_SensorReading",
          "returnList" : false,
          "topic" : "kafka-mutation-_SensorReading",
          "keyColumns" : [ ],
          "computedColumns" : {
            "event_id" : {
              "metadataType" : "UUID",
              "name" : "",
              "required" : true
            },
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        },
        {
          "type" : "kafka",
          "fieldName" : "_Sensors",
          "returnList" : false,
          "topic" : "kafka-mutation-_Sensors",
          "keyColumns" : [
            "sensorid"
          ],
          "computedColumns" : {
            "updatedTime" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetMachineReading",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query MachineReading($limit: Int = 10, $offset: Int = 0) {\nMachineReading(limit: $limit, offset: $offset) {\nsensorid\ntemperature\nevent_time\nmachineid\n}\n\n}",
            "queryName" : "MachineReading",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/MachineReading{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetMachineTempByHour",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query MachineTempByHour($limit: Int = 10, $offset: Int = 0) {\nMachineTempByHour(limit: $limit, offset: $offset) {\nmachineid\ntime_hour\navg_temperature\n}\n\n}",
            "queryName" : "MachineTempByHour",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/MachineTempByHour{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype MachineReading {\n  sensorid: Int\n  temperature: Float\n  event_time: DateTime\n  machineid: Int\n}\n\ntype MachineTempByHour {\n  machineid: Int\n  time_hour: DateTime!\n  avg_temperature: Float\n}\n\ntype Mutation {\n  _SensorReading(event: _SensorReadingInput!): _SensorReadingResultOutput!\n  _Sensors(event: _SensorsInput!): _SensorsResultOutput!\n}\n\ntype Query {\n  MachineReading(limit: Int = 10, offset: Int = 0): [MachineReading!]\n  MachineTempByHour(limit: Int = 10, offset: Int = 0): [MachineTempByHour!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ninput _SensorReadingInput {\n  sensorid: Int\n  temperature: Float\n}\n\ntype _SensorReadingResultOutput {\n  event_id: String!\n  sensorid: Int\n  temperature: Float\n  event_time: DateTime\n}\n\ninput _SensorsInput {\n  sensorid: Int!\n  machineid: Int\n}\n\ntype _SensorsResultOutput {\n  sensorid: Int!\n  machineid: Int\n  updatedTime: DateTime\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
