>>>inferred_schema.graphqls
"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

type Mutation {
  SensorReading(event: [SensorReadingInput!]!): [SensorReadingResultOutput!]!
}

type Query {
  HighTempAlert(limit: Int = 10, offset: Int = 0): [SensorReading!]
  SensorMaxTemp(limit: Int = 10, offset: Int = 0): [SensorMaxTemp!]
  SensorReading(limit: Int = 10, offset: Int = 0): [SensorReading!]
}

type SensorMaxTemp {
  sensorid: Int!
  maxTemp: Float!
}

type SensorReading {
  sensorid: Int!
  temperature: Float!
  event_time: DateTime
}

input SensorReadingInput {
  sensorid: Int!
  temperature: Float!
}

type SensorReadingResultOutput {
  sensorid: Int!
  temperature: Float!
  event_time: DateTime
}

type Subscription {
  SensorReadingSubscription: SensorReading
  SensorSubscriptionById(sensorid: Int!): SensorReading
}

enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

enum _RestMethodType {
  NONE
  GET
  POST
}

directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== HighTempAlert
ID:          default_catalog.default_database.HighTempAlert
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~5e7
---
Schema:
 - sensorid: INTEGER NOT NULL
 - temperature: DECIMAL(8, 2) NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - sort: [0 DESC-nulls-last]

=== SensorMaxTemp
ID:          default_catalog.default_database.SensorMaxTemp
Type:        state
Stage:       flink
Primary key: sensorid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - sensorid: INTEGER NOT NULL
 - maxTemp: DECIMAL(8, 2) NOT NULL
Inputs:
 - default_catalog.default_database.SensorReading
Annotations:
 - sort: [0 DESC-nulls-last]

=== SensorReading
ID:          default_catalog.default_database.SensorReading
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - sensorid: INTEGER NOT NULL
 - temperature: DECIMAL(8, 2) NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SensorReading__base

=== SensorReadingSubscription
ID:          default_catalog.default_database.SensorReadingSubscription
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - sensorid: INTEGER NOT NULL
 - temperature: DECIMAL(8, 2) NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SensorReading

=== SensorSubscriptionById
ID:          default_catalog.default_database.SensorSubscriptionById
Type:        query
Stage:       kafka
---
Inputs:
 - default_catalog.default_database.SensorReadingSubscription
Annotations:
 - parameters: sensorid
 - base-table: SensorReading

>>>flink-sql-no-functions.sql
CREATE TABLE `SensorReading` (
  `sensorid` INTEGER NOT NULL,
  `temperature` DECIMAL(8, 2) NOT NULL,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'SensorReading'
);
CREATE VIEW `SensorMaxTemp`
AS
SELECT `sensorid`, MAX(`temperature`) AS `maxTemp`
FROM `SensorReading`
GROUP BY `sensorid`;
CREATE VIEW `HighTempAlert`
AS
SELECT *
FROM `SensorReading`
WHERE `temperature` > 50;
CREATE VIEW `SensorReadingSubscription`
AS
SELECT *
FROM `SensorReading`;
CREATE TABLE `HighTempAlert_1` (
  `sensorid` INTEGER NOT NULL,
  `temperature` DECIMAL(8, 2) NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'HighTempAlert',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `SensorMaxTemp_2` (
  `sensorid` INTEGER NOT NULL,
  `maxTemp` DECIMAL(8, 2) NOT NULL,
  PRIMARY KEY (`sensorid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'SensorMaxTemp',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `SensorReading_3` (
  `sensorid` INTEGER NOT NULL,
  `temperature` DECIMAL(8, 2) NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'SensorReading',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `SensorReadingSubscription_4` (
  `sensorid` INTEGER NOT NULL,
  `temperature` DECIMAL(8, 2) NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'SensorReadingSubscription'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`HighTempAlert_1`
SELECT `sensorid`, `temperature`, `event_time`, `hash_columns`(`sensorid`, `temperature`, `event_time`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`HighTempAlert`
;
INSERT INTO `default_catalog`.`default_database`.`SensorMaxTemp_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`SensorMaxTemp`
 ;
 INSERT INTO `default_catalog`.`default_database`.`SensorReading_3`
  SELECT `sensorid`, `temperature`, `event_time`, `hash_columns`(`sensorid`, `temperature`, `event_time`) AS `__pk_hash`
   FROM `default_catalog`.`default_database`.`SensorReading`
  ;
  INSERT INTO `default_catalog`.`default_database`.`SensorReadingSubscription_4`
   SELECT *
    FROM `default_catalog`.`default_database`.`SensorReadingSubscription`
   ;
   END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "SensorReadingSubscription",
      "tableName" : "SensorReadingSubscription_4",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "SensorReading",
      "tableName" : "SensorReading",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "HighTempAlert" ("sensorid" INTEGER NOT NULL, "temperature" NUMERIC NOT NULL, "event_time" TIMESTAMP WITH TIME ZONE, "__pk_hash" TEXT, PRIMARY KEY ("__pk_hash"));
CREATE TABLE IF NOT EXISTS "SensorMaxTemp" ("sensorid" INTEGER NOT NULL, "maxTemp" NUMERIC NOT NULL, PRIMARY KEY ("sensorid"));
CREATE TABLE IF NOT EXISTS "SensorReading" ("sensorid" INTEGER NOT NULL, "temperature" NUMERIC NOT NULL, "event_time" TIMESTAMP WITH TIME ZONE, "__pk_hash" TEXT, PRIMARY KEY ("__pk_hash"))
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "HighTempAlert",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"sensorid\", \"temperature\", \"event_time\"\n  FROM \"HighTempAlert\"\n  ORDER BY \"sensorid\" DESC NULLS LAST) AS \"t0\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorMaxTemp",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"sensorid\", \"maxTemp\"\n  FROM \"SensorMaxTemp\"\n  ORDER BY \"sensorid\" DESC NULLS LAST) AS \"t\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "SensorReading",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"sensorid\", \"temperature\", \"event_time\"\nFROM \"SensorReading\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "SensorReading",
          "returnList" : true,
          "topic" : "SensorReading",
          "keyColumns" : [ ],
          "computedColumns" : {
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [
        {
          "type" : "kafka",
          "fieldName" : "SensorReadingSubscription",
          "topic" : "SensorReadingSubscription",
          "sinkConfig" : { },
          "equalityConditions" : { }
        },
        {
          "type" : "kafka",
          "fieldName" : "SensorSubscriptionById",
          "topic" : "SensorReadingSubscription",
          "sinkConfig" : { },
          "equalityConditions" : {
            "sensorid" : {
              "type" : "arg",
              "path" : "sensorid",
              "sqlType" : "INTEGER"
            }
          }
        }
      ],
      "operations" : [
        {
          "function" : {
            "name" : "GetHighTempAlert",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query HighTempAlert($limit: Int = 10, $offset: Int = 0) {\nHighTempAlert(limit: $limit, offset: $offset) {\nsensorid\ntemperature\nevent_time\n}\n\n}",
            "queryName" : "HighTempAlert",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/HighTempAlert{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetSensorMaxTemp",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorMaxTemp($limit: Int = 10, $offset: Int = 0) {\nSensorMaxTemp(limit: $limit, offset: $offset) {\nsensorid\nmaxTemp\n}\n\n}",
            "queryName" : "SensorMaxTemp",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorMaxTemp{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetSensorReading",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query SensorReading($limit: Int = 10, $offset: Int = 0) {\nSensorReading(limit: $limit, offset: $offset) {\nsensorid\ntemperature\nevent_time\n}\n\n}",
            "queryName" : "SensorReading",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/SensorReading{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddSensorReading",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "event" : {
                  "type" : "array",
                  "items" : {
                    "type" : "object",
                    "properties" : {
                      "temperature" : {
                        "type" : "number"
                      },
                      "sensorid" : {
                        "type" : "integer"
                      }
                    },
                    "required" : [
                      "sensorid",
                      "temperature"
                    ]
                  }
                }
              },
              "required" : [
                "event"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation SensorReading($event: [SensorReadingInput!]!) {\nSensorReading(event: $event) {\nsensorid\ntemperature\nevent_time\n}\n\n}",
            "queryName" : "SensorReading",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "NONE",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/SensorReading"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  SensorReading(event: [SensorReadingInput!]!): [SensorReadingResultOutput!]!\n}\n\ntype Query {\n  HighTempAlert(limit: Int = 10, offset: Int = 0): [SensorReading!]\n  SensorMaxTemp(limit: Int = 10, offset: Int = 0): [SensorMaxTemp!]\n  SensorReading(limit: Int = 10, offset: Int = 0): [SensorReading!]\n}\n\ntype SensorMaxTemp {\n  sensorid: Int!\n  maxTemp: Float!\n}\n\ntype SensorReading {\n  sensorid: Int!\n  temperature: Float!\n  event_time: DateTime\n}\n\ninput SensorReadingInput {\n  sensorid: Int!\n  temperature: Float!\n}\n\ntype SensorReadingResultOutput {\n  sensorid: Int!\n  temperature: Float!\n  event_time: DateTime\n}\n\ntype Subscription {\n  SensorReadingSubscription: SensorReading\n  SensorSubscriptionById(sensorid: Int!): SensorReading\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
