>>>inferred_schema.graphqls
type ApplicationUpdates {
  loan_application_id: Long!
  status: String!
  message: String!
  event_time: DateTime!
}

type Applications {
  id: Long!
  customer_id: Long!
  loan_type_id: Long!
  amount: Float!
  duration: Long!
  application_date: DateTime!
  updated_at: DateTime!
}

type CustomerUpdates {
  customer_id: Long!
  num_updates: Long!
}

type Customers {
  id: Long!
  first_name: String!
  last_name: String!
  email: String!
  phone: String!
  address: String!
  date_of_birth: String!
  updated_at: DateTime!
}

"An RFC-3339 compliant Full Date Scalar"
scalar Date

"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats"
scalar DateTime

"A JSON scalar"
scalar JSON

"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`."
scalar LocalTime

"A 64-bit signed integer"
scalar Long

type Query {
  ApplicationUpdates(limit: Int = 10, offset: Int = 0): [ApplicationUpdates!]
  Applications(limit: Int = 10, offset: Int = 0): [Applications!]
  CustomerUpdates(limit: Int = 10, offset: Int = 0): [CustomerUpdates!]
  Customers(limit: Int = 10, offset: Int = 0): [Customers!]
}

enum _McpMethodType {
  NONE
  TOOL
  RESOURCE
}

enum _RestMethodType {
  NONE
  GET
  POST
}

directive @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION

>>>pipeline_explain.txt
=== ApplicationUpdates
ID:          default_catalog.default_database.ApplicationUpdates
Type:        stream
Stage:       flink
Primary key: loan_application_id, event_time
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - loan_application_id: BIGINT NOT NULL
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Inputs:
 - default_catalog.default_database.ApplicationUpdates__base
Annotations:
 - stream-root: ApplicationUpdates

=== Applications
ID:          default_catalog.default_database.Applications
Type:        state
Stage:       flink
Primary key: id
Timestamp:   -
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Inputs:
 - default_catalog.default_database._ApplicationsStream
Annotations:
 - stream-root: _ApplicationsStream

=== CustomerUpdates
ID:          default_catalog.default_database.CustomerUpdates
Type:        state
Stage:       flink
Primary key: customer_id
Timestamp:   -
Row count:   ~1e7
---
Schema:
 - customer_id: BIGINT NOT NULL
 - num_updates: BIGINT NOT NULL
Inputs:
 - default_catalog.default_database.ApplicationUpdates
 - default_catalog.default_database.Applications
 - default_catalog.default_database.Customers

=== Customers
ID:          default_catalog.default_database.Customers
Type:        state
Stage:       flink
Primary key: id
Timestamp:   -
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - first_name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - last_name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - phone: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - address: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - date_of_birth: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - updated_at: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Inputs:
 - default_catalog.default_database._CustomersStream
Annotations:
 - stream-root: _CustomersStream

=== _ApplicationsStream
ID:          default_catalog.default_database._ApplicationsStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Inputs:
 - default_catalog.default_database._ApplicationsStream__base
Annotations:
 - stream-root: _ApplicationsStream

=== _CustomersStream
ID:          default_catalog.default_database._CustomersStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - first_name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - last_name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - phone: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - address: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - date_of_birth: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - updated_at: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Inputs:
 - default_catalog.default_database._CustomersStream__base
Annotations:
 - stream-root: _CustomersStream

>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `_CustomersStream__schema` (
  `id` BIGINT NOT NULL,
  `first_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `last_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `phone` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `address` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `date_of_birth` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `_CustomersStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/customers.jsonl',
  'connector' = 'filesystem'
)
LIKE `_CustomersStream__schema`;
CREATE TEMPORARY TABLE `_ApplicationsStream__schema` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `_ApplicationsStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/applications.jsonl',
  'connector' = 'filesystem'
)
LIKE `_ApplicationsStream__schema`;
CREATE TEMPORARY TABLE `ApplicationUpdates__schema` (
  `loan_application_id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `ApplicationUpdates` (
  PRIMARY KEY (`loan_application_id`, `event_time`) NOT ENFORCED
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/application_updates.jsonl',
  'connector' = 'filesystem'
)
LIKE `ApplicationUpdates__schema`;
CREATE VIEW `Customers`
AS
SELECT `id`, `first_name`, `last_name`, `email`, `phone`, `address`, `date_of_birth`, `updated_at`
FROM (SELECT `id`, `first_name`, `last_name`, `email`, `phone`, `address`, `date_of_birth`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_CustomersStream`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `Applications`
AS
SELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`
FROM (SELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`default_database`.`_ApplicationsStream`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `CustomerUpdates`
AS
SELECT `c`.`id` AS `customer_id`, COUNT(*) AS `num_updates`
FROM `Customers` AS `c`
 INNER JOIN `Applications` AS `a` ON `c`.`id` = `a`.`customer_id`
 INNER JOIN `ApplicationUpdates` AS `u` ON `a`.`id` = `u`.`loan_application_id`
GROUP BY `c`.`id`;
CREATE TABLE `ApplicationUpdates_1` (
  `loan_application_id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`loan_application_id`, `event_time`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'ApplicationUpdates',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `Applications_2` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'Applications',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `CustomerUpdates_3` (
  `customer_id` BIGINT NOT NULL,
  `num_updates` BIGINT NOT NULL,
  PRIMARY KEY (`customer_id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'CustomerUpdates',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `Customers_4` (
  `id` BIGINT NOT NULL,
  `first_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `last_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `phone` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `address` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `date_of_birth` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'table-name' = 'Customers',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`ApplicationUpdates_1`
SELECT *
 FROM `default_catalog`.`default_database`.`ApplicationUpdates`
;
INSERT INTO `default_catalog`.`default_database`.`Applications_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`Applications`
 ;
 INSERT INTO `default_catalog`.`default_database`.`CustomerUpdates_3`
  SELECT *
   FROM `default_catalog`.`default_database`.`CustomerUpdates`
  ;
  INSERT INTO `default_catalog`.`default_database`.`Customers_4`
   SELECT *
    FROM `default_catalog`.`default_database`.`Customers`
   ;
   END
>>>postgres-schema.sql
CREATE TABLE IF NOT EXISTS "ApplicationUpdates" ("loan_application_id" BIGINT NOT NULL, "status" TEXT NOT NULL, "message" TEXT NOT NULL, "event_time" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("loan_application_id","event_time"));
CREATE TABLE IF NOT EXISTS "Applications" ("id" BIGINT NOT NULL, "customer_id" BIGINT NOT NULL, "loan_type_id" BIGINT NOT NULL, "amount" DOUBLE PRECISION NOT NULL, "duration" BIGINT NOT NULL, "application_date" TIMESTAMP WITH TIME ZONE NOT NULL, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("id"));
CREATE TABLE IF NOT EXISTS "CustomerUpdates" ("customer_id" BIGINT NOT NULL, "num_updates" BIGINT NOT NULL, PRIMARY KEY ("customer_id"));
CREATE TABLE IF NOT EXISTS "Customers" ("id" BIGINT NOT NULL, "first_name" TEXT NOT NULL, "last_name" TEXT NOT NULL, "email" TEXT NOT NULL, "phone" TEXT NOT NULL, "address" TEXT NOT NULL, "date_of_birth" TEXT NOT NULL, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY ("id"))
>>>postgres-views.sql

>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "ApplicationUpdates",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"ApplicationUpdates\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Applications",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Applications\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerUpdates",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"CustomerUpdates\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Customers",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customers\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetApplicationUpdates",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query ApplicationUpdates($limit: Int = 10, $offset: Int = 0) {\nApplicationUpdates(limit: $limit, offset: $offset) {\nloan_application_id\nstatus\nmessage\nevent_time\n}\n\n}",
            "queryName" : "ApplicationUpdates",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/ApplicationUpdates{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetApplications",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Applications($limit: Int = 10, $offset: Int = 0) {\nApplications(limit: $limit, offset: $offset) {\nid\ncustomer_id\nloan_type_id\namount\nduration\napplication_date\nupdated_at\n}\n\n}",
            "queryName" : "Applications",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Applications{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomerUpdates",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerUpdates($limit: Int = 10, $offset: Int = 0) {\nCustomerUpdates(limit: $limit, offset: $offset) {\ncustomer_id\nnum_updates\n}\n\n}",
            "queryName" : "CustomerUpdates",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerUpdates{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomers",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Customers($limit: Int = 10, $offset: Int = 0) {\nCustomers(limit: $limit, offset: $offset) {\nid\nfirst_name\nlast_name\nemail\nphone\naddress\ndate_of_birth\nupdated_at\n}\n\n}",
            "queryName" : "Customers",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Customers{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type ApplicationUpdates {\n  loan_application_id: Long!\n  status: String!\n  message: String!\n  event_time: DateTime!\n}\n\ntype Applications {\n  id: Long!\n  customer_id: Long!\n  loan_type_id: Long!\n  amount: Float!\n  duration: Long!\n  application_date: DateTime!\n  updated_at: DateTime!\n}\n\ntype CustomerUpdates {\n  customer_id: Long!\n  num_updates: Long!\n}\n\ntype Customers {\n  id: Long!\n  first_name: String!\n  last_name: String!\n  email: String!\n  phone: String!\n  address: String!\n  date_of_birth: String!\n  updated_at: DateTime!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  ApplicationUpdates(limit: Int = 10, offset: Int = 0): [ApplicationUpdates!]\n  Applications(limit: Int = 10, offset: Int = 0): [Applications!]\n  CustomerUpdates(limit: Int = 10, offset: Int = 0): [CustomerUpdates!]\n  Customers(limit: Int = 10, offset: Int = 0): [Customers!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
