>>>pipeline_explain.txt
=== InputDistinct
ID:          default_catalog.default_database.InputDistinct
Type:        state
Stage:       flink
Primary key: userid
Timestamp:   event_time
Row count:   ~2e7
---
Schema:
 - _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - userid: INTEGER
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
 - unique_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
Inputs:
 - default_catalog.default_database._Input
Annotations:
 - mostRecentDistinct: true
 - stream-root: SimpleInput
Plan:
LogicalProject(_uuid=[$0], userid=[$1], event_time=[$2], unique_id=[$3])
  LogicalFilter(condition=[=($4, 1)])
    LogicalProject(_uuid=[$0], userid=[$1], event_time=[$2], unique_id=[$3], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
      LogicalTableScan(table=[[default_catalog, default_database, _Input]])
SQL:
CREATE VIEW `InputDistinct`
AS
SELECT `_uuid`, `userid`, `event_time`, `unique_id`
FROM (SELECT `_uuid`, `userid`, `event_time`, `unique_id`, ROW_NUMBER() OVER (PARTITION BY `userid` ORDER BY `event_time` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_Input`) AS `t`
WHERE `__sqrlinternal_rownum` = 1
=== JoinTable
ID:          default_catalog.default_database.JoinTable
Type:        state
Stage:       postgres
Primary key: userid, _uuid0, unique_id0
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - userid: INTEGER
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
 - unique_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - _uuid0: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - userid0: INTEGER
 - event_time0: TIMESTAMP_LTZ(3) *ROWTIME*
 - unique_id0: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
Inputs:
 - default_catalog.default_database.InputDistinct
 - default_catalog.default_database._Input
Plan:
LogicalProject(_uuid=[$0], userid=[$1], event_time=[$2], unique_id=[$3], _uuid0=[$4], userid0=[$5], event_time0=[$6], unique_id0=[$7])
  LogicalJoin(condition=[=($5, $1)], joinType=[inner])
    LogicalTableScan(table=[[default_catalog, default_database, InputDistinct]])
    LogicalTableScan(table=[[default_catalog, default_database, _Input]])
SQL:
CREATE VIEW `JoinTable` AS  SELECT * FROM InputDistinct i2 JOIN _Input i1 ON i1.userid = i2.userid;

=== SimpleInput
ID:          default_catalog.default_database.SimpleInput
Type:        stream
Stage:       flink
Primary key: _uuid
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - userid: INTEGER
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
Inputs:
 - default_catalog.default_database.SimpleInput__base
Annotations:
 - stream-root: SimpleInput
Plan:
LogicalWatermarkAssigner(rowtime=[event_time], watermark=[-($2, 0:INTERVAL SECOND)])
  LogicalProject(_uuid=[$0], userid=[$1], event_time=[CAST($2):TIMESTAMP_LTZ(3) *ROWTIME*])
    LogicalTableScan(table=[[default_catalog, default_database, SimpleInput, metadata=[timestamp]]])
SQL:
CREATE TABLE `SimpleInput` (
  `_uuid` STRING NOT NULL,
  `userid` INTEGER,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-SimpleInput'
)
=== _Input
ID:          default_catalog.default_database._Input
Type:        stream
Stage:       flink
Primary key: _uuid, unique_id
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - userid: INTEGER
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME*
 - unique_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
Inputs:
 - default_catalog.default_database.SimpleInput
Annotations:
 - stream-root: SimpleInput
Plan:
LogicalProject(_uuid=[$0], userid=[$1], event_time=[$2], unique_id=[$0])
  LogicalTableScan(table=[[default_catalog, default_database, SimpleInput]])
SQL:
CREATE VIEW `_Input` AS  SELECT *, _uuid AS unique_id FROM SimpleInput;

>>>flink-sql-no-functions.sql
CREATE TABLE `SimpleInput` (
  `_uuid` STRING NOT NULL,
  `userid` INTEGER,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'kafka-mutation-SimpleInput'
);
CREATE VIEW `_Input`
AS
SELECT *, `_uuid` AS `unique_id`
FROM `SimpleInput`;
CREATE VIEW `InputDistinct`
AS
SELECT `_uuid`, `userid`, `event_time`, `unique_id`
FROM (SELECT `_uuid`, `userid`, `event_time`, `unique_id`, ROW_NUMBER() OVER (PARTITION BY `userid` ORDER BY `event_time` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_Input`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `JoinTable`
AS
SELECT *
FROM `InputDistinct` AS `i2`
 INNER JOIN `_Input` AS `i1` ON `i1`.`userid` = `i2`.`userid`;
CREATE TABLE `InputDistinct_1` (
  `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `userid` INTEGER,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `unique_id` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'event_time',
  'table-name' = 'InputDistinct_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `_Input_2` (
  `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `userid` INTEGER,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE,
  `unique_id` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  PRIMARY KEY (`_uuid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = '_Input_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`InputDistinct_1`
SELECT `_uuid`, `userid`, `event_time`, `unique_id`, `hash_columns`(`userid`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`_Input`
;
INSERT INTO `default_catalog`.`default_database`.`_Input_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`_Input`
 ;
 END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "kafka-mutation-SimpleInput",
      "tableName" : "SimpleInput",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "InputDistinct_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"InputDistinct_1\" (\"_uuid\" TEXT NOT NULL, \"userid\" INTEGER, \"event_time\" TIMESTAMP WITH TIME ZONE, \"unique_id\" TEXT NOT NULL, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "_uuid",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "userid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "event_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "unique_id",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "_Input_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"_Input_2\" (\"_uuid\" TEXT NOT NULL, \"userid\" INTEGER, \"event_time\" TIMESTAMP WITH TIME ZONE, \"unique_id\" TEXT NOT NULL, PRIMARY KEY (\"_uuid\"))",
      "fields" : [
        {
          "name" : "_uuid",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "userid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "event_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "unique_id",
          "type" : "TEXT",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "_uuid"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "JoinTable",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"JoinTable\"(\"_uuid\", \"userid\", \"event_time\", \"unique_id\", \"_uuid0\", \"userid0\", \"event_time0\", \"unique_id0\") AS SELECT *\nFROM (SELECT \"_uuid\", \"userid\", \"event_time\", \"unique_id\"\n  FROM \"InputDistinct_1\") AS \"t2\"\n INNER JOIN \"_Input_2\" AS \"_Input_22\" ON \"t2\".\"userid\" = \"_Input_22\".\"userid\"",
      "fields" : [
        {
          "name" : "_uuid",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "userid",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "event_time",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "unique_id",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "_uuid0",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "userid0",
          "type" : "INTEGER",
          "nullable" : true
        },
        {
          "name" : "event_time0",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : true
        },
        {
          "name" : "unique_id0",
          "type" : "TEXT",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "_Input_2_hash_c1",
      "type" : "INDEX",
      "sql" : "CREATE INDEX IF NOT EXISTS \"_Input_2_hash_c1\" ON \"_Input_2\" USING hash (\"userid\")"
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "JoinTable",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"_uuid\", \"userid\", \"event_time\", \"unique_id\"\n  FROM \"InputDistinct_1\") AS \"t\"\n INNER JOIN \"_Input_2\" AS \"_Input_20\" ON \"t\".\"userid\" = \"_Input_20\".\"userid\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "SimpleInput",
          "returnList" : false,
          "topic" : "kafka-mutation-SimpleInput",
          "keyColumns" : [ ],
          "computedColumns" : {
            "_uuid" : {
              "metadataType" : "UUID",
              "name" : "",
              "required" : true
            },
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : false
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetJoinTable",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query JoinTable($limit: Int = 10, $offset: Int = 0) {\nJoinTable(limit: $limit, offset: $offset) {\nuserid\nevent_time\nunique_id\nuserid0\nevent_time0\nunique_id0\n}\n\n}",
            "queryName" : "JoinTable",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/JoinTable{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddSimpleInput",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "userid" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation SimpleInput($userid: Int) {\nSimpleInput(event: { userid: $userid }) {\nuserid\nevent_time\n}\n\n}",
            "queryName" : "SimpleInput",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/SimpleInput"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\ntype JoinTable {\n  userid: Int\n  event_time: DateTime\n  unique_id: String!\n  userid0: Int\n  event_time0: DateTime\n  unique_id0: String!\n}\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  SimpleInput(event: SimpleInputInput!): SimpleInputResultOutput!\n}\n\ntype Query {\n  JoinTable(limit: Int = 10, offset: Int = 0): [JoinTable!]\n}\n\ninput SimpleInputInput {\n  userid: Int\n}\n\ntype SimpleInputResultOutput {\n  userid: Int\n  event_time: DateTime\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
