>>>pipeline_explain.txt
=== NestedCustomers
ID:          default_catalog.default_database.NestedCustomers
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - customer_id: BIGINT
 - customer: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" email, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" street, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" city, RecordType:peek_no_expand(DOUBLE lat, DOUBLE lon, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" source, INTEGER meters) prec) geo) address) profile)
Inputs:
 - default_catalog.default_database.NestedCustomers__base
Annotations:
 - features: DENORMALIZE (feature)
Plan:
LogicalTableScan(table=[[default_catalog, default_database, NestedCustomers]])
SQL:
CREATE TABLE `NestedCustomers` (
  `customer_id` BIGINT,
  `customer` ROW(`name` VARCHAR, `profile` ROW(`email` VARCHAR, `address` ROW(`street` VARCHAR, `city` VARCHAR, `geo` ROW(`lat` DOUBLE, `lon` DOUBLE, `prec` ROW(`source` VARCHAR, `meters` INTEGER)))))
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
>>>flink-sql-no-functions.sql
CREATE TABLE `NestedCustomers` (
  `customer_id` BIGINT,
  `customer` ROW(`name` VARCHAR, `profile` ROW(`email` VARCHAR, `address` ROW(`street` VARCHAR, `city` VARCHAR, `geo` ROW(`lat` DOUBLE, `lon` DOUBLE, `prec` ROW(`source` VARCHAR, `meters` INTEGER)))))
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
);
CREATE TABLE `NestedCustomers_1` (
  `customer_id` BIGINT,
  `customer` RAW('com.datasqrl.flinkrunner.stdlib.json.FlinkJsonType', 'AERjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXJTbmFwc2hvdAAAAAM='),
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'NestedCustomers_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`NestedCustomers_1`
SELECT `customer_id`, `to_jsonb`(`customer`) AS `customer`, `hash_columns`(`customer_id`, `customer`) AS `__pk_hash`
 FROM `default_catalog`.`default_database`.`NestedCustomers`
;
END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "NestedCustomers_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"NestedCustomers_1\" (\"customer_id\" BIGINT, \"customer\" JSONB, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "customer_id",
          "type" : "BIGINT",
          "nullable" : true
        },
        {
          "name" : "customer",
          "type" : "JSONB",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "NestedCustomers",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"NestedCustomers\"(\"customer_id\", \"customer\") AS SELECT \"customer_id\", \"customer\"\nFROM \"NestedCustomers_1\"",
      "fields" : [
        {
          "name" : "customer_id",
          "type" : "BIGINT",
          "nullable" : true
        },
        {
          "name" : "customer",
          "type" : "JSONB",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "NestedCustomers",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"customer_id\", \"customer\"\nFROM \"NestedCustomers_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetNestedCustomers",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query NestedCustomers($limit: Int = 10, $offset: Int = 0) {\nNestedCustomers(limit: $limit, offset: $offset) {\ncustomer_id\ncustomer {\nname\nprofile {\nemail\n}\n}\n}\n\n}",
            "queryName" : "NestedCustomers",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/NestedCustomers{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype NestedCustomers {\n  customer_id: Long\n  customer: NestedCustomers_customerOutput\n}\n\ntype NestedCustomers_customerOutput {\n  name: String\n  profile: NestedCustomers_customer_profileOutput\n}\n\ntype NestedCustomers_customer_profileOutput {\n  email: String\n  address: NestedCustomers_customer_profile_addressOutput\n}\n\ntype NestedCustomers_customer_profile_addressOutput {\n  street: String\n  city: String\n  geo: NestedCustomers_customer_profile_address_geoOutput\n}\n\ntype NestedCustomers_customer_profile_address_geoOutput {\n  lat: Float\n  lon: Float\n  prec: NestedCustomers_customer_profile_address_geo_precOutput\n}\n\ntype NestedCustomers_customer_profile_address_geo_precOutput {\n  source: String\n  meters: Int\n}\n\ntype Query {\n  NestedCustomers(limit: Int = 10, offset: Int = 0): [NestedCustomers!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
