>>>pipeline_explain.txt
=== TotalOrgTokens
ID:          default_catalog.default_database.TotalOrgTokens
Type:        state
Stage:       flink
Primary key: orgid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - orgid: BIGINT NOT NULL
 - total_tokens: BIGINT NOT NULL
 - total_requests: BIGINT NOT NULL
Inputs:
 - default_catalog.default_database._EnrichedUserTokens

=== TotalOrgTokensByRange
ID:          default_catalog.default_database.TotalOrgTokensByRange
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.TotalOrgTokens
Annotations:
 - parameters: minTokens, maxTokens
 - base-table: TotalOrgTokens

=== TotalUserTokens
ID:          default_catalog.default_database.TotalUserTokens
Type:        state
Stage:       flink
Primary key: userid
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - userid: BIGINT NOT NULL
 - total_tokens: BIGINT NOT NULL
 - total_requests: BIGINT NOT NULL
Inputs:
 - default_catalog.default_database.UserTokens

=== UsageAlert
ID:          default_catalog.default_database.UsageAlert
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   request_time
Row count:   ~5e7
---
Schema:
 - userid: BIGINT NOT NULL
 - tokens: BIGINT NOT NULL
 - request_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.UserTokens

=== UserTokens
ID:          default_catalog.default_database.UserTokens
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   request_time
Row count:   ~1e8
---
Schema:
 - userid: BIGINT NOT NULL
 - tokens: BIGINT NOT NULL
 - request_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.UserTokens__base

=== _CurrentUserInfo
ID:          default_catalog.default_database._CurrentUserInfo
Type:        state
Stage:       flink
Primary key: userid
Timestamp:   event_time
Row count:   ~2e7
---
Schema:
 - userid: BIGINT NOT NULL
 - orgid: BIGINT NOT NULL
 - last_updated: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database._UserInfo

=== _EnrichedUserTokens
ID:          default_catalog.default_database._EnrichedUserTokens
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   request_time
Row count:   ~9e7
---
Schema:
 - userid: BIGINT NOT NULL
 - tokens: BIGINT NOT NULL
 - request_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - orgid: BIGINT NOT NULL
Inputs:
 - default_catalog.default_database.UserTokens
 - default_catalog.default_database._CurrentUserInfo

=== _UserInfo
ID:          default_catalog.default_database._UserInfo
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - userid: BIGINT NOT NULL
 - orgid: BIGINT NOT NULL
 - last_updated: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database._UserInfo__base

>>>flink-sql-no-functions.sql
CREATE TABLE `UserTokens` (
  `userid` BIGINT NOT NULL,
  `tokens` BIGINT NOT NULL,
  `request_time` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
  WATERMARK FOR `request_time` AS `request_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'UserTokens'
);
CREATE VIEW `TotalUserTokens`
AS
SELECT `userid`, SUM(`tokens`) AS `total_tokens`, COUNT(`tokens`) AS `total_requests`
FROM `UserTokens`
GROUP BY `userid`;
CREATE VIEW `UsageAlert`
AS
SELECT *
FROM `UserTokens`
WHERE `tokens` > 100000;
CREATE TABLE `_UserInfo` (
  `userid` BIGINT NOT NULL,
  `orgid` BIGINT NOT NULL,
  `last_updated` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `event_time` AS `NOW`(),
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/userinfo.jsonl',
  'source.monitor-interval' = '10 sec'
);
CREATE VIEW `_CurrentUserInfo`
AS
SELECT `userid`, `orgid`, `last_updated`, `event_time`
FROM (SELECT `userid`, `orgid`, `last_updated`, `event_time`, ROW_NUMBER() OVER (PARTITION BY `userid` ORDER BY `last_updated` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_UserInfo`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `_EnrichedUserTokens`
AS
SELECT `t`.*, `u`.`orgid`
FROM `UserTokens` AS `t`
 INNER JOIN `_CurrentUserInfo` FOR SYSTEM_TIME AS OF `t`.`request_time` AS `u` ON `u`.`userid` = `t`.`userid`;
CREATE VIEW `TotalOrgTokens`
AS
SELECT `orgid`, SUM(`tokens`) AS `total_tokens`, COUNT(`tokens`) AS `total_requests`
FROM `_EnrichedUserTokens`
GROUP BY `orgid`;
CREATE TABLE `TotalOrgTokens_1` (
  `orgid` BIGINT NOT NULL,
  `total_tokens` BIGINT NOT NULL,
  `total_requests` BIGINT NOT NULL,
  PRIMARY KEY (`orgid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'TotalOrgTokens',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `TotalUserTokens_2` (
  `userid` BIGINT NOT NULL,
  `total_tokens` BIGINT NOT NULL,
  `total_requests` BIGINT NOT NULL,
  PRIMARY KEY (`userid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'TotalUserTokens',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `UsageAlert_3` (
  `userid` BIGINT NOT NULL,
  `tokens` BIGINT NOT NULL,
  `request_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'UsageAlert'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`TotalOrgTokens_1`
SELECT *
 FROM `default_catalog`.`default_database`.`TotalOrgTokens`
;
INSERT INTO `default_catalog`.`default_database`.`TotalUserTokens_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`TotalUserTokens`
 ;
 INSERT INTO `default_catalog`.`default_database`.`UsageAlert_3`
  SELECT *
   FROM `default_catalog`.`default_database`.`UsageAlert`
  ;
  END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "UsageAlert",
      "tableName" : "UsageAlert_3",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "UserTokens",
      "tableName" : "UserTokens",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "TotalOrgTokens" ("orgid" BIGINT NOT NULL, "total_tokens" BIGINT NOT NULL, "total_requests" BIGINT NOT NULL, PRIMARY KEY ("orgid"));
CREATE TABLE IF NOT EXISTS "TotalUserTokens" ("userid" BIGINT NOT NULL, "total_tokens" BIGINT NOT NULL, "total_requests" BIGINT NOT NULL, PRIMARY KEY ("userid"));

CREATE INDEX IF NOT EXISTS "TotalOrgTokens_btree_c1" ON "TotalOrgTokens" USING btree ("total_tokens")
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "TotalOrgTokens",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"TotalOrgTokens\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "TotalUserTokens",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "userid"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"TotalUserTokens\"\nWHERE \"userid\" = $1",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "userid",
                  "sqlType" : "BIGINT"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "TotalOrgTokensByRange",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "minTokens"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "maxTokens"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"TotalOrgTokens\"\nWHERE \"total_tokens\" >= $1 AND \"total_tokens\" <= $2",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "minTokens",
                  "sqlType" : "BIGINT"
                },
                {
                  "type" : "arg",
                  "path" : "maxTokens",
                  "sqlType" : "BIGINT"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "UserTokens",
          "returnList" : false,
          "topic" : "UserTokens",
          "keyColumns" : [ ],
          "computedColumns" : {
            "request_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : true
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [
        {
          "type" : "kafka",
          "fieldName" : "UsageAlert",
          "topic" : "UsageAlert",
          "sinkConfig" : { },
          "equalityConditions" : { }
        }
      ],
      "operations" : [
        {
          "function" : {
            "name" : "GetTotalOrgTokens",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query TotalOrgTokens($limit: Int = 10, $offset: Int = 0) {\nTotalOrgTokens(limit: $limit, offset: $offset) {\norgid\ntotal_tokens\ntotal_requests\n}\n\n}",
            "queryName" : "TotalOrgTokens",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/TotalOrgTokens{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetTotalUserTokens",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "userid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "userid"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query TotalUserTokens($userid: Long!, $limit: Int = 10, $offset: Int = 0) {\nTotalUserTokens(userid: $userid, limit: $limit, offset: $offset) {\nuserid\ntotal_tokens\ntotal_requests\n}\n\n}",
            "queryName" : "TotalUserTokens",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/TotalUserTokens{?offset,limit,userid}"
        },
        {
          "function" : {
            "name" : "GetTotalOrgTokensByRange",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "maxTokens" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "minTokens" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "minTokens",
                "maxTokens"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query TotalOrgTokensByRange($minTokens: Long!, $maxTokens: Long!, $limit: Int = 10, $offset: Int = 0) {\nTotalOrgTokensByRange(minTokens: $minTokens, maxTokens: $maxTokens, limit: $limit, offset: $offset) {\norgid\ntotal_tokens\ntotal_requests\n}\n\n}",
            "queryName" : "TotalOrgTokensByRange",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/TotalOrgTokensByRange{?offset,maxTokens,limit,minTokens}"
        },
        {
          "function" : {
            "name" : "AddUserTokens",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "tokens" : {
                  "type" : "integer"
                },
                "userid" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "userid",
                "tokens"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation UserTokens($userid: Long!, $tokens: Long!) {\nUserTokens(event: { userid: $userid, tokens: $tokens }) {\nuserid\ntokens\nrequest_time\n}\n\n}",
            "queryName" : "UserTokens",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/UserTokens"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  UserTokens(event: UserTokensInput!): UserTokensResultOutput!\n}\n\ntype Query {\n  TotalOrgTokens(limit: Int = 10, offset: Int = 0): [TotalOrgTokens!]\n  TotalUserTokens(userid: Long!, limit: Int = 10, offset: Int = 0): [TotalUserTokens!]\n  TotalOrgTokensByRange(minTokens: Long!, maxTokens: Long!, limit: Int = 10, offset: Int = 0): [TotalOrgTokens!]\n}\n\ntype Subscription {\n  UsageAlert: UserTokens\n}\n\ntype TotalOrgTokens {\n  orgid: Long!\n  total_tokens: Long!\n  total_requests: Long!\n}\n\ntype TotalUserTokens {\n  userid: Long!\n  total_tokens: Long!\n  total_requests: Long!\n}\n\ntype UserTokens {\n  userid: Long!\n  tokens: Long!\n  request_time: DateTime!\n}\n\ninput UserTokensInput {\n  userid: Long!\n  tokens: Long!\n}\n\ntype UserTokensResultOutput {\n  userid: Long!\n  tokens: Long!\n  request_time: DateTime!\n}\n"
      }
    }
  }
}
