>>>pipeline_explain.txt
=== HighTempAlert
ID:          default_catalog.default_database.HighTempAlert
Type:        stream
Stage:       flink
Primary key: uuid
Timestamp:   event_time
Row count:   ~5e7
---
Schema:
 - uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - sensorid: INTEGER NOT NULL
 - temperature: FLOAT NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - stream-root: SensorReading

=== SensorAnalysis
ID:          default_catalog.default_database.SensorAnalysis
Type:        state
Stage:       flink
Primary key: sensorid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - sensorid: INTEGER NOT NULL
 - avg_temperatures: FLOAT NOT NULL
 - max_temperature: FLOAT NOT NULL
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - stream-root: SensorReading
 - sort: [0 DESC-nulls-last]

=== SensorAnalysisById
ID:          default_catalog.default_database.SensorAnalysisById
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.SensorAnalysis
Annotations:
 - stream-root: SensorReading
 - parameters: sensorid
 - base-table: SensorAnalysis

=== SensorMaxTempLastMin
ID:          default_catalog.default_database.SensorMaxTempLastMin
Type:        state
Stage:       flink
Primary key: sensorid
Timestamp:   endOfMin
Row count:   ~2e7
---
Schema:
 - sensorid: INTEGER NOT NULL
 - endOfMin: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - avg_temperature: FLOAT NOT NULL
Inputs:
 - default_catalog.default_database._SensorMaxTempLastMinWindow
Annotations:
 - mostRecentDistinct: true
 - stream-root: SensorReading

=== SensorReading
ID:          default_catalog.default_database.SensorReading
Type:        stream
Stage:       flink
Primary key: uuid
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - sensorid: INTEGER NOT NULL
 - temperature: FLOAT NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SensorReading__base
Annotations:
 - stream-root: SensorReading

=== SensorReadingById
ID:          default_catalog.default_database.SensorReadingById
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - stream-root: SensorReading
 - parameters: sensorid
 - base-table: SensorReading

=== _SensorMaxTempLastMinWindow
ID:          default_catalog.default_database._SensorMaxTempLastMinWindow
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   endOfMin
Row count:   ~1e8
---
Schema:
 - sensorid: INTEGER NOT NULL
 - endOfMin: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - avg_temperature: FLOAT NOT NULL
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - features: STREAM_WINDOW_AGGREGATION (feature)
 - stream-root: SensorReading

>>>flink-sql-no-functions.sql
CREATE TABLE `SensorReading` (
  `uuid` STRING NOT NULL,
  `sensorid` INTEGER NOT NULL,
  `temperature` FLOAT NOT NULL,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'SensorReading'
);
CREATE VIEW `HighTempAlert`
AS
SELECT *
FROM `SensorReading`
WHERE `temperature` > 50;
CREATE VIEW `_SensorMaxTempLastMinWindow`
AS
SELECT `sensorid`, `window_time` AS `endOfMin`, AVG(`temperature`) AS `avg_temperature`
FROM TABLE(HOP(TABLE `SensorReading`, DESCRIPTOR(`event_time`), INTERVAL '10' SECOND, INTERVAL '1' MINUTE))
GROUP BY `sensorid`, `window_start`, `window_end`, `window_time`;
CREATE VIEW `SensorMaxTempLastMin`
AS
SELECT `sensorid`, `endOfMin`, `avg_temperature`
FROM (SELECT `sensorid`, `endOfMin`, `avg_temperature`, ROW_NUMBER() OVER (PARTITION BY `sensorid` ORDER BY `endOfMin` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_SensorMaxTempLastMinWindow`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `SensorAnalysis`
AS
SELECT `sensorid`, AVG(`temperature`) AS `avg_temperatures`, MAX(`temperature`) AS `max_temperature`
FROM `SensorReading`
GROUP BY `sensorid`;
CREATE TABLE `HighTempAlert_1` (
  `uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `sensorid` INTEGER NOT NULL,
  `temperature` FLOAT NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'HighTempAlert'
);
CREATE TABLE `SensorAnalysis_2` (
  `sensorid` INTEGER NOT NULL,
  `avg_temperatures` FLOAT NOT NULL,
  `max_temperature` FLOAT NOT NULL,
  PRIMARY KEY (`sensorid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'SensorAnalysis',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `SensorMaxTempLastMin_3` (
  `sensorid` INTEGER NOT NULL,
  `endOfMin` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `avg_temperature` FLOAT NOT NULL,
  PRIMARY KEY (`sensorid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'endOfMin',
  'table-name' = 'SensorMaxTempLastMin',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `SensorReading_4` (
  `uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `sensorid` INTEGER NOT NULL,
  `temperature` FLOAT NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  PRIMARY KEY (`uuid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'SensorReading',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`HighTempAlert_1`
SELECT *
 FROM `default_catalog`.`default_database`.`HighTempAlert`
;
INSERT INTO `default_catalog`.`default_database`.`SensorAnalysis_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`SensorAnalysis`
 ;
 INSERT INTO `default_catalog`.`default_database`.`SensorMaxTempLastMin_3`
  SELECT *
   FROM `default_catalog`.`default_database`.`_SensorMaxTempLastMinWindow`
  ;
  INSERT INTO `default_catalog`.`default_database`.`SensorReading_4`
   SELECT *
    FROM `default_catalog`.`default_database`.`SensorReading`
   ;
   END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "HighTempAlert",
      "tableName" : "HighTempAlert_1",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "SensorReading",
      "tableName" : "SensorReading",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "SensorAnalysis" ("sensorid" INTEGER NOT NULL, "avg_temperatures" FLOAT NOT NULL, "max_temperature" FLOAT NOT NULL, PRIMARY KEY ("sensorid"));
CREATE TABLE IF NOT EXISTS "SensorMaxTempLastMin" ("sensorid" INTEGER NOT NULL, "endOfMin" TIMESTAMP WITH TIME ZONE NOT NULL, "avg_temperature" FLOAT NOT NULL, PRIMARY KEY ("sensorid"));
CREATE TABLE IF NOT EXISTS "SensorReading" ("uuid" TEXT NOT NULL, "sensorid" INTEGER NOT NULL, "temperature" FLOAT NOT NULL, "event_time" TIMESTAMP WITH TIME ZONE, PRIMARY KEY ("uuid"));

CREATE INDEX IF NOT EXISTS "SensorReading_hash_c1" ON "SensorReading" USING hash ("sensorid")
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorMaxTempLastMin",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"SensorMaxTempLastMin\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorAnalysisById",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "sensorid"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"sensorid\", \"avg_temperatures\", \"max_temperature\"\n  FROM \"SensorAnalysis\"\n  ORDER BY \"sensorid\" DESC NULLS LAST) AS \"t\"\nWHERE \"sensorid\" = $1",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "sensorid",
                  "sqlType" : "INTEGER"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorReadingById",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "sensorid"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"SensorReading\"\nWHERE \"sensorid\" = $1\nORDER BY \"event_time\" DESC NULLS LAST",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "sensorid",
                  "sqlType" : "INTEGER"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "SensorReading",
          "returnList" : false,
          "topic" : "SensorReading",
          "keyColumns" : [ ],
          "computedColumns" : {
            "uuid" : {
              "metadataType" : "UUID",
              "name" : "",
              "required" : true
            },
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [
        {
          "type" : "kafka",
          "fieldName" : "HighTempAlert",
          "topic" : "HighTempAlert",
          "sinkConfig" : { },
          "equalityConditions" : { }
        }
      ],
      "operations" : [
        {
          "function" : {
            "name" : "GetSensorMaxTempLastMin",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorMaxTempLastMin($limit: Int = 10, $offset: Int = 0) {\nSensorMaxTempLastMin(limit: $limit, offset: $offset) {\nsensorid\nendOfMin\navg_temperature\n}\n\n}",
            "queryName" : "SensorMaxTempLastMin",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorMaxTempLastMin{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetSensorAnalysisById",
            "description" : "Analyzes the sensor readings for the given sensor with average and max temperatures.",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "sensorid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "sensorid"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorAnalysisById($sensorid: Int!, $limit: Int = 10, $offset: Int = 0) {\nSensorAnalysisById(sensorid: $sensorid, limit: $limit, offset: $offset) {\nsensorid\navg_temperatures\nmax_temperature\n}\n\n}",
            "queryName" : "SensorAnalysisById",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorAnalysisById{?offset,limit,sensorid}"
        },
        {
          "function" : {
            "name" : "GetSensorReadingById",
            "description" : "Returns the most recent sensor readings for the given sensor",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "sensorid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "sensorid"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorReadingById($sensorid: Int!, $limit: Int = 10, $offset: Int = 0) {\nSensorReadingById(sensorid: $sensorid, limit: $limit, offset: $offset) {\nuuid\nsensorid\ntemperature\nevent_time\n}\n\n}",
            "queryName" : "SensorReadingById",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorReadingById{?offset,limit,sensorid}"
        },
        {
          "function" : {
            "name" : "AddSensorReading",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "temperature" : {
                  "type" : "number"
                },
                "sensorid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "sensorid",
                "temperature"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation SensorReading($sensorid: Int!, $temperature: Float!) {\nSensorReading(event: { sensorid: $sensorid, temperature: $temperature }) {\nuuid\nsensorid\ntemperature\nevent_time\n}\n\n}",
            "queryName" : "SensorReading",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/SensorReading"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"A slightly refined version of RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  SensorReading(event: SensorReadingInput!): SensorReadingResultOutput!\n}\n\ntype Query {\n  SensorMaxTempLastMin(limit: Int = 10, offset: Int = 0): [SensorMaxTempLastMin!]\n  \"Analyzes the sensor readings for the given sensor with average and max temperatures.\"\n  SensorAnalysisById(sensorid: Int!, limit: Int = 10, offset: Int = 0): [SensorAnalysis!]\n  \"Returns the most recent sensor readings for the given sensor\"\n  SensorReadingById(sensorid: Int!, limit: Int = 10, offset: Int = 0): [SensorReading!]\n}\n\ntype SensorAnalysis {\n  sensorid: Int!\n  avg_temperatures: Float!\n  max_temperature: Float!\n}\n\ntype SensorMaxTempLastMin {\n  sensorid: Int!\n  endOfMin: DateTime!\n  avg_temperature: Float!\n}\n\ntype SensorReading {\n  uuid: String!\n  sensorid: Int!\n  temperature: Float!\n  event_time: DateTime\n}\n\ninput SensorReadingInput {\n  sensorid: Int!\n  temperature: Float!\n}\n\ntype SensorReadingResultOutput {\n  uuid: String!\n  sensorid: Int!\n  temperature: Float!\n  event_time: DateTime\n}\n\ntype Subscription {\n  HighTempAlert: SensorReading\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
