>>>pipeline_explain.txt
=== ApplicationStatus
ID:          default_catalog.default_database.ApplicationStatus
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~8e7
---
Schema:
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
Inputs:
 - default_catalog.default_database._Applications
 - default_catalog.default_database._LoanTypes
 - default_catalog.sources.ApplicationUpdates

=== _Applications
ID:          default_catalog.default_database._Applications
Type:        state
Stage:       flink
Primary key: id
Timestamp:   updated_at
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationsStream
Annotations:
 - mostRecentDistinct: true
 - stream-root: ApplicationsStream

=== _FakeStreamJoin
ID:          default_catalog.default_database._FakeStreamJoin
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationUpdates
 - default_catalog.sources.ApplicationsStream

=== _LoanTypes
ID:          default_catalog.default_database._LoanTypes
Type:        state
Stage:       flink
Primary key: id
Timestamp:   updated_at
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - description: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - interest_rate: DOUBLE NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
 - max_duration: BIGINT NOT NULL
 - min_duration: BIGINT NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.LoanTypesStream
Annotations:
 - mostRecentDistinct: true
 - stream-root: LoanTypesStream

=== ApplicationUpdates
ID:          default_catalog.sources.ApplicationUpdates
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - loan_application_id: BIGINT NOT NULL
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationUpdates__base

=== ApplicationsStream
ID:          default_catalog.sources.ApplicationsStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationsStream__base
Annotations:
 - stream-root: ApplicationsStream

=== LoanTypesStream
ID:          default_catalog.sources.LoanTypesStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - description: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - interest_rate: DOUBLE NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
 - max_duration: BIGINT NOT NULL
 - min_duration: BIGINT NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.LoanTypesStream__base
Annotations:
 - stream-root: LoanTypesStream

=== StreamJoin
ID:          logger.StreamJoin
Type:        export
Stage:       flink
---
Inputs:
 - default_catalog.default_database._FakeStreamJoin

>>>flink-sql-no-functions.sql
CREATE DATABASE IF NOT EXISTS `sources`;
USE `sources`;
CREATE TEMPORARY TABLE `CustomersStream__schema` (
  `id` BIGINT NOT NULL,
  `first_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `last_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `phone` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `address` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `date_of_birth` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `CustomersStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/customers.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `CustomersStream__schema`;
CREATE TEMPORARY TABLE `ApplicationUpdates__schema` (
  `loan_application_id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/application_updates.jsonl'
);
CREATE TABLE `ApplicationUpdates` (
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND
)
LIKE `ApplicationUpdates__schema`;
CREATE TEMPORARY TABLE `ApplicationsStream__schema` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `ApplicationsStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/applications.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `ApplicationsStream__schema`;
CREATE TEMPORARY TABLE `LoanTypesStream__schema` (
  `id` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `interest_rate` DOUBLE NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL,
  `max_duration` BIGINT NOT NULL,
  `min_duration` BIGINT NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `LoanTypesStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/loan_types.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `LoanTypesStream__schema`;
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
CREATE VIEW `_Applications`
AS
SELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`
FROM (SELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`sources`.`ApplicationsStream`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `_LoanTypes`
AS
SELECT `id`, `name`, `description`, `interest_rate`, `max_amount`, `min_amount`, `max_duration`, `min_duration`, `updated_at`
FROM (SELECT `id`, `name`, `description`, `interest_rate`, `max_amount`, `min_amount`, `max_duration`, `min_duration`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`sources`.`LoanTypesStream`) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
CREATE VIEW `ApplicationStatus`
AS
SELECT `u`.`status`, `u`.`message`, `u`.`event_time`, `a`.`id`, `a`.`customer_id`, `a`.`loan_type_id`, `a`.`amount`, `a`.`duration`, `t`.`max_amount`, `t`.`min_amount`
FROM `sources`.`ApplicationUpdates` AS `u`
 INNER JOIN `_Applications` FOR SYSTEM_TIME AS OF `u`.`event_time` AS `a` ON `a`.`id` = `u`.`loan_application_id`
 INNER JOIN `_LoanTypes` FOR SYSTEM_TIME AS OF `u`.`event_time` AS `t` ON `t`.`id` = `a`.`loan_type_id`;
CREATE VIEW `_FakeStreamJoin`
AS
SELECT `a`.`id`, `u`.`status`, `u`.`message`, `u`.`event_time`
FROM `sources`.`ApplicationUpdates` AS `u`
 INNER JOIN `sources`.`ApplicationsStream` AS `a` ON `a`.`loan_type_id` = `u`.`loan_application_id`;
CREATE TABLE `ApplicationStatus_1` (
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL
)
WITH (
  'connector' = 'kafka',
  'format' = 'flexible-json',
  'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
  'properties.compression.type' = 'zstd',
  'properties.group.id' = '${KAFKA_GROUP_ID}',
  'topic' = 'ApplicationStatus'
);
CREATE TABLE `StreamJoin_2` (
  `id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'print',
  'print-identifier' = 'StreamJoin'
);
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`ApplicationStatus_1`
SELECT *
 FROM `default_catalog`.`default_database`.`ApplicationStatus`
;
INSERT INTO `default_catalog`.`default_database`.`StreamJoin_2`
 SELECT *
  FROM `default_catalog`.`default_database`.`_FakeStreamJoin`
 ;
 END
>>>kafka.json
{
  "topics" : [
    {
      "topicName" : "ApplicationStatus",
      "tableName" : "ApplicationStatus_1",
      "format" : "flexible-json",
      "numPartitions" : 1,
      "replicationFactor" : 3,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ],
  "testRunnerTopics" : [
    {
      "topicName" : "input_topic1",
      "tableName" : "input_topic1",
      "numPartitions" : 1,
      "replicationFactor" : 1,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    },
    {
      "topicName" : "input_topic2",
      "tableName" : "input_topic2",
      "numPartitions" : 1,
      "replicationFactor" : 1,
      "type" : "SUBSCRIPTION",
      "messageKeys" : [ ],
      "messageSchema" : "",
      "config" : { }
    }
  ]
}
