>>>pipeline_explain.txt
=== SensorReading
ID:          default_catalog.default_database.SensorReading
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - sensorid: BIGINT NOT NULL
 - epoch_timestamp: BIGINT NOT NULL
 - temperature: DOUBLE NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.SensorReading__base
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(uuid=[$0], sensorid=[$1], epoch_timestamp=[$2], temperature=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($2, 3), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, default_database, SensorReading]])
SQL:
CREATE TABLE `SensorReading` (
  `uuid` STRING NOT NULL,
  `sensorid` BIGINT NOT NULL,
  `epoch_timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE NOT NULL,
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`epoch_timestamp`, 3), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000',
  'fields.sensorid.kind' = 'sequence',
  'fields.sensorid.start' = '0',
  'fields.sensorid.end' = '9',
  'fields.epoch_timestamp.kind' = 'sequence',
  'fields.epoch_timestamp.start' = '1719318565000',
  'fields.epoch_timestamp.end' = '1719319565000',
  'fields.uuid.kind' = 'random',
  'fields.temperature.kind' = 'random'
)
=== SensorTempByHour
ID:          default_catalog.default_database.SensorTempByHour
Type:        stream
Stage:       flink
Primary key: sensorid, time_hour
Timestamp:   time_hour
Row count:   ~7e7
---
Schema:
 - sensorid: BIGINT NOT NULL
 - time_hour: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - avg_temperature: DOUBLE NOT NULL
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - features: STREAM_WINDOW_AGGREGATION (feature)
Plan:
LogicalProject(sensorid=[$0], time_hour=[$3], avg_temperature=[$4])
  LogicalAggregate(group=[{0, 1, 2, 3}], avg_temperature=[AVG($4)])
    LogicalProject(sensorid=[$1], window_start=[$5], window_end=[$6], time_hour=[$7], temperature=[$3])
      LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), DESCRIPTOR('timestamp'), 3600000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) uuid, BIGINT sensorid, BIGINT epoch_timestamp, DOUBLE temperature, TIMESTAMP_LTZ(3) *ROWTIME* timestamp, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)])
        LogicalProject(uuid=[$0], sensorid=[$1], epoch_timestamp=[$2], temperature=[$3], timestamp=[$4])
          LogicalTableScan(table=[[default_catalog, default_database, SensorReading]])
SQL:
CREATE VIEW `SensorTempByHour` AS  SELECT sensorid, window_time as time_hour, AVG(temperature) AS avg_temperature
                     FROM TABLE(TUMBLE(TABLE SensorReading, DESCRIPTOR(`timestamp`), INTERVAL '60' MINUTES))
                     GROUP BY sensorid, window_start, window_end, window_time;

>>>flink-sql-no-functions.sql
-- Source table of synthetic sensor readings produced by the datagen connector.
CREATE TABLE `SensorReading` (
  `uuid` STRING NOT NULL,
  `sensorid` BIGINT NOT NULL,
  `epoch_timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE NOT NULL,
  -- Computed event-time column: epoch_timestamp converted with TO_TIMESTAMP_LTZ
  -- (precision argument 3, presumably epoch milliseconds - confirm). COALESCE
  -- substitutes the 1970-01-01 literal when the conversion returns NULL.
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`epoch_timestamp`, 3), TIMESTAMP '1970-01-01 00:00:00.000'),
  -- Watermark trails the event time by 0.001 second (one millisecond).
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'connector' = 'datagen',
  -- NOTE(review): the Flink datagen docs state that a sequence field makes the
  -- source bounded and that it ends when the first sequence completes. The
  -- sensorid sequence below has only 10 values, so fewer than number-of-rows
  -- rows may actually be emitted - confirm this is intended.
  'number-of-rows' = '1000',
  -- sensorid counts through 0..9.
  'fields.sensorid.kind' = 'sequence',
  'fields.sensorid.start' = '0',
  'fields.sensorid.end' = '9',
  -- epoch_timestamp counts upward from the start value to the end value.
  'fields.epoch_timestamp.kind' = 'sequence',
  'fields.epoch_timestamp.start' = '1719318565000',
  'fields.epoch_timestamp.end' = '1719319565000',
  -- uuid and temperature are filled with random values.
  'fields.uuid.kind' = 'random',
  'fields.temperature.kind' = 'random'
);
-- Average temperature per sensor over 60 minute tumbling windows on the
-- timestamp column of SensorReading. The window_time column of each window is
-- exposed as time_hour.
CREATE VIEW `SensorTempByHour` AS
SELECT
  `sensorid`,
  `window_time` AS `time_hour`,
  AVG(`temperature`) AS `avg_temperature`
FROM TABLE(
  TUMBLE(
    TABLE `SensorReading`,
    DESCRIPTOR(`timestamp`),
    INTERVAL '60' MINUTE
  )
)
GROUP BY
  `sensorid`,
  `window_start`,
  `window_end`,
  `window_time`;
-- Sink table for raw sensor readings, written through the jdbc-sqrl connector
-- using the PostgreSQL JDBC driver into the table named SensorReading_1.
CREATE TABLE `SensorReading_1` (
  `uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `sensorid` BIGINT NOT NULL,
  `epoch_timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  -- Surrogate key column. The insert statement in this script fills it with
  -- hash_columns applied to all other columns of the row.
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  -- The POSTGRES_* placeholders are presumably substituted from the
  -- environment at deployment time - confirm against the deployment setup.
  'password' = '${POSTGRES_PASSWORD}',
  -- NOTE(review): presumably rows that conflict on the primary key are skipped
  -- rather than updated - confirm against the connector documentation.
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'SensorReading_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- Sink table for the hourly per-sensor averages, written through the jdbc-sqrl
-- connector using the PostgreSQL JDBC driver into the table named
-- SensorTempByHour_2.
CREATE TABLE `SensorTempByHour_2` (
  `sensorid` BIGINT NOT NULL,
  `time_hour` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `avg_temperature` DOUBLE NOT NULL,
  -- Composite key: one row per sensor and window timestamp.
  PRIMARY KEY (`sensorid`, `time_hour`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  -- The POSTGRES_* placeholders are presumably substituted from the
  -- environment at deployment time - confirm against the deployment setup.
  'password' = '${POSTGRES_PASSWORD}',
  -- NOTE(review): presumably rows that conflict on the primary key are skipped
  -- rather than updated - confirm against the connector documentation.
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'SensorTempByHour_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- Submits both sink inserts together as a single statement set.
EXECUTE STATEMENT SET BEGIN
-- Raw readings: copy every source column and derive the surrogate key
-- __pk_hash by applying hash_columns to all of them. hash_columns is not
-- defined in this script (presumably a function registered elsewhere).
INSERT INTO `default_catalog`.`default_database`.`SensorReading_1`
SELECT `uuid`, `sensorid`, `epoch_timestamp`, `temperature`, `timestamp`, `hash_columns`(`uuid`, `sensorid`, `epoch_timestamp`, `temperature`, `timestamp`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`SensorReading`
;
-- Hourly averages: columns are listed explicitly, in the order declared by the
-- sink table, so a later change to the view cannot silently shift or break the
-- positional mapping of this insert.
INSERT INTO `default_catalog`.`default_database`.`SensorTempByHour_2`
 SELECT `sensorid`, `time_hour`, `avg_temperature`
  FROM `default_catalog`.`default_database`.`SensorTempByHour`
 ;
 END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "SensorReading_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"SensorReading_1\" (\"uuid\" TEXT NOT NULL, \"sensorid\" BIGINT NOT NULL, \"epoch_timestamp\" BIGINT NOT NULL, \"temperature\" DOUBLE PRECISION NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "uuid",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "sensorid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "epoch_timestamp",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "temperature",
          "type" : "DOUBLE PRECISION",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "SensorTempByHour_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"SensorTempByHour_2\" (\"sensorid\" BIGINT NOT NULL, \"time_hour\" TIMESTAMP WITH TIME ZONE NOT NULL, \"avg_temperature\" DOUBLE PRECISION NOT NULL, PRIMARY KEY (\"sensorid\",\"time_hour\"))",
      "fields" : [
        {
          "name" : "sensorid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time_hour",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "avg_temperature",
          "type" : "DOUBLE PRECISION",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "sensorid",
        "time_hour"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "SensorReading",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"SensorReading\"(\"uuid\", \"sensorid\", \"epoch_timestamp\", \"temperature\", \"timestamp\") AS SELECT \"uuid\", \"sensorid\", \"epoch_timestamp\", \"temperature\", \"timestamp\"\nFROM \"SensorReading_1\"",
      "fields" : [
        {
          "name" : "uuid",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "sensorid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "epoch_timestamp",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "temperature",
          "type" : "DOUBLE PRECISION",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "SensorTempByHour",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"SensorTempByHour\"(\"sensorid\", \"time_hour\", \"avg_temperature\") AS SELECT *\nFROM \"SensorTempByHour_2\"",
      "fields" : [
        {
          "name" : "sensorid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "time_hour",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        },
        {
          "name" : "avg_temperature",
          "type" : "DOUBLE PRECISION",
          "nullable" : false
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorReading",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"uuid\", \"sensorid\", \"epoch_timestamp\", \"temperature\", \"timestamp\"\nFROM \"SensorReading_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorTempByHour",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"SensorTempByHour_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetSensorReading",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorReading($limit: Int = 10, $offset: Int = 0) {\nSensorReading(limit: $limit, offset: $offset) {\nuuid\nsensorid\nepoch_timestamp\ntemperature\ntimestamp\n}\n\n}",
            "queryName" : "SensorReading",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorReading{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetSensorTempByHour",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorTempByHour($limit: Int = 10, $offset: Int = 0) {\nSensorTempByHour(limit: $limit, offset: $offset) {\nsensorid\ntime_hour\navg_temperature\n}\n\n}",
            "queryName" : "SensorTempByHour",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorTempByHour{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  SensorReading(limit: Int = 10, offset: Int = 0): [SensorReading!]\n  SensorTempByHour(limit: Int = 10, offset: Int = 0): [SensorTempByHour!]\n}\n\ntype SensorReading {\n  uuid: String!\n  sensorid: Long!\n  epoch_timestamp: Long!\n  temperature: Float!\n  timestamp: DateTime!\n}\n\ntype SensorTempByHour {\n  sensorid: Long!\n  time_hour: DateTime!\n  avg_temperature: Float!\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
