>>>pipeline_explain.txt
=== ApplicationInfo
ID:          default_catalog.default_database.ApplicationInfo
Type:        state
Stage:       iceberg
Primary key: id, loan_type_id
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - id1: BIGINT NOT NULL
 - id2: BIGINT
Inputs:
 - default_catalog.default_database._Applications
 - default_catalog.default_database._LoanTypes

=== ApplicationStatus
ID:          default_catalog.default_database.ApplicationStatus
Type:        stream
Stage:       flink
Primary key: customer_id
Timestamp:   event_time
Row count:   ~8e7
---
Schema:
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
Inputs:
 - default_catalog.default_database._Applications
 - default_catalog.default_database._LoanTypes
 - default_catalog.sources.ApplicationUpdates

=== _Applications
ID:          default_catalog.default_database._Applications
Type:        state
Stage:       flink
Primary key: id, loan_type_id
Timestamp:   updated_at
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationsStream
Annotations:
 - mostRecentDistinct: true
 - stream-root: ApplicationsStream

=== _LoanTypes
ID:          default_catalog.default_database._LoanTypes
Type:        state
Stage:       flink
Primary key: id
Timestamp:   updated_at
Row count:   ~2e7
---
Schema:
 - id: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - description: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - interest_rate: DOUBLE NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
 - max_duration: BIGINT NOT NULL
 - min_duration: BIGINT NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.LoanTypesStream
Annotations:
 - mostRecentDistinct: true
 - stream-root: LoanTypesStream

=== ApplicationUpdates
ID:          default_catalog.sources.ApplicationUpdates
Type:        stream
Stage:       flink
Primary key: -
Timestamp:   event_time
Row count:   ~1e8
---
Schema:
 - loan_application_id: BIGINT NOT NULL
 - status: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - message: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - event_time: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationUpdates__base

=== ApplicationsStream
ID:          default_catalog.sources.ApplicationsStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationsStream__base
Annotations:
 - stream-root: ApplicationsStream

=== LoanTypesStream
ID:          default_catalog.sources.LoanTypesStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - name: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - description: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
 - interest_rate: DOUBLE NOT NULL
 - max_amount: DOUBLE NOT NULL
 - min_amount: DOUBLE NOT NULL
 - max_duration: BIGINT NOT NULL
 - min_duration: BIGINT NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.LoanTypesStream__base
Annotations:
 - stream-root: LoanTypesStream

>>>flink-sql-no-functions.sql
-- Create the sources database if it does not exist yet and make it the current database,
-- so the unqualified source tables declared below are created inside it.
CREATE DATABASE IF NOT EXISTS `sources`;
USE `sources`;
-- Column template for CustomersStream. It is only consumed by the LIKE clause of the
-- CREATE TABLE statement that follows.
-- NOTE(review): date_of_birth is declared as a string column, not a DATE - confirm this is intended.
CREATE TEMPORARY TABLE `CustomersStream__schema` (
  `id` BIGINT NOT NULL,
  `first_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `last_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `phone` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `address` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `date_of_birth` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
-- Customer source table. Columns come from CustomersStream__schema through LIKE. This
-- statement adds a non-enforced primary key on (id, updated_at) and a watermark that trails
-- updated_at by one millisecond.
-- The WITH clause re-declares the connector as filesystem (customers.jsonl, flexible-json
-- format, 10 second monitor interval). Per the documented default LIKE merging in Flink
-- (INCLUDING ALL OVERWRITING OPTIONS) this replaces the datagen connector of the template.
-- NOTE(review): no view visible in this file reads from CustomersStream - confirm it is still needed.
CREATE TABLE `CustomersStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/customers.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `CustomersStream__schema`;
-- Column template for ApplicationUpdates. Unlike the other templates in this file it already
-- carries the real connector options (filesystem, flexible-json, application_updates.jsonl),
-- and it sets no monitor interval.
CREATE TEMPORARY TABLE `ApplicationUpdates__schema` (
  `loan_application_id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/application_updates.jsonl'
);
-- Status-update source table. It takes columns and connector options from
-- ApplicationUpdates__schema through LIKE and only adds a watermark that trails event_time
-- by one millisecond. No primary key is declared on this table.
CREATE TABLE `ApplicationUpdates` (
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND
)
LIKE `ApplicationUpdates__schema`;
-- Column template for ApplicationsStream. It is only consumed by the LIKE clause of the
-- CREATE TABLE statement that follows.
CREATE TEMPORARY TABLE `ApplicationsStream__schema` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
-- Loan-application source table, read by the _Applications view. Columns come from
-- ApplicationsStream__schema through LIKE. This statement adds a non-enforced primary key on
-- (id, updated_at) and a watermark that trails updated_at by one millisecond.
-- The WITH clause re-declares the connector as filesystem (applications.jsonl, flexible-json
-- format, 10 second monitor interval). Per the documented default LIKE merging in Flink
-- (INCLUDING ALL OVERWRITING OPTIONS) this replaces the datagen connector of the template.
CREATE TABLE `ApplicationsStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/applications.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `ApplicationsStream__schema`;
-- Column template for LoanTypesStream. It is only consumed by the LIKE clause of the
-- CREATE TABLE statement that follows.
CREATE TEMPORARY TABLE `LoanTypesStream__schema` (
  `id` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `interest_rate` DOUBLE NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL,
  `max_duration` BIGINT NOT NULL,
  `min_duration` BIGINT NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
-- Loan-type source table, read by the _LoanTypes view. Columns come from
-- LoanTypesStream__schema through LIKE. This statement adds a non-enforced primary key on
-- (id, updated_at) and a watermark that trails updated_at by one millisecond.
-- The WITH clause re-declares the connector as filesystem (loan_types.jsonl, flexible-json
-- format, 10 second monitor interval). Per the documented default LIKE merging in Flink
-- (INCLUDING ALL OVERWRITING OPTIONS) this replaces the datagen connector of the template.
CREATE TABLE `LoanTypesStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/loan_types.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `LoanTypesStream__schema`;
-- Switch to default_catalog.default_database (creating the database if needed). The views
-- and sink tables that follow are declared with unqualified names and therefore land here.
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
-- Latest version of every loan application: rows of ApplicationsStream are numbered per id
-- by updated_at (newest first, NULLs last) and only row number 1 is kept.
CREATE VIEW `_Applications` AS
SELECT
  `id`,
  `customer_id`,
  `loan_type_id`,
  `amount`,
  `duration`,
  `application_date`,
  `updated_at`
FROM (
  SELECT
    `id`,
    `customer_id`,
    `loan_type_id`,
    `amount`,
    `duration`,
    `application_date`,
    `updated_at`,
    ROW_NUMBER() OVER (
      PARTITION BY `id`
      ORDER BY `updated_at` DESC NULLS LAST
    ) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`sources`.`ApplicationsStream`
) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Latest version of every loan type: rows of LoanTypesStream are numbered per id by
-- updated_at (newest first, NULLs last) and only row number 1 is kept.
CREATE VIEW `_LoanTypes` AS
SELECT
  `id`,
  `name`,
  `description`,
  `interest_rate`,
  `max_amount`,
  `min_amount`,
  `max_duration`,
  `min_duration`,
  `updated_at`
FROM (
  SELECT
    `id`,
    `name`,
    `description`,
    `interest_rate`,
    `max_amount`,
    `min_amount`,
    `max_duration`,
    `min_duration`,
    `updated_at`,
    ROW_NUMBER() OVER (
      PARTITION BY `id`
      ORDER BY `updated_at` DESC NULLS LAST
    ) AS `__sqrlinternal_rownum`
  FROM `default_catalog`.`sources`.`LoanTypesStream`
) AS `t`
WHERE `__sqrlinternal_rownum` = 1;
-- Enriches every status update with the application and loan-type rows looked up
-- FOR SYSTEM_TIME AS OF the event_time of the update.
-- The update stream is referenced by its full three-part name, matching the source
-- references in the _Applications and _LoanTypes views, so resolution does not depend on
-- which catalog is current when the view is created.
CREATE VIEW `ApplicationStatus`
AS
SELECT
  `u`.`status`,
  `u`.`message`,
  `u`.`event_time`,
  `a`.`id`,
  `a`.`customer_id`,
  `a`.`loan_type_id`,
  `a`.`amount`,
  `a`.`duration`,
  `t`.`max_amount`,
  `t`.`min_amount`
FROM `default_catalog`.`sources`.`ApplicationUpdates` AS `u`
  -- application row matching the update, as of the update time
  INNER JOIN `_Applications` FOR SYSTEM_TIME AS OF `u`.`event_time` AS `a`
    ON `a`.`id` = `u`.`loan_application_id`
  -- loan type of that application, as of the same update time
  INNER JOIN `_LoanTypes` FOR SYSTEM_TIME AS OF `u`.`event_time` AS `t`
    ON `t`.`id` = `a`.`loan_type_id`;
-- Every current application together with the id of its loan type (id1) and the id of an
-- optionally matching second loan-type row (id2, NULL when the LEFT JOIN finds no match).
-- The application columns are listed explicitly, in the order _Applications defines them,
-- so the view keeps a fixed column set if _Applications later gains columns.
CREATE VIEW `ApplicationInfo`
AS
SELECT
  `a`.`id`,
  `a`.`customer_id`,
  `a`.`loan_type_id`,
  `a`.`amount`,
  `a`.`duration`,
  `a`.`application_date`,
  `a`.`updated_at`,
  `t`.`id` AS `id1`,
  `t2`.`id` AS `id2`
FROM `_Applications` AS `a`
  INNER JOIN `_LoanTypes` AS `t`
    ON `t`.`id` = `a`.`loan_type_id`
  -- NOTE(review): this matches a loan-type id against customer_id - confirm that is intended
  LEFT JOIN `_LoanTypes` AS `t2`
    ON `t2`.`id` = `a`.`customer_id`;
-- Iceberg sink table for the ApplicationStatus view (filled by the first INSERT of the
-- statement set at the end of this section). It maps to catalog table ApplicationStatus in
-- the hadoop catalog mydatabase under warehouse /tmp/duckdb.
-- Keyed and partitioned by customer_id. Unlike the _Applications_2 and _LoanTypes_3 sinks,
-- it does not set write.upsert.enabled.
CREATE TABLE `ApplicationStatus_1` (
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL,
  PRIMARY KEY (`customer_id`) NOT ENFORCED
)
PARTITIONED BY (`customer_id`)
WITH (
  'catalog-name' = 'mydatabase',
  'catalog-table' = 'ApplicationStatus',
  'catalog-type' = 'hadoop',
  'commit.retry.max-wait-ms' = '5000',
  'commit.retry.min-wait-ms' = '100',
  'commit.retry.num-retries' = '20',
  'connector' = 'iceberg',
  'format-version' = '2',
  'warehouse' = '/tmp/duckdb',
  'write.distribution-mode' = 'hash',
  'write.parquet.page-size-bytes' = '1000'
);
-- Iceberg sink table for the _Applications view (filled by the second INSERT of the
-- statement set at the end of this section). It maps to catalog table _Applications in the
-- hadoop catalog mydatabase under warehouse /tmp/duckdb.
-- Keyed by (id, loan_type_id), partitioned by loan_type_id, with write.upsert.enabled set to true.
CREATE TABLE `_Applications_2` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`, `loan_type_id`) NOT ENFORCED
)
PARTITIONED BY (`loan_type_id`)
WITH (
  'catalog-name' = 'mydatabase',
  'catalog-table' = '_Applications',
  'catalog-type' = 'hadoop',
  'commit.retry.max-wait-ms' = '5000',
  'commit.retry.min-wait-ms' = '100',
  'commit.retry.num-retries' = '20',
  'connector' = 'iceberg',
  'format-version' = '2',
  'warehouse' = '/tmp/duckdb',
  'write.distribution-mode' = 'hash',
  'write.parquet.page-size-bytes' = '1000',
  'write.upsert.enabled' = 'true'
);
-- Iceberg sink table for the _LoanTypes view (filled by the third INSERT of the statement
-- set at the end of this section). It maps to catalog table _LoanTypes in the hadoop catalog
-- mydatabase under warehouse /tmp/duckdb.
-- Keyed by id, not partitioned, with write.upsert.enabled set to true.
CREATE TABLE `_LoanTypes_3` (
  `id` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `interest_rate` DOUBLE NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL,
  `max_duration` BIGINT NOT NULL,
  `min_duration` BIGINT NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'catalog-name' = 'mydatabase',
  'catalog-table' = '_LoanTypes',
  'catalog-type' = 'hadoop',
  'commit.retry.max-wait-ms' = '5000',
  'commit.retry.min-wait-ms' = '100',
  'commit.retry.num-retries' = '20',
  'connector' = 'iceberg',
  'format-version' = '2',
  'warehouse' = '/tmp/duckdb',
  'write.distribution-mode' = 'hash',
  'write.parquet.page-size-bytes' = '1000',
  'write.upsert.enabled' = 'true'
);
-- Submits the three sink inserts together as one statement set.
-- Each INSERT names its target columns and selects the same columns explicitly, so the
-- mapping between view and sink is by name and does not rely on matching column positions.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`ApplicationStatus_1`
  (`status`, `message`, `event_time`, `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `max_amount`, `min_amount`)
SELECT
  `status`,
  `message`,
  `event_time`,
  `id`,
  `customer_id`,
  `loan_type_id`,
  `amount`,
  `duration`,
  `max_amount`,
  `min_amount`
FROM `default_catalog`.`default_database`.`ApplicationStatus`
;
INSERT INTO `default_catalog`.`default_database`.`_Applications_2`
  (`id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`)
SELECT
  `id`,
  `customer_id`,
  `loan_type_id`,
  `amount`,
  `duration`,
  `application_date`,
  `updated_at`
FROM `default_catalog`.`default_database`.`_Applications`
;
INSERT INTO `default_catalog`.`default_database`.`_LoanTypes_3`
  (`id`, `name`, `description`, `interest_rate`, `max_amount`, `min_amount`, `max_duration`, `min_duration`, `updated_at`)
SELECT
  `id`,
  `name`,
  `description`,
  `interest_rate`,
  `max_amount`,
  `min_amount`,
  `max_duration`,
  `min_duration`,
  `updated_at`
FROM `default_catalog`.`default_database`.`_LoanTypes`
;
END
>>>iceberg-schema.sql
-- ApplicationStatus: keyed and partitioned by customer_id.
CREATE TABLE IF NOT EXISTS "ApplicationStatus" (
  "status" TEXT NOT NULL,
  "message" TEXT NOT NULL,
  "event_time" TIMESTAMP WITH TIME ZONE NOT NULL,
  "id" BIGINT NOT NULL,
  "customer_id" BIGINT NOT NULL,
  "loan_type_id" BIGINT NOT NULL,
  "amount" DOUBLE PRECISION NOT NULL,
  "duration" BIGINT NOT NULL,
  "max_amount" DOUBLE PRECISION NOT NULL,
  "min_amount" DOUBLE PRECISION NOT NULL,
  PRIMARY KEY ("customer_id")
)
PARTITIONED BY ("customer_id");
-- _Applications: keyed by (id, loan_type_id), partitioned by loan_type_id.
CREATE TABLE IF NOT EXISTS "_Applications" (
  "id" BIGINT NOT NULL,
  "customer_id" BIGINT NOT NULL,
  "loan_type_id" BIGINT NOT NULL,
  "amount" DOUBLE PRECISION NOT NULL,
  "duration" BIGINT NOT NULL,
  "application_date" TIMESTAMP WITH TIME ZONE NOT NULL,
  "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL,
  PRIMARY KEY ("id", "loan_type_id")
)
PARTITIONED BY ("loan_type_id");
-- _LoanTypes: keyed by id, not partitioned.
CREATE TABLE IF NOT EXISTS "_LoanTypes" (
  "id" BIGINT NOT NULL,
  "name" TEXT NOT NULL,
  "description" TEXT NOT NULL,
  "interest_rate" DOUBLE PRECISION NOT NULL,
  "max_amount" DOUBLE PRECISION NOT NULL,
  "min_amount" DOUBLE PRECISION NOT NULL,
  "max_duration" BIGINT NOT NULL,
  "min_duration" BIGINT NOT NULL,
  "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL,
  PRIMARY KEY ("id")
)
>>>iceberg-snowflake-schema.sql
-- Snowflake Iceberg tables, one per catalog table, all declared with external volume
-- MyNewVolume and catalog MyCatalog.
CREATE OR REPLACE ICEBERG TABLE ApplicationStatus
  EXTERNAL_VOLUME = 'MyNewVolume'
  CATALOG = 'MyCatalog'
  CATALOG_TABLE_NAME = 'ApplicationStatus';
CREATE OR REPLACE ICEBERG TABLE _Applications
  EXTERNAL_VOLUME = 'MyNewVolume'
  CATALOG = 'MyCatalog'
  CATALOG_TABLE_NAME = '_Applications';
CREATE OR REPLACE ICEBERG TABLE _LoanTypes
  EXTERNAL_VOLUME = 'MyNewVolume'
  CATALOG = 'MyCatalog'
  CATALOG_TABLE_NAME = '_LoanTypes'
>>>iceberg-snowflake-views.sql
-- Snowflake view ApplicationInfo: all _Applications columns plus the id of the loan type
-- matched on loan_type_id (id1) and the id of the loan-type row optionally matched on
-- customer_id (id2).
CREATE OR REPLACE VIEW ApplicationInfo (
  id,
  customer_id,
  loan_type_id,
  amount,
  duration,
  application_date,
  updated_at,
  id1,
  id2
) AS
SELECT
  _Applications.id,
  _Applications.customer_id,
  _Applications.loan_type_id,
  _Applications.amount,
  _Applications.duration,
  _Applications.application_date,
  _Applications.updated_at,
  _LoanTypes.id AS id1,
  _LoanTypes0.id AS id2
FROM _Applications
INNER JOIN _LoanTypes
  ON _Applications.loan_type_id = _LoanTypes.id
LEFT JOIN _LoanTypes AS _LoanTypes0
  ON _Applications.customer_id = _LoanTypes0.id
>>>iceberg-views.sql

