>>>inferred_schema.graphqls
"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

# Payload of the `InputTable` mutation. Both fields are required.
# The server config in this file maps that mutation to the Kafka topic
# `InputTable`, which the Flink source table of the same name reads.
input InputTableInput {
  userid: Int!
  # Declared BIGINT in the Flink source table; summed per user and window
  # into `total_tokens`.
  tokens: Long!
}

# Result of the `InputTable` mutation: the submitted fields plus the event
# timestamp.
type InputTableResultOutput {
  userid: Int!
  tokens: Long!
  # Nullable. Not part of the input type; the server config lists it as a
  # computed column (metadataType TIMESTAMP, required = false).
  event_time: DateTime
}

"A JSON scalar"
scalar JSON

# One row per `userid` (the primary key in the pipeline plan): the aggregate
# of that user's most recent one-second tumbling window, where "most recent"
# is decided by `window_time`.
type LatestTokens {
  userid: Int!
  # SUM(tokens) over the window.
  total_tokens: Long!
  # TIMESTAMP(3) (no time zone) in the pipeline schema.
  window_end: DateTime!
  # TIMESTAMP_LTZ(3) rowtime attribute in the pipeline schema.
  window_time: DateTime!
}

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

type Mutation {
  # Submits a single event. The server config handles this field as a Kafka
  # mutation on topic `InputTable` (transactional = false, returnList = false).
  InputTable(event: InputTableInput!): InputTableResultOutput!
}

type Query {
  # Limit/offset-paginated read of the `LatestTokens` table; by default the
  # first 10 rows (offset 0). The list itself is nullable, its elements are not.
  # NOTE(review): the backing SQL in the server config is a plain
  # `SELECT * FROM iceberg_scan(...)` with no ORDER BY, so row order across
  # pages looks unspecified - confirm whether callers need stable paging.
  LatestTokens(limit: Int = 10, offset: Int = 0): [LatestTokens!]
}

# Allowed values for the `mcp` argument of the `@api` directive.
# Both operations generated in this file's server config use TOOL.
enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

# Allowed values for the `rest` argument of the `@api` directive.
# In this file's server config the query operation uses GET and the mutation
# operation uses POST.
enum _RestMethodType {
  NONE
  GET
  POST
}

# Optional per-operation / per-field API settings: an MCP method type, a REST
# method type and a URI string. All three arguments may be omitted.
# NOTE(review): presumably these override how an operation is exposed over
# MCP and REST - confirm against the framework documentation.
directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== InputTable
ID:          default_catalog.default_database.InputTable
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - userid: INTEGER NOT NULL
 - tokens: BIGINT NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.InputTable__base

=== LatestTokens
ID:          default_catalog.default_database.LatestTokens
Type:        state
Stage:       flink
Primary key: userid
Timestamp:   window_time
Row count:   ~2e7
---
Schema:
 - userid: INTEGER NOT NULL
 - total_tokens: BIGINT NOT NULL
 - window_end: TIMESTAMP(3) NOT NULL
 - window_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database._UserTokens
Annotations:
 - mostRecentDistinct: true

=== _UserTokens
ID:          default_catalog.default_database._UserTokens
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   window_time
Row count:   ~7e7
---
Schema:
 - userid: INTEGER NOT NULL
 - total_tokens: BIGINT NOT NULL
 - window_end: TIMESTAMP(3) NOT NULL
 - window_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.InputTable
Annotations:
 - features: STREAM_WINDOW_AGGREGATION (feature)

=== SinkTable
ID:          iceberg.SinkTable
Type:        export
Stage:       flink
Connector:   iceberg
---
Inputs:
 - default_catalog.default_database._UserTokens

>>>flink-sql-no-functions.sql
-- Kafka-backed source table for the events submitted through the `InputTable`
-- mutation (topic 'InputTable').
CREATE TABLE `InputTable` (
  `userid` INTEGER NOT NULL,
  `tokens` BIGINT NOT NULL,
  -- Event time comes from the Kafka record's 'timestamp' metadata, not from
  -- the message payload.
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- Zero-length interval: the watermark equals `event_time`, so no
  -- out-of-orderness allowance is configured.
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  -- The ${...} placeholders are left unresolved in this file; presumably they
  -- are substituted at deployment time - confirm.
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  -- NOTE(review): compression.type is a Kafka producer setting and this table
  -- is only read from in this file - confirm it is intended here.
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}-group',
  'topic' = 'InputTable'
);
-- `SinkTable` is created inside a dedicated `iceberg` database of the current
-- catalog; the INSERT at the end of this script addresses it as
-- `default_catalog`.`iceberg`.`SinkTable`.
CREATE DATABASE IF NOT EXISTS `iceberg`;
USE `iceberg`;
-- Iceberg export target that receives every row of the `_UserTokens` view
-- (one row per user and one-second window).
CREATE TABLE `SinkTable` (
  `userid` INTEGER NOT NULL,
  `total_tokens` BIGINT NOT NULL,
  -- NOTE(review): declared TIMESTAMP_LTZ(3) and nullable here, while the
  -- `_UserTokens` column inserted into it is TIMESTAMP(3) NOT NULL (and
  -- `LatestTokens_1` keeps TIMESTAMP(3)). The insert therefore relies on an
  -- implicit TIMESTAMP -> TIMESTAMP_LTZ conversion, which presumably depends
  -- on the session time zone - confirm this is intended.
  `window_end` TIMESTAMP_LTZ(3),
  `window_time` TIMESTAMP_LTZ(3)
)
WITH (
  'catalog-name' = 'mydatabase',
  'catalog-table' = 'SinkTable',
  'catalog-type' = 'hadoop',
  -- Commit retry policy: up to 20 retries, waiting between 100 ms and 5000 ms.
  'commit.retry.max-wait-ms' = '5000',
  'commit.retry.min-wait-ms' = '100',
  'commit.retry.num-retries' = '20',
  'connector' = 'iceberg',
  'format-version' = '2',
  -- Same warehouse root as `LatestTokens_1`.
  'warehouse' = '/tmp/duckdb',
  'write.distribution-mode' = 'none'
);
-- Switch back from the `iceberg` database to `default_catalog`.`default_database`
-- so that the views and the `LatestTokens_1` table defined below are created
-- there and the unqualified reference to `InputTable` resolves.
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
-- Per-user token totals over one-second tumbling event-time windows on
-- `InputTable`. `window_start` is part of the grouping key but is not
-- projected. This view feeds both the `LatestTokens` view and the `SinkTable`
-- export.
CREATE VIEW `_UserTokens` AS
SELECT
  `userid`,
  SUM(`tokens`) AS `total_tokens`,
  `window_end`,
  `window_time`
FROM TABLE(
  TUMBLE(TABLE `InputTable`, DESCRIPTOR(`event_time`), INTERVAL '1' SECOND)
)
GROUP BY
  `userid`,
  `window_start`,
  `window_end`,
  `window_time`;
-- Keeps, for every `userid`, only the `_UserTokens` row with the greatest
-- `window_time`: rows are numbered per user in descending `window_time` order
-- and only row number 1 is retained. The helper column
-- `__sqrlinternal_rownum` is not projected.
CREATE VIEW `LatestTokens` AS
SELECT
  `userid`,
  `total_tokens`,
  `window_end`,
  `window_time`
FROM (
  SELECT
    `userid`,
    `total_tokens`,
    `window_end`,
    `window_time`,
    ROW_NUMBER() OVER (
      PARTITION BY `userid`
      ORDER BY `window_time` DESC NULLS LAST
    ) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_UserTokens`
) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Iceberg sink for the `LatestTokens` view. It is stored under the catalog
-- table name 'LatestTokens' in warehouse '/tmp/duckdb'; the API query in the
-- server config scans '/tmp/duckdb/default_database/LatestTokens'.
CREATE TABLE `LatestTokens_1` (
  `userid` INTEGER NOT NULL,
  `total_tokens` BIGINT NOT NULL,
  `window_end` TIMESTAMP(3) NOT NULL,
  `window_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  -- Same key the `LatestTokens` view deduplicates on; used together with
  -- 'write.upsert.enabled' below.
  PRIMARY KEY (`userid`) NOT ENFORCED
)
WITH (
  'catalog-name' = 'mydatabase',
  'catalog-table' = 'LatestTokens',
  'catalog-type' = 'hadoop',
  -- Commit retry policy: up to 20 retries, waiting between 100 ms and 5000 ms.
  'commit.retry.max-wait-ms' = '5000',
  'commit.retry.min-wait-ms' = '100',
  'commit.retry.num-retries' = '20',
  'compaction.enabled' = 'true',
  'connector' = 'iceberg',
  'format-version' = '2',
  -- 86400000 ms = 24 hours.
  'history.expire.max-snapshot-age-ms' = '86400000',
  'warehouse' = '/tmp/duckdb',
  -- Differs from `SinkTable`, which uses distribution mode 'none'.
  'write.distribution-mode' = 'hash',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '10',
  'write.upsert.enabled' = 'true'
);
-- Submits both sink writes together as one statement set.
-- Both inserts use SELECT *, so source and sink columns are matched by
-- position; the sink column order must follow the view's projection order.
EXECUTE STATEMENT SET BEGIN
-- Latest row per user -> upsert-enabled Iceberg table `LatestTokens_1`.
INSERT INTO `default_catalog`.`default_database`.`LatestTokens_1`
SELECT *
 FROM `default_catalog`.`default_database`.`LatestTokens`
;
-- Every per-user, per-window aggregate row -> `SinkTable` export.
INSERT INTO `default_catalog`.`iceberg`.`SinkTable`
 SELECT *
  FROM `default_catalog`.`default_database`.`_UserTokens`
 ;
 END
>>>iceberg-duckdb-schema.sql
-- Table definitions matching the two Iceberg sinks declared in the Flink SQL.
-- `window_end` of "LatestTokens" is rendered as TIMESTAMP(3) here, whereas
-- iceberg-schema.sql renders the same column as TIMESTAMP WITHOUT TIME ZONE.
CREATE TABLE IF NOT EXISTS "LatestTokens" ("userid" INTEGER NOT NULL, "total_tokens" BIGINT NOT NULL, "window_end" TIMESTAMP(3) NOT NULL, "window_time" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("userid"));
-- Both timestamp columns are nullable and time-zone aware, as declared for
-- the Flink `SinkTable`.
CREATE TABLE IF NOT EXISTS "SinkTable" ("userid" INTEGER NOT NULL, "total_tokens" BIGINT NOT NULL, "window_end" TIMESTAMP WITH TIME ZONE, "window_time" TIMESTAMP WITH TIME ZONE)
>>>iceberg-duckdb-views.sql

>>>iceberg-schema.sql
-- Table definitions matching the two Iceberg sinks declared in the Flink SQL.
-- "LatestTokens" is keyed by "userid"; its `window_end` carries no time zone
-- while `window_time` does.
CREATE TABLE IF NOT EXISTS "LatestTokens" ("userid" INTEGER NOT NULL, "total_tokens" BIGINT NOT NULL, "window_end" TIMESTAMP WITHOUT TIME ZONE NOT NULL, "window_time" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("userid"));
-- Both timestamp columns are nullable and time-zone aware, as declared for
-- the Flink `SinkTable`.
CREATE TABLE IF NOT EXISTS "SinkTable" ("userid" INTEGER NOT NULL, "total_tokens" BIGINT NOT NULL, "window_end" TIMESTAMP WITH TIME ZONE, "window_time" TIMESTAMP WITH TIME ZONE)
>>>iceberg-views.sql

>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "InputTable",
      "tableName" : "InputTable",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "LatestTokens",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"iceberg_scan\"('/tmp/duckdb/default_database/LatestTokens', ALLOW_MOVED_PATHS = TRUE)",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "DUCKDB"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "InputTable",
          "returnList" : false,
          "topic" : "InputTable",
          "keyColumns" : [ ],
          "computedColumns" : {
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetLatestTokens",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query LatestTokens($limit: Int = 10, $offset: Int = 0) {\nLatestTokens(limit: $limit, offset: $offset) {\nuserid\ntotal_tokens\nwindow_end\nwindow_time\n}\n\n}",
            "queryName" : "LatestTokens",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/LatestTokens{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddInputTable",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "tokens" : {
                  "type" : "integer"
                },
                "userid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "userid",
                "tokens"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation InputTable($userid: Int!, $tokens: Long!) {\nInputTable(event: { userid: $userid, tokens: $tokens }) {\nuserid\ntokens\nevent_time\n}\n\n}",
            "queryName" : "InputTable",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/InputTable"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ninput InputTableInput {\n  userid: Int!\n  tokens: Long!\n}\n\ntype InputTableResultOutput {\n  userid: Int!\n  tokens: Long!\n  event_time: DateTime\n}\n\n\"A JSON scalar\"\nscalar JSON\n\ntype LatestTokens {\n  userid: Int!\n  total_tokens: Long!\n  window_end: DateTime!\n  window_time: DateTime!\n}\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  InputTable(event: InputTableInput!): InputTableResultOutput!\n}\n\ntype Query {\n  LatestTokens(limit: Int = 10, offset: Int = 0): [LatestTokens!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
