>>>inferred_schema.graphqls
type AuthInputData {
  event_id: String!
  val: Long!
  message: String!
  event_time: DateTime!
}

input AuthInputDataInput {
  message: String!
}

type AuthInputDataResultOutput {
  event_id: String!
  val: Long!
  message: String!
  event_time: DateTime!
}

"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

type Mutation {
  AuthInputData(event: AuthInputDataInput!): AuthInputDataResultOutput!
}

type MyStringTable {
  val: String!
}

type MyTable {
  val: Int!
}

type Query {
  MyStringTable(limit: Int = 10, offset: Int = 0): [MyStringTable!]
  MyTable(limit: Int = 10, offset: Int = 0): [MyTable!]
  AuthMyTable(limit: Int = 10, offset: Int = 0): [MyTable!]
  AuthMyTableValues(limit: Int = 10, offset: Int = 0): [MyTable!]
  AuthStringMyTableValues(limit: Int = 10, offset: Int = 0): [MyStringTable!]
}

type Subscription {
  MessageSubscription: AuthInputData
}

enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

enum _RestMethodType {
  NONE
  GET
  POST
}

directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== AuthInputData
ID:          default_catalog.default_database.AuthInputData
Type:        stream
Stage:       flink
Primary key: event_id
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - event_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - val: BIGINT NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.AuthInputData__base
Annotations:
 - stream-root: AuthInputData

=== AuthMyTable
ID:          default_catalog.default_database.AuthMyTable
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.MyTable
Annotations:
 - parameters: val
 - base-table: MyTable

=== AuthMyTableValues
ID:          default_catalog.default_database.AuthMyTableValues
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.MyTable
Annotations:
 - parameters: vals
 - base-table: MyTable

=== AuthStringMyTableValues
ID:          default_catalog.default_database.AuthStringMyTableValues
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.MyStringTable
Annotations:
 - parameters: vals
 - base-table: MyStringTable

=== MessageSubscription
ID:          default_catalog.default_database.MessageSubscription
Type:        query
Stage:       kafka
---
Inputs:
 - default_catalog.default_database._Messages
Annotations:
 - stream-root: AuthInputData
 - parameters: val
 - base-table: AuthInputData

=== MyStringTable
ID:          default_catalog.default_database.MyStringTable
Type:        state
Stage:       flink
Primary key: val
Timestamp:   -
Row count:   ~4
---
Schema:
 - val: CHAR(1) CHARACTER SET "UTF-16LE" NOT NULL
Annotations:
 - sort: [0 ASC-nulls-first]

=== MyTable
ID:          default_catalog.default_database.MyTable
Type:        state
Stage:       flink
Primary key: val
Timestamp:   -
Row count:   ~1e1
---
Schema:
 - val: INTEGER NOT NULL
Annotations:
 - sort: [0 ASC-nulls-first]

=== _Messages
ID:          default_catalog.default_database._Messages
Type:        stream
Stage:       flink
Primary key: event_id
Timestamp:   event_time
Row count:   ~5e7
---
Schema:
 - event_id: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - val: BIGINT NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.AuthInputData
Annotations:
 - stream-root: AuthInputData

>>>flink-sql-no-functions.sql
CREATE VIEW `MyTable`
AS
SELECT `val`
FROM (VALUES ROW(1),
   ROW(2),
   ROW(3),
   ROW(4),
   ROW(5),
   ROW(6),
   ROW(7),
   ROW(8),
   ROW(9),
   ROW(10)) AS `t` (`val`);
CREATE VIEW `MyStringTable`
AS
SELECT `val`
FROM (VALUES ROW('a'),
   ROW('b'),
   ROW('c'),
   ROW('d')) AS `t` (`val`);
CREATE TABLE `AuthInputData` (
  `event_id` STRING NOT NULL,
  `val` BIGINT NOT NULL,
  `message` STRING NOT NULL,
  `event_time` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'AuthInputData'
);
CREATE VIEW `_Messages`
AS
SELECT *
FROM `AuthInputData`
WHERE `message` <> '';
CREATE TABLE `MyStringTable_1` (
  `val` CHAR(1) CHARACTER SET `UTF-16LE` NOT NULL,
  PRIMARY KEY (`val`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'MyStringTable',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `MyTable_2` (
  `val` INTEGER NOT NULL,
  PRIMARY KEY (`val`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'MyTable',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `_Messages_3` (
  `event_id` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `val` BIGINT NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = '_Messages'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`MyStringTable_1`
SELECT *
 FROM `default_catalog`.`default_database`.`MyStringTable`
;
INSERT INTO `default_catalog`.`default_database`.`MyTable_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`MyTable`
 ;
 INSERT INTO `default_catalog`.`default_database`.`_Messages_3`
  SELECT *
   FROM `default_catalog`.`default_database`.`_Messages`
  ;
  END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "_Messages",
      "tableName" : "_Messages_3",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "AuthInputData",
      "tableName" : "AuthInputData",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "MUTATION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [ ]
}
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "MyStringTable" ("val" TEXT NOT NULL, PRIMARY KEY ("val"));
CREATE TABLE IF NOT EXISTS "MyTable" ("val" INTEGER NOT NULL, PRIMARY KEY ("val"))
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "MyStringTable",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"val\"\n  FROM \"MyStringTable\"\n  ORDER BY \"val\" NULLS FIRST) AS \"t\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "MyTable",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"val\"\n  FROM \"MyTable\"\n  ORDER BY \"val\" NULLS FIRST) AS \"t\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "AuthMyTable",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"val\"\n  FROM \"MyTable\"\n  ORDER BY \"val\" NULLS FIRST) AS \"t\"\nWHERE \"val\" = $1\nORDER BY \"val\" NULLS FIRST",
              "parameters" : [
                {
                  "type" : "metadata",
                  "metadata" : {
                    "metadataType" : "AUTH",
                    "name" : "val",
                    "required" : true
                  }
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "AuthMyTableValues",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"val\"\n  FROM \"MyTable\"\n  ORDER BY \"val\" NULLS FIRST) AS \"t\"\nWHERE (CAST(\"val\" AS BIGINT) = ANY (CAST($1 AS BIGINT ARRAY)))\nORDER BY \"val\" NULLS FIRST",
              "parameters" : [
                {
                  "type" : "metadata",
                  "metadata" : {
                    "metadataType" : "AUTH",
                    "name" : "values",
                    "required" : true
                  }
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "AuthStringMyTableValues",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM (SELECT \"val\"\n  FROM \"MyStringTable\"\n  ORDER BY \"val\" NULLS FIRST) AS \"t\"\nWHERE (CAST(\"val\" AS TEXT) = ANY (CAST($1 AS TEXT ARRAY)))\nORDER BY \"val\" NULLS FIRST",
              "parameters" : [
                {
                  "type" : "metadata",
                  "metadata" : {
                    "metadataType" : "AUTH",
                    "name" : "values",
                    "required" : true
                  }
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [
        {
          "type" : "kafka",
          "fieldName" : "AuthInputData",
          "returnList" : false,
          "topic" : "AuthInputData",
          "keyColumns" : [ ],
          "computedColumns" : {
            "val" : {
              "metadataType" : "AUTH",
              "name" : "val",
              "required" : true
            },
            "event_id" : {
              "metadataType" : "UUID",
              "name" : "",
              "required" : true
            },
            "event_time" : {
              "metadataType" : "TIMESTAMP",
              "name" : "",
              "required" : true
            }
          },
          "transactional" : false,
          "sinkConfig" : { }
        }
      ],
      "subscriptions" : [
        {
          "type" : "kafka",
          "fieldName" : "MessageSubscription",
          "topic" : "_Messages",
          "sinkConfig" : { },
          "equalityConditions" : {
            "val" : {
              "type" : "metadata",
              "metadata" : {
                "metadataType" : "AUTH",
                "name" : "val",
                "required" : true
              }
            }
          }
        }
      ],
      "operations" : [
        {
          "function" : {
            "name" : "GetMyStringTable",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query MyStringTable($limit: Int = 10, $offset: Int = 0) {\nMyStringTable(limit: $limit, offset: $offset) {\nval\n}\n\n}",
            "queryName" : "MyStringTable",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/MyStringTable{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetMyTable",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query MyTable($limit: Int = 10, $offset: Int = 0) {\nMyTable(limit: $limit, offset: $offset) {\nval\n}\n\n}",
            "queryName" : "MyTable",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/MyTable{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetAuthMyTable",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query AuthMyTable($limit: Int = 10, $offset: Int = 0) {\nAuthMyTable(limit: $limit, offset: $offset) {\nval\n}\n\n}",
            "queryName" : "AuthMyTable",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/AuthMyTable{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetAuthMyTableValues",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query AuthMyTableValues($limit: Int = 10, $offset: Int = 0) {\nAuthMyTableValues(limit: $limit, offset: $offset) {\nval\n}\n\n}",
            "queryName" : "AuthMyTableValues",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/AuthMyTableValues{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetAuthStringMyTableValues",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query AuthStringMyTableValues($limit: Int = 10, $offset: Int = 0) {\nAuthStringMyTableValues(limit: $limit, offset: $offset) {\nval\n}\n\n}",
            "queryName" : "AuthStringMyTableValues",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/AuthStringMyTableValues{?offset,limit}"
        },
        {
          "function" : {
            "name" : "AddAuthInputData",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "message" : {
                  "type" : "string"
                }
              },
              "required" : [
                "message"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "mutation AuthInputData($message: String!) {\nAuthInputData(event: { message: $message }) {\nevent_id\nval\nmessage\nevent_time\n}\n\n}",
            "queryName" : "AuthInputData",
            "operationType" : "MUTATION"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "POST",
          "uriTemplate" : "mutations/AuthInputData"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type AuthInputData {\n  event_id: String!\n  val: Long!\n  message: String!\n  event_time: DateTime!\n}\n\ninput AuthInputDataInput {\n  message: String!\n}\n\ntype AuthInputDataResultOutput {\n  event_id: String!\n  val: Long!\n  message: String!\n  event_time: DateTime!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Mutation {\n  AuthInputData(event: AuthInputDataInput!): AuthInputDataResultOutput!\n}\n\ntype MyStringTable {\n  val: String!\n}\n\ntype MyTable {\n  val: Int!\n}\n\ntype Query {\n  MyStringTable(limit: Int = 10, offset: Int = 0): [MyStringTable!]\n  MyTable(limit: Int = 10, offset: Int = 0): [MyTable!]\n  AuthMyTable(limit: Int = 10, offset: Int = 0): [MyTable!]\n  AuthMyTableValues(limit: Int = 10, offset: Int = 0): [MyTable!]\n  AuthStringMyTableValues(limit: Int = 10, offset: Int = 0): [MyStringTable!]\n}\n\ntype Subscription {\n  MessageSubscription: AuthInputData\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
