>>>inferred_schema.graphqls
"An RFC-3339 compliant Full Date Scalar"
# NOTE: no field or argument in this schema is typed as Date.
scalar Date

# Type of Employees.updatedDate.
"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

# Employee record exposed by the API. The four scalar fields correspond to the columns of
# the "Employees" table; the two list fields are resolved by separate SQL queries that
# take this object's employeeid as their parameter.
type Employees {
  employeeid: Long!
  name: String!
  email: String!
  updatedDate: DateTime!
  # Paged with limit/offset (defaults 10 and 0); each entry carries its depth in the chain.
  "Returns all employees reporting to the given employee following down the reporting chain"
  allReports(limit: Int = 10, offset: Int = 0): [Employees_allReports!]
  # Employees whose Reporting row names this employee as managerid; paged like allReports.
  directReports(limit: Int = 10, offset: Int = 0): [Employees!]
}

"Returns all employees reporting to the given employee following down the reporting chain"
# Row shape returned by Employees.allReports: id and name of the reporting employee plus
# `level`, which the backing recursive query sets to 1 for direct reports and increases
# by one for every further step down the reporting chain.
type Employees_allReports {
  employeeid: Long!
  name: String!
  level: Int!
}

"A JSON scalar"
# NOTE: no field or argument in this schema is typed as JSON.
scalar JSON

# NOTE: no field or argument in this schema is typed as LocalTime.
"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

# Type of the employeeid fields and of the `employeeid` argument of Query.Employees.
"A 64-bit signed integer"
scalar Long

# Root query type. `Employees` filters on employeeid and/or name; the backing SQL treats
# a null (omitted) argument as "no restriction" on that column. Results are paged with
# limit/offset (defaults 10 and 0).
type Query {
  Employees(employeeid: Long, name: String, limit: Int = 10, offset: Int = 0): [Employees!]
}

# Values accepted by the `mcp` argument of the @api directive.
enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

# Values accepted by the `rest` argument of the @api directive.
enum _RestMethodType {
  NONE
  GET
  POST
}

# Attaches API exposure settings (MCP method type, REST method type, URI string) to a
# query or mutation operation, or to a field definition.
directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== Employees
ID:          default_catalog.default_database.Employees
Type:        state
Stage:       flink
Primary key: employeeid
Timestamp:   updatedDate
Row count:   ~2e7
---
Schema:
 - employeeid: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - updatedDate: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.hr.Employees
Annotations:
 - mostRecentDistinct: true

=== Reporting
ID:          default_catalog.default_database.Reporting
Type:        state
Stage:       flink
Primary key: employeeid, managerid
Timestamp:   updatedDate
Row count:   ~2e7
---
Schema:
 - employeeid: BIGINT NOT NULL
 - managerid: BIGINT NOT NULL
 - updatedDate: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.hr.Reporting
Annotations:
 - mostRecentDistinct: true

=== Employees
ID:          default_catalog.hr.Employees
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   updatedDate
Row count:   ~1e8
---
Schema:
 - employeeid: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - updatedDate: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.hr.Employees__base

=== Reporting
ID:          default_catalog.hr.Reporting
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   updatedDate
Row count:   ~1e8
---
Schema:
 - employeeid: BIGINT NOT NULL
 - managerid: BIGINT NOT NULL
 - updatedDate: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.hr.Reporting__base

>>>flink-sql-no-functions.sql
-- Source tables are created in the `hr` database.
CREATE DATABASE IF NOT EXISTS `hr`;
USE `hr`;
-- Column layout and connector settings of the employee source: the filesystem connector
-- reads employees.jsonl under the configured data path using the 'flexible-json' format.
CREATE TEMPORARY TABLE `Employees__schema` (
  `employeeid` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/employees.jsonl'
);
-- Source table proper: derived from `Employees__schema` via LIKE, it declares a watermark
-- on `updatedDate` that trails the column by 1 ms and sets 'source.monitor-interval'
-- to 10 sec.
CREATE TABLE `Employees` (
  WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND
)
WITH (
  'source.monitor-interval' = '10 sec'
)
LIKE `Employees__schema`;
-- Column layout and connector settings of the reporting-relationship source: one
-- (employeeid, managerid) pair per record, read from reporting.jsonl under the configured
-- data path using the 'flexible-json' format.
CREATE TEMPORARY TABLE `Reporting__schema` (
  `employeeid` BIGINT NOT NULL,
  `managerid` BIGINT NOT NULL,
  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/reporting.jsonl'
);
-- Source table proper: derived from `Reporting__schema` via LIKE, with the same 1 ms
-- watermark on `updatedDate` and 10 sec monitor interval as the `Employees` source.
CREATE TABLE `Reporting` (
  WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND
)
WITH (
  'source.monitor-interval' = '10 sec'
)
LIKE `Reporting__schema`;
-- Switch to default_catalog.default_database, where the views and sink tables are created.
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
-- Latest-version view of employees: rows are numbered per employeeid by descending
-- updatedDate and only row number 1 (the most recent one) is kept.
-- NOTE(review): the INSERT statements in this script read from the hr source tables,
-- not from this view.
CREATE VIEW `Employees`
AS
SELECT `employeeid`, `name`, `email`, `updatedDate`
FROM (SELECT `employeeid`, `name`, `email`, `updatedDate`, ROW_NUMBER() OVER (PARTITION BY `employeeid` ORDER BY `updatedDate` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`hr`.`Employees`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Latest-version view of reporting relationships: same pattern as above, partitioned by
-- the (employeeid, managerid) pair.
CREATE VIEW `Reporting`
AS
SELECT `employeeid`, `managerid`, `updatedDate`
FROM (SELECT `employeeid`, `managerid`, `updatedDate`, ROW_NUMBER() OVER (PARTITION BY `employeeid`, `managerid` ORDER BY `updatedDate` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`hr`.`Reporting`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Sink for employees: writes through the 'jdbc-sqrl' connector into the Postgres table
-- "Employees", keyed on employeeid. Connection details come from the POSTGRES_* variables.
-- NOTE(review): 'sink.on-conflict.action' = 'TIMESTAMP' presumably keeps the row with the
-- newer updatedDate when a key already exists - confirm against the connector.
CREATE TABLE `Employees_1` (
  `employeeid` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`employeeid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'updatedDate',
  'table-name' = 'Employees',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- Sink for reporting relationships: same connector settings as above, targeting the
-- Postgres table "Reporting" with the composite key (employeeid, managerid).
CREATE TABLE `Reporting_2` (
  `employeeid` BIGINT NOT NULL,
  `managerid` BIGINT NOT NULL,
  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`employeeid`, `managerid`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'TIMESTAMP',
  'sink.on-conflict.timestamp-column' = 'updatedDate',
  'table-name' = 'Reporting',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
-- Submits both sink inserts together as one statement set. Each sink is fed directly
-- from its hr source table (not from the deduplicating views), so every source record
-- reaches the JDBC sink.
-- The closing END is terminated with a semicolon, matching Flink's documented syntax
-- and the other statements of this script.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Employees_1`
SELECT *
FROM `default_catalog`.`hr`.`Employees`
;
INSERT INTO `default_catalog`.`default_database`.`Reporting_2`
SELECT *
FROM `default_catalog`.`hr`.`Reporting`
;
END;
>>>postgres-schema.sql
-- Tables served by the API. Column names and primary keys are the same as those of the
-- Flink sink tables `Employees_1` and `Reporting_2`, which write into them.
CREATE TABLE IF NOT EXISTS "Employees" ("employeeid" BIGINT NOT NULL, "name" TEXT NOT NULL, "email" TEXT NOT NULL, "updatedDate" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("employeeid"));
CREATE TABLE IF NOT EXISTS "Reporting" ("employeeid" BIGINT NOT NULL, "managerid" BIGINT NOT NULL, "updatedDate" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("employeeid","managerid"));

-- Hash index for the equality filter on "name" used by the Employees query.
CREATE INDEX IF NOT EXISTS "Employees_hash_c1" ON "Employees" USING hash ("name");
-- Hash index for the equality lookups on "managerid" issued by the allReports and
-- directReports queries.
CREATE INDEX IF NOT EXISTS "Reporting_hash_c1" ON "Reporting" USING hash ("managerid");
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Employees",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "employeeid"
              },
              {
                "type" : "variable",
                "path" : "name"
              },
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Employees\"\nWHERE (\"employeeid\" = $1 OR $1 IS NULL) AND (\"name\" = $2 OR $2 IS NULL)",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "employeeid",
                  "sqlType" : "BIGINT"
                },
                {
                  "type" : "arg",
                  "path" : "name",
                  "sqlType" : "VARCHAR"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Employees",
          "fieldName" : "allReports",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "WITH RECURSIVE employee_hierarchy AS (\n    SELECT r.employeeid, r.managerid, 1 as level\n    FROM \"Reporting\" r\n    WHERE r.managerid = $1\n    UNION ALL\n    SELECT r.employeeid, r.managerid, eh.level + 1 as level\n    FROM \"Reporting\" r\n    INNER JOIN employee_hierarchy eh ON r.managerid = eh.employeeid\n  )\n  SELECT e.employeeid, e.name, eh.level\n  FROM employee_hierarchy eh\n  JOIN \"Employees\" e ON eh.employeeid = e.employeeid\n  ORDER BY eh.level, e.employeeid",
              "parameters" : [
                {
                  "type" : "source",
                  "key" : "employeeid"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Employees",
          "fieldName" : "directReports",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"Employees\".\"employeeid\", \"Employees\".\"name\", \"Employees\".\"email\", \"Employees\".\"updatedDate\"\nFROM \"Reporting\"\n INNER JOIN \"Employees\" ON \"Reporting\".\"employeeid\" = \"Employees\".\"employeeid\"\nWHERE \"Reporting\".\"managerid\" = $1\nORDER BY \"Employees\".\"employeeid\" NULLS FIRST",
              "parameters" : [
                {
                  "type" : "source",
                  "key" : "employeeid"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetEmployees",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "allReports_limit" : {
                  "type" : "integer"
                },
                "offset" : {
                  "type" : "integer"
                },
                "name" : {
                  "type" : "string"
                },
                "limit" : {
                  "type" : "integer"
                },
                "employeeid" : {
                  "type" : "integer"
                },
                "allReports_offset" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Employees($employeeid: Long, $name: String, $limit: Int = 10, $offset: Int = 0, $allReports_limit: Int = 10, $allReports_offset: Int = 0) {\nEmployees(employeeid: $employeeid, name: $name, limit: $limit, offset: $offset) {\nemployeeid\nname\nemail\nupdatedDate\nallReports(limit: $allReports_limit, offset: $allReports_offset) {\nemployeeid\nname\nlevel\n}\n}\n\n}",
            "queryName" : "Employees",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Employees{?allReports_limit,offset,name,limit,employeeid,allReports_offset}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\ntype Employees {\n  employeeid: Long!\n  name: String!\n  email: String!\n  updatedDate: DateTime!\n  \"Returns all employees reporting to the given employee following down the reporting chain\"\n  allReports(limit: Int = 10, offset: Int = 0): [Employees_allReports!]\n  directReports(limit: Int = 10, offset: Int = 0): [Employees!]\n}\n\n\"Returns all employees reporting to the given employee following down the reporting chain\"\ntype Employees_allReports {\n  employeeid: Long!\n  name: String!\n  level: Int!\n}\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  Employees(employeeid: Long, name: String, limit: Int = 10, offset: Int = 0): [Employees!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
