>>>pipeline_explain.txt
=== CombinedStream
ID:          default_catalog.default_database.CombinedStream
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   -
Row count:   ~2e8
---
Schema:
 - id_a: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
 - val: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
Inputs:
 - default_catalog.default_database.source_a
 - default_catalog.default_database.source_b
Plan:
LogicalUnion(all=[true])
  LogicalProject(id_a=[$0], val=[$1])
    LogicalTableScan(table=[[default_catalog, default_database, source_a]])
  LogicalProject(val=[$1], id_b=[$0])
    LogicalTableScan(table=[[default_catalog, default_database, source_b]])
SQL:
CREATE VIEW `CombinedStream` AS  SELECT id_a, val FROM source_a UNION ALL SELECT val, id_b FROM source_b;

=== source_a
ID:          default_catalog.default_database.source_a
Type:        stream
Stage:       flink
Primary key: id_a
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id_a: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - val: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
Inputs:
 - default_catalog.default_database.source_a__base
Annotations:
 - stream-root: source_a
Plan:
LogicalTableScan(table=[[default_catalog, default_database, source_a]])
SQL:
CREATE TABLE `source_a` (
  `id_a` STRING NOT NULL,
  `val` STRING,
  PRIMARY KEY (`id_a`) NOT ENFORCED
)
WITH (
  'connector' = 'datagen'
)
=== source_b
ID:          default_catalog.default_database.source_b
Type:        stream
Stage:       flink
Primary key: id_b
Timestamp:   -
Row count:   ~1e8
---
Schema:
 - id_b: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - val: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
Inputs:
 - default_catalog.default_database.source_b__base
Annotations:
 - stream-root: source_b
Plan:
LogicalTableScan(table=[[default_catalog, default_database, source_b]])
SQL:
CREATE TABLE `source_b` (
  `id_b` STRING NOT NULL,
  `val` STRING,
  PRIMARY KEY (`id_b`) NOT ENFORCED
)
WITH (
  'connector' = 'datagen'
)
>>>flink-sql-no-functions.sql
-- Input table `source_a`: two STRING columns with a declared (not enforced)
-- primary key on `id_a`. Rows are supplied by the 'datagen' connector.
CREATE TABLE `source_a` (
  `id_a` STRING,
  `val`  STRING,
  PRIMARY KEY (`id_a`) NOT ENFORCED
) WITH ('connector' = 'datagen');
-- Input table `source_b`: two STRING columns with a declared (not enforced)
-- primary key on `id_b`. Rows are supplied by the 'datagen' connector.
CREATE TABLE `source_b` (
  `id_b` STRING,
  `val`  STRING,
  PRIMARY KEY (`id_b`) NOT ENFORCED
) WITH ('connector' = 'datagen');
-- View `CombinedStream`: concatenates the rows of `source_a` and `source_b`
-- without de-duplication (UNION ALL).
-- UNION ALL pairs columns by position and the output column names come from
-- the first branch, so the view exposes (`id_a`, `val`). The second branch
-- lists `val` before `id_b`, which means `source_b`.`val` is emitted under
-- `id_a` and `source_b`.`id_b` is emitted under `val`.
-- NOTE(review): if a name-aligned union was intended, the second branch would
-- read `id_b`, `val` - confirm whether the swapped order is deliberate.
CREATE VIEW `CombinedStream` AS
  SELECT
    `id_a`,
    `val`
  FROM `source_a`
UNION ALL
  SELECT
    `val`,
    `id_b`
  FROM `source_b`;
-- Table `CombinedStream_1`: target of the `CombinedStream` insert in the
-- statement set below. Keyed on the `__pk_hash` column (declared, not enforced).
-- Uses the 'jdbc-sqrl' connector with the PostgreSQL JDBC driver. Credentials
-- and the server authority are injected through the ${POSTGRES_*} placeholders.
CREATE TABLE `CombinedStream_1` (
  `id_a`      VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `val`       VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  `__pk_hash` CHAR(32) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`__pk_hash`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'password'                = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name'              = 'CombinedStream_1',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}'
);
-- Table `source_a_2`: target of the `source_a` insert in the statement set
-- below. Same two columns as `source_a`, keyed on `id_a` (declared, not enforced).
-- Uses the 'jdbc-sqrl' connector with the PostgreSQL JDBC driver. Credentials
-- and the server authority are injected through the ${POSTGRES_*} placeholders.
CREATE TABLE `source_a_2` (
  `id_a` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `val`  VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`id_a`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'password'                = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name'              = 'source_a_2',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}'
);
-- Table `source_b_3`: target of the `source_b` insert in the statement set
-- below. Same two columns as `source_b`, keyed on `id_b` (declared, not enforced).
-- Uses the 'jdbc-sqrl' connector with the PostgreSQL JDBC driver. Credentials
-- and the server authority are injected through the ${POSTGRES_*} placeholders.
CREATE TABLE `source_b_3` (
  `id_b` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `val`  VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,
  PRIMARY KEY (`id_b`) NOT ENFORCED
) WITH (
  'connector'               = 'jdbc-sqrl',
  'driver'                  = 'org.postgresql.Driver',
  'password'                = '${POSTGRES_PASSWORD}',
  'sink.on-conflict.action' = 'IGNORE',
  'table-name'              = 'source_b_3',
  'url'                     = 'jdbc:postgresql://${POSTGRES_AUTHORITY}',
  'username'                = '${POSTGRES_USERNAME}'
);
-- Submits the three inserts below together as one Flink statement set:
--   1. `CombinedStream` -> `CombinedStream_1`, adding `__pk_hash`, which is
--      computed by `hash_columns` over (`id_a`, `val`) and is the primary
--      key column of the target table.
--   2. `source_a` -> `source_a_2`, columns copied unchanged.
--   3. `source_b` -> `source_b_3`, columns copied unchanged.
-- Every insert names its target columns and selects columns explicitly
-- (no SELECT *), so the mapping does not depend on column order and a
-- schema change on either side fails loudly instead of shifting values.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`CombinedStream_1` (`id_a`, `val`, `__pk_hash`)
SELECT `id_a`, `val`, `hash_columns`(`id_a`, `val`) AS `__pk_hash`
FROM `default_catalog`.`default_database`.`CombinedStream`
;
INSERT INTO `default_catalog`.`default_database`.`source_a_2` (`id_a`, `val`)
SELECT `id_a`, `val`
FROM `default_catalog`.`default_database`.`source_a`
;
INSERT INTO `default_catalog`.`default_database`.`source_b_3` (`id_b`, `val`)
SELECT `id_b`, `val`
FROM `default_catalog`.`default_database`.`source_b`
;
END
>>>kafka.json
{
  "topics" : [ ],
  "testRunnerTopics" : [ ]
}
>>>postgres.json
{
  "statements" : [
    {
      "name" : "CombinedStream_1",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"CombinedStream_1\" (\"id_a\" TEXT, \"val\" TEXT, \"__pk_hash\" TEXT, PRIMARY KEY (\"__pk_hash\"))",
      "fields" : [
        {
          "name" : "id_a",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "__pk_hash",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "__pk_hash"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "source_a_2",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"source_a_2\" (\"id_a\" TEXT NOT NULL, \"val\" TEXT, PRIMARY KEY (\"id_a\"))",
      "fields" : [
        {
          "name" : "id_a",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "id_a"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "source_b_3",
      "type" : "TABLE",
      "sql" : "CREATE TABLE IF NOT EXISTS \"source_b_3\" (\"id_b\" TEXT NOT NULL, \"val\" TEXT, PRIMARY KEY (\"id_b\"))",
      "fields" : [
        {
          "name" : "id_b",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        }
      ],
      "primaryKey" : [
        "id_b"
      ],
      "partitionKey" : [ ],
      "partitionType" : "NONE",
      "numPartitions" : 0,
      "ttl" : 0.0
    },
    {
      "name" : "CombinedStream",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"CombinedStream\"(\"id_a\", \"val\") AS SELECT \"id_a\", \"val\"\nFROM \"CombinedStream_1\"",
      "fields" : [
        {
          "name" : "id_a",
          "type" : "TEXT",
          "nullable" : true
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        }
      ]
    },
    {
      "name" : "source_a",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"source_a\"(\"id_a\", \"val\") AS SELECT *\nFROM \"source_a_2\"",
      "fields" : [
        {
          "name" : "id_a",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        }
      ]
    },
    {
      "name" : "source_b",
      "type" : "VIEW",
      "sql" : "CREATE OR REPLACE VIEW \"source_b\"(\"id_b\", \"val\") AS SELECT *\nFROM \"source_b_3\"",
      "fields" : [
        {
          "name" : "id_b",
          "type" : "TEXT",
          "nullable" : false
        },
        {
          "name" : "val",
          "type" : "TEXT",
          "nullable" : true
        }
      ]
    }
  ]
}
>>>vertx.json
{
  "models" : {
    "v1" : {
      "queries" : [
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "CombinedStream",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT \"id_a\", \"val\"\nFROM \"CombinedStream_1\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "source_a",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"source_a_2\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        },
        {
          "type" : "args",
          "parentType" : "Query",
          "fieldName" : "source_b",
          "exec" : {
            "arguments" : [
              {
                "type" : "variable",
                "path" : "limit"
              },
              {
                "type" : "variable",
                "path" : "offset"
              }
            ],
            "query" : {
              "type" : "SqlQuery",
              "sql" : "SELECT *\nFROM \"source_b_3\"",
              "parameters" : [ ],
              "pagination" : "LIMIT_AND_OFFSET",
              "cacheDurationMs" : 0,
              "database" : "POSTGRES"
            }
          }
        }
      ],
      "mutations" : [ ],
      "subscriptions" : [ ],
      "operations" : [
        {
          "function" : {
            "name" : "GetCombinedStream",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query CombinedStream($limit: Int = 10, $offset: Int = 0) {\nCombinedStream(limit: $limit, offset: $offset) {\nid_a\nval\n}\n\n}",
            "queryName" : "CombinedStream",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/CombinedStream{?offset,limit}"
        },
        {
          "function" : {
            "name" : "Getsource_a",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query source_a($limit: Int = 10, $offset: Int = 0) {\nsource_a(limit: $limit, offset: $offset) {\nid_a\nval\n}\n\n}",
            "queryName" : "source_a",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/source_a{?offset,limit}"
        },
        {
          "function" : {
            "name" : "Getsource_b",
            "parameters" : {
              "type" : "object",
              "properties" : {
                "offset" : {
                  "type" : "integer"
                },
                "limit" : {
                  "type" : "integer"
                }
              },
              "required" : [ ]
            }
          },
          "format" : "JSON",
          "apiQuery" : {
            "query" : "query source_b($limit: Int = 10, $offset: Int = 0) {\nsource_b(limit: $limit, offset: $offset) {\nid_b\nval\n}\n\n}",
            "queryName" : "source_b",
            "operationType" : "QUERY"
          },
          "mcpMethod" : "TOOL",
          "restMethod" : "GET",
          "uriTemplate" : "queries/source_b{?offset,limit}"
        }
      ],
      "schema" : {
        "type" : "string",
        "schema" : "type CombinedStream {\n  id_a: String\n  val: String\n}\n\n\"An RFC-3339 compliant Full Date Scalar\"\nscalar Date\n\n\"A DateTime scalar that handles both full RFC3339 and shorter timestamp formats\"\nscalar DateTime\n\n\"A JSON scalar\"\nscalar JSON\n\n\"24-hour clock time value string in the format `hh:mm:ss` or `hh:mm:ss.sss`.\"\nscalar LocalTime\n\n\"A 64-bit signed integer\"\nscalar Long\n\ntype Query {\n  CombinedStream(limit: Int = 10, offset: Int = 0): [CombinedStream!]\n  source_a(limit: Int = 10, offset: Int = 0): [source_a!]\n  source_b(limit: Int = 10, offset: Int = 0): [source_b!]\n}\n\nenum _McpMethodType {\n  NONE\n  TOOL\n  RESOURCE\n}\n\nenum _RestMethodType {\n  NONE\n  GET\n  POST\n}\n\ntype source_a {\n  id_a: String!\n  val: String\n}\n\ntype source_b {\n  id_b: String!\n  val: String\n}\n\ndirective @api(mcp: _McpMethodType, rest: _RestMethodType, uri: String) on QUERY | MUTATION | FIELD_DEFINITION\n"
      }
    }
  }
}
