>>>pipeline_explain.txt
=== Customer
ID:          default_catalog.default_database.Customer
Type:        stream
Stage:       flink
Primary key: customerid, lastUpdated
Timestamp:   timestamp
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - email: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - lastUpdated: BIGINT NOT NULL
 - timestamp: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.default_database.Customer__base
Annotations:
 - stream-root: Customer
Plan:
LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($4, 1:INTERVAL SECOND)])
  LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[COALESCE(TO_TIMESTAMP_LTZ($3, 0), 1970-01-01 08:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), CAST(TIMESTAMP '1970-01-01 00:00:00.000' AS TIMESTAMP(3) WITH LOCAL TIME ZONE)),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`
=== CustomerVectorEmbed
ID:          default_catalog.default_database.CustomerVectorEmbed
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - customerid: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - name_vector: RAW('com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType', 'AEhjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLnZlY3Rvci5GbGlua1ZlY3RvclR5cGVTZXJpYWxpemVyU25hcHNob3QAAAAD')
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
Plan:
LogicalProject(customerid=[$0], name=[$2], name_vector=[vector_embed($2, 'test-model')])
  LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomerVectorEmbed` AS  SELECT customerid, name, vector_embed(name, 'test-model') AS name_vector
                       FROM Customer;

=== CustomersByName
ID:          default_catalog.default_database.CustomersByName
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.Customer
Annotations:
 - stream-root: Customer
 - parameters: inputName, shortName
 - base-table: Customer
Plan:
LogicalProject(customerid=[$0], email=[$1], name=[$2], lastUpdated=[$3], timestamp=[$4])
  LogicalFilter(condition=[LIKE($2, ?1)])
    LogicalTableScan(table=[[default_catalog, default_database, Customer]])
SQL:
CREATE VIEW `CustomersByName` AS 
    SELECT * FROM Customer WHERE name LIKE ?         ;

=== CustomersByVector
ID:          default_catalog.default_database.CustomersByVector
Type:        query
Stage:       postgres
---
Inputs:
 - default_catalog.default_database.CustomerVectorEmbed
Annotations:
 - stream-root: Customer
 - parameters: name, vector
 - base-table: CustomersByVector
Plan:
LogicalSort(sort0=[$3], dir0=[DESC-nulls-last])
  LogicalProject(customerid=[$0], name=[$1], name_vector=[$2], EXPR$3=[cosine_similarity(?1, $2)])
    LogicalTableScan(table=[[default_catalog, default_database, CustomerVectorEmbed]])
SQL:
CREATE VIEW `CustomersByVector` AS 
                SELECT * FROM CustomerVectorEmbed ORDER BY cosine_similarity(?      , name_vector) DESC;

>>>flink-sql-no-functions.sql
CREATE TEMPORARY TABLE `Customer__schema` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Customer` (
  `timestamp` AS COALESCE(`TO_TIMESTAMP_LTZ`(`lastUpdated`, 0), TIMESTAMP '1970-01-01 00:00:00.000'),
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = 'file:/mock',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Customer__schema`;
CREATE VIEW `CustomerVectorEmbed`
AS
SELECT `customerid`, `name`, `vector_embed`(`name`, 'test-model') AS `name_vector`
FROM `Customer`;
CREATE TABLE `Customer_1` (
  `customerid` BIGINT NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `lastUpdated` BIGINT NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`customerid`, `lastUpdated`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'Customer_1',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
CREATE TABLE `CustomerVectorEmbed_2` (
  `customerid` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `name_vector` RAW('com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType', 'AEhjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLnZlY3Rvci5GbGlua1ZlY3RvclR5cGVTZXJpYWxpemVyU25hcHNob3QAAAAD'),
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc-sqrl',
  'driver' = 'org.postgresql.Driver',
  'password' = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name' = 'CustomerVectorEmbed_2',
  'url' = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username' = '${POSTGRES_USERNAME}'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Customer_1`
SELECT *
 FROM `default_catalog`.`default_database`.`Customer`
;
INSERT INTO `default_catalog`.`default_database`.`CustomerVectorEmbed_2`
 SELECT `customerid`, `name`, `name_vector`, `hash_columns`(`customerid`, `name`, `name_vector`) AS `__pk_hash`
  FROM `default_catalog`.`default_database`.`CustomerVectorEmbed`
 ;
 END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "VectorPgExtension",
      "type" : "EXTENSION",
      "sql" : "CREATE EXTENSION IF NOT EXISTS vector"
    },
    {
      "name" : "Customer_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"Customer_1\" (\"customerid\" BIGINT NOT NULL, \"email\" TEXT NOT NULL, \"name\" TEXT NOT NULL, \"lastUpdated\" BIGINT NOT NULL, \"timestamp\" TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (\"customerid\",\"lastUpdated\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ],
      "primaryKey" : [
        "customerid",
        "lastUpdated"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CustomerVectorEmbed_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CustomerVectorEmbed_2\" (\"customerid\" BIGINT NOT NULL, \"name\" TEXT NOT NULL, \"name_vector\" VECTOR, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name_vector",
          "type" : "VECTOR",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "Customer",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"Customer\"(\"customerid\", \"email\", \"name\", \"lastUpdated\", \"timestamp\") AS SELECT *\nFROM \"Customer_1\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "email",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "lastUpdated",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "timestamp",
          "type" : "TIMESTAMP WITH TIME ZONE",
          "nullable" : false
        }
      ]
    },
    {
      "name" : "CustomerVectorEmbed",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CustomerVectorEmbed\"(\"customerid\", \"name\", \"name_vector\") AS SELECT \"customerid\", \"name\", \"name_vector\"\nFROM \"CustomerVectorEmbed_2\"",
      "fields" : [
        {
          "name" : "customerid",
          "type" : "BIGINT",
          "nullable" : false
        },
        {
          "name" : "name",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "name_vector",
          "type" : "VECTOR",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx-exec-functions.json
{
  "functions" : [
    {
      "functionId" : "SqrlExecFn0",
      "functionDescription" : "LEFT(`hash_columns`(`inputName`), 5)",
      "inputType" : {
        "children" : [
          {
            "children" : [ ],
            "defaultConversion" : "java.lang.String",
            "length" : 2147483647,
            "nullable" : false,
            "typeRoot" : "VARCHAR"
          }
        ],
        "defaultConversion" : "org.apache.flink.types.Row",
        "fieldCount" : 1,
        "fieldNames" : [
          "inputName"
        ],
        "fields" : [
          {
            "description" : null,
            "name" : "inputName",
            "type" : {
              "children" : [ ],
              "defaultConversion" : "java.lang.String",
              "length" : 2147483647,
              "nullable" : false,
              "typeRoot" : "VARCHAR"
            }
          }
        ],
        "nullable" : false,
        "typeRoot" : "ROW"
      },
      "outputType" : {
        "children" : [
          {
            "children" : [ ],
            "defaultConversion" : "java.lang.String",
            "length" : 32,
            "nullable" : true,
            "typeRoot" : "VARCHAR"
          }
        ],
        "defaultConversion" : "org.apache.flink.types.Row",
        "fieldCount" : 1,
        "fieldNames" : [
          "f0"
        ],
        "fields" : [
          {
            "description" : null,
            "name" : "f0",
            "type" : {
              "children" : [ ],
              "defaultConversion" : "java.lang.String",
              "length" : 32,
              "nullable" : true,
              "typeRoot" : "VARCHAR"
            }
          }
        ],
        "nullable" : false,
        "typeRoot" : "ROW"
      },
      "listOutput" : false,
      "code" : "\n      \n      // \n          \n      public class SqrlExecFn0$9\n          extends org.apache.flink.api.common.functions.RichFlatMapFunction {\n\n        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;\n        private transient com.datasqrl.flinkrunner.stdlib.commons.hash_columns function_com$datasqrl$flinkrunner$stdlib$commons$hash_columns;\n        private transient org.apache.flink.table.data.conversion.StringStringConverter converter$3;\n        private transient org.apache.flink.table.data.conversion.StringStringConverter converter$5;\n        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);\n\n        public SqrlExecFn0$9(Object[] references) throws Exception {\n          typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));\n          function_com$datasqrl$flinkrunner$stdlib$commons$hash_columns = (((com.datasqrl.flinkrunner.stdlib.commons.hash_columns) references[1]));\n          converter$3 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[2]));\n          converter$5 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[3]));\n        }\n\n        \n\n        @Override\n        public void open(org.apache.flink.api.common.functions.OpenContext openContext) throws Exception {\n          \n          function_com$datasqrl$flinkrunner$stdlib$commons$hash_columns.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));\n                 \n          \n          converter$3.open(getRuntimeContext().getUserCodeClassLoader());\n                     \n          \n          converter$5.open(getRuntimeContext().getUserCodeClassLoader());\n                     \n        }\n\n        @Override\n        public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {\n          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;\n          \n          org.apache.flink.table.data.binary.BinaryStringData field$0;\n          boolean isNull$0;\n          org.apache.flink.table.data.binary.BinaryStringData field$2;\n          java.lang.String externalResult$4;\n          org.apache.flink.table.data.binary.BinaryStringData result$6;\n          boolean isNull$6;\n          boolean isNull$7;\n          org.apache.flink.table.data.binary.BinaryStringData result$8;\n          \n          isNull$0 = in1.isNullAt(0);\n          field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;\n          if (!isNull$0) {\n            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));\n          }\n          field$2 = field$0;\n          if (!isNull$0) {\n            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));\n          }\n                  \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          externalResult$4 = (java.lang.String) function_com$datasqrl$flinkrunner$stdlib$commons$hash_columns\n            .eval(isNull$0 ? null : ((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));\n          \n          isNull$6 = externalResult$4 == null;\n          result$6 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;\n          if (!isNull$6) {\n            result$6 = (org.apache.flink.table.data.binary.BinaryStringData) converter$5.toInternalOrNull((java.lang.String) externalResult$4);\n          }\n          \n          \n          isNull$7 = isNull$6 || false;\n          result$8 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;\n          if (!isNull$7) {\n            \n          \n          result$8 = ((int) 5) <= 0 ? org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8 : org.apache.flink.table.data.binary.BinaryStringDataUtil.substringSQL(result$6, 1, ((int) 5));\n          \n            isNull$7 = (result$8 == null);\n          }\n          \n          if (isNull$7) {\n            out.setField(0, null);\n          } else {\n            out.setField(0, result$8);\n          }\n                    \n                  \n          c.collect(out);\n          \n          \n        }\n\n        @Override\n        public void close() throws Exception {\n          function_com$datasqrl$flinkrunner$stdlib$commons$hash_columns.close();\n        }\n\n        \n      }\n    "
    },
    {
      "functionId" : "SqrlExecFn1",
      "functionDescription" : "`default_catalog`.`default_database`.`vector_embed`(`name`, 'test-model')",
      "inputType" : {
        "children" : [
          {
            "children" : [ ],
            "defaultConversion" : "java.lang.String",
            "length" : 2147483647,
            "nullable" : false,
            "typeRoot" : "VARCHAR"
          }
        ],
        "defaultConversion" : "org.apache.flink.types.Row",
        "fieldCount" : 1,
        "fieldNames" : [
          "name"
        ],
        "fields" : [
          {
            "description" : null,
            "name" : "name",
            "type" : {
              "children" : [ ],
              "defaultConversion" : "java.lang.String",
              "length" : 2147483647,
              "nullable" : false,
              "typeRoot" : "VARCHAR"
            }
          }
        ],
        "nullable" : false,
        "typeRoot" : "ROW"
      },
      "outputType" : {
        "children" : [
          {
            "children" : [ ],
            "defaultConversion" : "com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType",
            "nullable" : true,
            "originatingClass" : "com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType",
            "serializerString" : "AEhjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLnZlY3Rvci5GbGlua1ZlY3RvclR5cGVTZXJpYWxpemVyU25hcHNob3QAAAAD",
            "typeRoot" : "RAW",
            "typeSerializer" : {
              "immutableType" : true,
              "length" : -1
            }
          }
        ],
        "defaultConversion" : "org.apache.flink.types.Row",
        "fieldCount" : 1,
        "fieldNames" : [
          "f0"
        ],
        "fields" : [
          {
            "description" : null,
            "name" : "f0",
            "type" : {
              "children" : [ ],
              "defaultConversion" : "com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType",
              "nullable" : true,
              "originatingClass" : "com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType",
              "serializerString" : "AEhjb20uZGF0YXNxcmwuZmxpbmtydW5uZXIuc3RkbGliLnZlY3Rvci5GbGlua1ZlY3RvclR5cGVTZXJpYWxpemVyU25hcHNob3QAAAAD",
              "typeRoot" : "RAW",
              "typeSerializer" : {
                "immutableType" : true,
                "length" : -1
              }
            }
          }
        ],
        "nullable" : false,
        "typeRoot" : "ROW"
      },
      "listOutput" : false,
      "code" : "\n      \n      // \n          \n      public class SqrlExecFn1$8\n          extends org.apache.flink.api.common.functions.RichFlatMapFunction {\n\n        private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;\n        \n        private final org.apache.flink.table.data.binary.BinaryStringData str$3 = org.apache.flink.table.data.binary.BinaryStringData.fromString(\"test-model\");\n                   \n        private transient com.datasqrl.flinkrunner.stdlib.openai.vector_embed function_com$datasqrl$flinkrunner$stdlib$openai$vector_embed;\n        private transient org.apache.flink.table.data.conversion.StringStringConverter converter$4;\n        private transient org.apache.flink.table.data.conversion.RawObjectConverter converter$6;\n        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);\n\n        public SqrlExecFn1$8(Object[] references) throws Exception {\n          typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));\n          function_com$datasqrl$flinkrunner$stdlib$openai$vector_embed = (((com.datasqrl.flinkrunner.stdlib.openai.vector_embed) references[1]));\n          converter$4 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[2]));\n          converter$6 = (((org.apache.flink.table.data.conversion.RawObjectConverter) references[3]));\n        }\n\n        \n\n        @Override\n        public void open(org.apache.flink.api.common.functions.OpenContext openContext) throws Exception {\n          \n          function_com$datasqrl$flinkrunner$stdlib$openai$vector_embed.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));\n                 \n          \n          converter$4.open(getRuntimeContext().getUserCodeClassLoader());\n                     \n          \n          converter$6.open(getRuntimeContext().getUserCodeClassLoader());\n                     \n        }\n\n        @Override\n        public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {\n          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;\n          \n          org.apache.flink.table.data.binary.BinaryStringData field$0;\n          boolean isNull$0;\n          org.apache.flink.table.data.binary.BinaryStringData field$2;\n          com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType externalResult$5;\n          org.apache.flink.table.data.binary.BinaryRawValueData result$7;\n          boolean isNull$7;\n          \n          isNull$0 = in1.isNullAt(0);\n          field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;\n          if (!isNull$0) {\n            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));\n          }\n          field$2 = field$0;\n          if (!isNull$0) {\n            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));\n          }\n                  \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          \n          externalResult$5 = (com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType) function_com$datasqrl$flinkrunner$stdlib$openai$vector_embed\n            .eval(isNull$0 ? null : ((java.lang.String) converter$4.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)), false ? null : ((java.lang.String) converter$4.toExternal((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$3))));\n          \n          isNull$7 = externalResult$5 == null;\n          result$7 = null;\n          if (!isNull$7) {\n            result$7 = (org.apache.flink.table.data.binary.BinaryRawValueData) converter$6.toInternalOrNull((com.datasqrl.flinkrunner.stdlib.vector.FlinkVectorType) externalResult$5);\n          }\n          \n          if (isNull$7) {\n            out.setField(0, null);\n          } else {\n            out.setField(0, result$7);\n          }\n                    \n                  \n          c.collect(out);\n          \n          \n        }\n\n        @Override\n        public void close() throws Exception {\n          function_com$datasqrl$flinkrunner$stdlib$openai$vector_embed.close();\n        }\n\n        \n      }\n    "
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "Customer",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomerVectorEmbed",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"customerid\", \"name\", \"name_vector\"\nFROM \"CustomerVectorEmbed_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomersByName",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              },
              {
                "type" : "variable",
                "path" : "inputName"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"Customer_1\"\nWHERE \"name\" LIKE $2",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "inputName",
                  "sqlType" : "VARCHAR"
                },
                {
                  "type" : "computed",
                  "functionId" : "SqrlExecFn0"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CustomersByVector",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "name"
              },
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"customerid\", \"name\", \"name_vector\", 1 - (($2 ::vector) <=> \"name_vector\")\nFROM \"CustomerVectorEmbed_2\"\nORDER BY 1 - (($2 ::vector) <=> \"name_vector\") DESC NULLS LAST",
              "parameters" : [
                {
                  "type" : "arg",
                  "path" : "name",
                  "sqlType" : "VARCHAR"
                },
                {
                  "type" : "computed",
                  "functionId" : "SqrlExecFn1"
                }
              ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCustomer",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query Customer($limit: Int = 10, $offset: Int = 0) {\nCustomer(limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "Customer",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/Customer{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomerVectorEmbed",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomerVectorEmbed($limit: Int = 10, $offset: Int = 0) {\nCustomerVectorEmbed(limit: $limit, offset: $offset) {\ncustomerid\nname\n}\n\n}",
            "queryName" : "CustomerVectorEmbed",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomerVectorEmbed{?offset,limit}"
        },
        {
          "function" : {
            "name" : "GetCustomersByName",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                },
                "inputName" : {
                  "type" : "string"
                }
              },
              "required" : [
                "inputName"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomersByName($inputName: String!, $limit: Int = 10, $offset: Int = 0) {\nCustomersByName(inputName: $inputName, limit: $limit, offset: $offset) {\ncustomerid\nemail\nname\nlastUpdated\ntimestamp\n}\n\n}",
            "queryName" : "CustomersByName",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomersByName{?offset,limit,inputName}"
        },
        {
          "function" : {
            "name" : "GetCustomersByVector",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "name" : {
                  "type" : "string"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [
                "name"
              ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CustomersByVector($name: String!, $limit: Int = 10, $offset: Int = 0) {\nCustomersByVector(name: $name, limit: $limit, offset: $offset) {\ncustomerid\nname\n}\n\n}",
            "queryName" : "CustomersByVector",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CustomersByVector{?offset,name,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type Customer {\n  customerid: Long!\n  email: String!\n  name: String!\n  lastUpdated: Long!\n  timestamp: DateTime!\n}\n\ntype CustomerVectorEmbed {\n  customerid: Long!\n  name: String!\n}\n\ntype CustomersByVector {\n  customerid: Long!\n  name: String!\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  Customer(limit: Int = 10, offset: Int = 0): [Customer!]\n  CustomerVectorEmbed(limit: Int = 10, offset: Int = 0): [CustomerVectorEmbed!]\n  CustomersByName(inputName: String!, limit: Int = 10, offset: Int = 0): [Customer!]\n  CustomersByVector(name: String!, limit: Int = 10, offset: Int = 0): [CustomersByVector!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
