>>>pipeline_explain.txt
=== ApplicationsStream
ID:          default_catalog.sources.ApplicationsStream
Type:        stream
Stage:       flink
Primary key: id, updated_at
Timestamp:   updated_at
Row count:   ~1e8
---
Schema:
 - id: BIGINT NOT NULL
 - customer_id: BIGINT NOT NULL
 - loan_type_id: BIGINT NOT NULL
 - amount: DOUBLE NOT NULL
 - duration: BIGINT NOT NULL
 - application_date: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
 - updated_at: TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
Inputs:
 - default_catalog.sources.ApplicationsStream__base
Annotations:
 - stream-root: ApplicationsStream

=== iceberg-sink
ID:          iceberg.iceberg-sink
Type:        export
Stage:       flink
Connector:   iceberg
---
Inputs:
 - default_catalog.sources.ApplicationsStream

>>>flink-sql-no-functions.sql
CREATE DATABASE IF NOT EXISTS `sources`;
USE `sources`;
CREATE TEMPORARY TABLE `CustomersStream__schema` (
  `id` BIGINT NOT NULL,
  `first_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `last_name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `phone` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `address` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `date_of_birth` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `CustomersStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/customers.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `CustomersStream__schema`;
CREATE TEMPORARY TABLE `ApplicationUpdates__schema` (
  `loan_application_id` BIGINT NOT NULL,
  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'filesystem',
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/application_updates.jsonl'
);
CREATE TABLE `ApplicationUpdates` (
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND
)
LIKE `ApplicationUpdates__schema`;
CREATE TEMPORARY TABLE `ApplicationsStream__schema` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `ApplicationsStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/applications.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `ApplicationsStream__schema`;
CREATE TEMPORARY TABLE `LoanTypesStream__schema` (
  `id` BIGINT NOT NULL,
  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `interest_rate` DOUBLE NOT NULL,
  `max_amount` DOUBLE NOT NULL,
  `min_amount` DOUBLE NOT NULL,
  `max_duration` BIGINT NOT NULL,
  `min_duration` BIGINT NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `LoanTypesStream` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/loan_types.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `LoanTypesStream__schema`;
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
CREATE TEMPORARY TABLE `Iceberg_sink_ex1__schema` (
  `id` BIGINT NOT NULL,
  `customer_id` BIGINT NOT NULL,
  `loan_type_id` BIGINT NOT NULL,
  `amount` DOUBLE NOT NULL,
  `duration` BIGINT NOT NULL,
  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL
)
WITH (
  'connector' = 'datagen'
);
CREATE TABLE `Iceberg_sink_ex1` (
  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED
)
WITH (
  'connector' = 'iceberg',
  'catalog-table' = 'my-table',
  'warehouse' = '/tmp/test_iceberg_wh',
  'catalog-type' = 'hadoop',
  'catalog-name' = 'mydatabase'
)
LIKE `Iceberg_sink_ex1__schema`;
EXECUTE STATEMENT SET BEGIN
INSERT INTO `default_catalog`.`default_database`.`Iceberg_sink_ex1`
SELECT *
 FROM `default_catalog`.`sources`.`ApplicationsStream`
;
END
>>>iceberg-schema.sql

>>>iceberg-views.sql

