>>>pipeline_explain.json
[ {
  "id" : "access:ApplicationInfo",
  "name" : "ApplicationInfo",
  "type" : "query",
  "stage" : "iceberg",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.ApplicationInfo" ],
  "annotations" : [ {
    "name" : "base-table",
    "description" : "ApplicationInfo"
  } ],
  "plan" : "LogicalProject(id=[$0], customer_id=[$1], loan_type_id=[$2], amount=[$3], duration=[$4], application_date=[$5], updated_at=[$6], id1=[$7], id2=[$8])\n  LogicalTableScan(table=[[default_catalog, default_database, ApplicationInfo]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`ApplicationInfo`"
}, {
  "id" : "access:ApplicationStatus",
  "name" : "ApplicationStatus",
  "type" : "query",
  "stage" : "iceberg",
  "documentation" : "Status of an Application",
  "inputs" : [ "default_catalog.default_database.ApplicationStatus" ],
  "annotations" : [ {
    "name" : "base-table",
    "description" : "ApplicationStatus"
  } ],
  "plan" : "LogicalProject(status=[$0], message=[$1], event_time=[$2], id=[$3], customer_id=[$4], loan_type_id=[$5], amount=[$6], duration=[$7], max_amount=[$8], min_amount=[$9])\n  LogicalTableScan(table=[[default_catalog, default_database, ApplicationStatus]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`ApplicationStatus`"
}, {
  "id" : "default_catalog.default_database.ApplicationInfo",
  "name" : "ApplicationInfo",
  "type" : "state",
  "stage" : "iceberg",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database._Applications", "default_catalog.default_database._LoanTypes" ],
  "annotations" : [ ],
  "plan" : "LogicalProject(id=[$0], customer_id=[$1], loan_type_id=[$2], amount=[$3], duration=[$4], application_date=[$5], updated_at=[$6], id1=[$7], id2=[$16])\n  LogicalJoin(condition=[=($16, $1)], joinType=[left])\n    LogicalJoin(condition=[=($7, $2)], joinType=[inner])\n      LogicalTableScan(table=[[default_catalog, default_database, _Applications]])\n      LogicalTableScan(table=[[default_catalog, default_database, _LoanTypes]])\n    LogicalTableScan(table=[[default_catalog, default_database, _LoanTypes]])\n",
  "sql" : "CREATE VIEW `ApplicationInfo` AS  SELECT a.*, t.id AS id1, t2.id AS id2 FROM _Applications a\n    JOIN _LoanTypes t ON t.id = a.loan_type_id\n    LEFT JOIN _LoanTypes t2 ON t2.id = a.customer_id;\n",
  "timestamp" : "updated_at",
  "schema" : [ {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "customer_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "loan_type_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "application_date",
    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL"
  }, {
    "name" : "updated_at",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  }, {
    "name" : "id1",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "id2",
    "type" : "BIGINT"
  } ],
  "primary_key" : [ "id", "loan_type_id" ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.default_database.ApplicationStatus",
  "name" : "ApplicationStatus",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "Status of an Application",
  "inputs" : [ "default_catalog.default_database._Applications", "default_catalog.default_database._LoanTypes", "default_catalog.sources.ApplicationUpdates" ],
  "annotations" : [ ],
  "plan" : "LogicalProject(status=[$1], message=[$2], event_time=[$3], id=[$4], customer_id=[$5], loan_type_id=[$6], amount=[$7], duration=[$8], max_amount=[$15], min_amount=[$16])\n  LogicalCorrelate(correlation=[$cor3], joinType=[inner], requiredColumns=[{3, 6}])\n    LogicalCorrelate(correlation=[$cor2], joinType=[inner], requiredColumns=[{0, 3}])\n      LogicalTableScan(table=[[default_catalog, sources, ApplicationUpdates]])\n      LogicalFilter(condition=[=($0, $cor2.loan_application_id)])\n        LogicalSnapshot(period=[$cor2.event_time])\n          LogicalTableScan(table=[[default_catalog, default_database, _Applications]])\n    LogicalFilter(condition=[=($0, $cor3.loan_type_id)])\n      LogicalSnapshot(period=[$cor3.event_time])\n        LogicalTableScan(table=[[default_catalog, default_database, _LoanTypes]])\n",
  "sql" : "CREATE VIEW `ApplicationStatus` AS  SELECT u.status, u.message, u.event_time, a.id, a.customer_id, a.loan_type_id,\n                            a.amount, a.duration, t.max_amount, t.min_amount\n                     FROM sources.ApplicationUpdates u JOIN _Applications FOR SYSTEM_TIME AS OF u.`event_time` a ON a.id = u.loan_application_id\n                                               JOIN _LoanTypes FOR SYSTEM_TIME AS OF u.`event_time` t ON t.id = a.loan_type_id;\n",
  "timestamp" : "event_time",
  "schema" : [ {
    "name" : "status",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "message",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "event_time",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  }, {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "customer_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "loan_type_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "max_amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "min_amount",
    "type" : "DOUBLE NOT NULL"
  } ],
  "primary_key" : [ "customer_id" ],
  "row_count" : "~8e7"
}, {
  "id" : "default_catalog.default_database._Applications",
  "name" : "_Applications",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.sources.ApplicationsStream" ],
  "annotations" : [ {
    "name" : "mostRecentDistinct",
    "description" : "true"
  }, {
    "name" : "stream-root",
    "description" : "ApplicationsStream"
  } ],
  "plan" : "LogicalProject(id=[$0], customer_id=[$1], loan_type_id=[$2], amount=[$3], duration=[$4], application_date=[$5], updated_at=[$6])\n  LogicalFilter(condition=[=($7, 1)])\n    LogicalProject(id=[$0], customer_id=[$1], loan_type_id=[$2], amount=[$3], duration=[$4], application_date=[$5], updated_at=[$6], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $6 DESC NULLS LAST)])\n      LogicalTableScan(table=[[default_catalog, sources, ApplicationsStream]])\n",
  "sql" : "CREATE VIEW `_Applications`\nAS\nSELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`\nFROM (SELECT `id`, `customer_id`, `loan_type_id`, `amount`, `duration`, `application_date`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`\n  FROM `default_catalog`.`sources`.`ApplicationsStream`) AS `t`\nWHERE `__sqrlinternal_rownum` = 1",
  "timestamp" : "updated_at",
  "schema" : [ {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "customer_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "loan_type_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "application_date",
    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL"
  }, {
    "name" : "updated_at",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "id", "loan_type_id" ],
  "row_count" : "~2e7"
}, {
  "id" : "default_catalog.default_database._LoanTypes",
  "name" : "_LoanTypes",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.sources.LoanTypesStream" ],
  "annotations" : [ {
    "name" : "mostRecentDistinct",
    "description" : "true"
  }, {
    "name" : "stream-root",
    "description" : "LoanTypesStream"
  } ],
  "plan" : "LogicalProject(id=[$0], name=[$1], description=[$2], interest_rate=[$3], max_amount=[$4], min_amount=[$5], max_duration=[$6], min_duration=[$7], updated_at=[$8])\n  LogicalFilter(condition=[=($9, 1)])\n    LogicalProject(id=[$0], name=[$1], description=[$2], interest_rate=[$3], max_amount=[$4], min_amount=[$5], max_duration=[$6], min_duration=[$7], updated_at=[$8], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $8 DESC NULLS LAST)])\n      LogicalTableScan(table=[[default_catalog, sources, LoanTypesStream]])\n",
  "sql" : "CREATE VIEW `_LoanTypes`\nAS\nSELECT `id`, `name`, `description`, `interest_rate`, `max_amount`, `min_amount`, `max_duration`, `min_duration`, `updated_at`\nFROM (SELECT `id`, `name`, `description`, `interest_rate`, `max_amount`, `min_amount`, `max_duration`, `min_duration`, `updated_at`, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated_at` DESC NULLS LAST) AS `__sqrlinternal_rownum`\n  FROM `default_catalog`.`sources`.`LoanTypesStream`) AS `t`\nWHERE `__sqrlinternal_rownum` = 1",
  "timestamp" : "updated_at",
  "schema" : [ {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "name",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "description",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "interest_rate",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "max_amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "min_amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "max_duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "min_duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "updated_at",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "id" ],
  "row_count" : "~2e7"
}, {
  "id" : "default_catalog.sources.ApplicationUpdates",
  "name" : "ApplicationUpdates",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.sources.ApplicationUpdates__base" ],
  "annotations" : [ ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[event_time], watermark=[-($3, 1:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, sources, ApplicationUpdates]])\n",
  "sql" : "CREATE TEMPORARY TABLE `ApplicationUpdates__schema` (\n  `loan_application_id` BIGINT NOT NULL,\n  `status` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `message` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n)\nWITH (\n  'connector' = 'filesystem',\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/application_updates.jsonl'\n);\nCREATE TABLE `ApplicationUpdates` (\n  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND\n)\nLIKE `ApplicationUpdates__schema`",
  "timestamp" : "event_time",
  "schema" : [ {
    "name" : "loan_application_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "status",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "message",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "event_time",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.sources.ApplicationUpdates__base",
  "name" : "ApplicationUpdates",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/application_updates.jsonl",
    "connector" : "filesystem"
  }
}, {
  "id" : "default_catalog.sources.ApplicationsStream",
  "name" : "ApplicationsStream",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.sources.ApplicationsStream__base" ],
  "annotations" : [ {
    "name" : "stream-root",
    "description" : "ApplicationsStream"
  } ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[updated_at], watermark=[-($6, 1:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, sources, ApplicationsStream]])\n",
  "sql" : "CREATE TEMPORARY TABLE `ApplicationsStream__schema` (\n  `id` BIGINT NOT NULL,\n  `customer_id` BIGINT NOT NULL,\n  `loan_type_id` BIGINT NOT NULL,\n  `amount` DOUBLE NOT NULL,\n  `duration` BIGINT NOT NULL,\n  `application_date` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n)\nWITH (\n  'connector' = 'datagen'\n);\nCREATE TABLE `ApplicationsStream` (\n  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,\n  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND\n)\nWITH (\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/applications.jsonl',\n  'source.monitor-interval' = '10 sec',\n  'connector' = 'filesystem'\n)\nLIKE `ApplicationsStream__schema`",
  "timestamp" : "updated_at",
  "schema" : [ {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "customer_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "loan_type_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "application_date",
    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL"
  }, {
    "name" : "updated_at",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "id", "updated_at" ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.sources.ApplicationsStream__base",
  "name" : "ApplicationsStream",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/applications.jsonl",
    "source.monitor-interval" : "10 sec",
    "connector" : "filesystem"
  }
}, {
  "id" : "default_catalog.sources.LoanTypesStream",
  "name" : "LoanTypesStream",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.sources.LoanTypesStream__base" ],
  "annotations" : [ {
    "name" : "stream-root",
    "description" : "LoanTypesStream"
  } ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[updated_at], watermark=[-($8, 1:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, sources, LoanTypesStream]])\n",
  "sql" : "CREATE TEMPORARY TABLE `LoanTypesStream__schema` (\n  `id` BIGINT NOT NULL,\n  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `interest_rate` DOUBLE NOT NULL,\n  `max_amount` DOUBLE NOT NULL,\n  `min_amount` DOUBLE NOT NULL,\n  `max_duration` BIGINT NOT NULL,\n  `min_duration` BIGINT NOT NULL,\n  `updated_at` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n)\nWITH (\n  'connector' = 'datagen'\n);\nCREATE TABLE `LoanTypesStream` (\n  PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,\n  WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND\n)\nWITH (\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/loan_types.jsonl',\n  'source.monitor-interval' = '10 sec',\n  'connector' = 'filesystem'\n)\nLIKE `LoanTypesStream__schema`",
  "timestamp" : "updated_at",
  "schema" : [ {
    "name" : "id",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "name",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "description",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "interest_rate",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "max_amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "min_amount",
    "type" : "DOUBLE NOT NULL"
  }, {
    "name" : "max_duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "min_duration",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "updated_at",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "id", "updated_at" ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.sources.LoanTypesStream__base",
  "name" : "LoanTypesStream",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/loan_types.jsonl",
    "source.monitor-interval" : "10 sec",
    "connector" : "filesystem"
  }
} ]
>>>pipeline_source.sqrl
-- Create the `sources` database if it is missing and make it the current database,
-- so the CREATE TABLE statements that follow are registered under `sources`.
CREATE DATABASE IF NOT EXISTS `sources`;
USE `sources`;
-- Customer records read from customers.jsonl through the filesystem connector.
-- Column definitions come from `customers.schema.yml` via LIKE; this statement only
-- adds the (non-enforced) primary key, the watermark and the connector options.
-- The watermark trails `updated_at` by one millisecond.
CREATE TABLE CustomersStream (
    PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
    WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
    'format' = 'flexible-json',
    'path' = '${DATA_PATH}/customers.jsonl',
    'source.monitor-interval' = '10 sec',
    'connector' = 'filesystem'
)
LIKE `customers.schema.yml`;
-- Application status updates. Columns and connector settings are inherited from
-- `application_updates.jsonl` via LIKE; only the watermark is declared here,
-- trailing `event_time` by one millisecond.
CREATE TABLE ApplicationUpdates (
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.001' SECOND
)
LIKE `application_updates.jsonl`;
-- Loan application records read from applications.jsonl through the filesystem connector.
-- Column definitions come from `applications.schema.yml` via LIKE; this statement only
-- adds the (non-enforced) primary key, the watermark and the connector options.
-- The watermark trails `updated_at` by one millisecond; the interval uses the singular
-- unit keyword SECOND, as the other source tables in this script do.
CREATE TABLE ApplicationsStream (
    PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
    WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
    'format' = 'flexible-json',
    'path' = '${DATA_PATH}/applications.jsonl',
    'source.monitor-interval' = '10 sec',
    'connector' = 'filesystem'
)
LIKE `applications.schema.yml`;
-- Loan type records read from loan_types.jsonl through the filesystem connector.
-- Column definitions come from `loan_types.schema.yml` via LIKE; this statement only
-- adds the (non-enforced) primary key, the watermark and the connector options.
-- The watermark trails `updated_at` by one millisecond.
CREATE TABLE LoanTypesStream (
    PRIMARY KEY (`id`, `updated_at`) NOT ENFORCED,
    WATERMARK FOR `updated_at` AS `updated_at` - INTERVAL '0.001' SECOND
)
WITH (
    'format' = 'flexible-json',
    'path' = '${DATA_PATH}/loan_types.jsonl',
    'source.monitor-interval' = '10 sec',
    'connector' = 'filesystem'
)
LIKE `loan_types.schema.yml`;
-- Switch to `default_catalog`.`default_database` (creating the database if needed);
-- the derived tables defined after this point reference the source tables as `sources.<name>`.
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
-- Current state of each application: one row per `id`, keeping the row with the
-- most recent `updated_at` from sources.ApplicationsStream.
-- The hints below declare (id, loan_type_id) as primary key and loan_type_id as partition key.
/*+primary_key(id, loan_type_id) */
/*+partition_key(loan_type_id) */
_Applications := DISTINCT sources.ApplicationsStream ON id ORDER BY updated_at DESC;
-- Current state of each loan type: one row per `id`, keeping the row with the
-- most recent `updated_at` from sources.LoanTypesStream.
_LoanTypes := DISTINCT sources.LoanTypesStream ON id ORDER BY updated_at DESC;
-- Enriches every status update with the application it refers to and that application's
-- loan type. Both joins are temporal (FOR SYSTEM_TIME AS OF u.event_time), so each update
-- is matched against the _Applications / _LoanTypes rows valid at the update's event time.
-- The hints below declare customer_id as both primary key and partition key.
/** Status of an Application */
/*+primary_key(customer_id) */
/*+partition_key(customer_id) */
ApplicationStatus := SELECT u.status, u.message, u.event_time, a.id, a.customer_id, a.loan_type_id,
                            a.amount, a.duration, t.max_amount, t.min_amount
                     FROM sources.ApplicationUpdates u JOIN _Applications FOR SYSTEM_TIME AS OF u.`event_time` a ON a.id = u.loan_application_id
                                               JOIN _LoanTypes FOR SYSTEM_TIME AS OF u.`event_time` t ON t.id = a.loan_type_id;
-- All _Applications columns plus two loan-type ids:
--   id1: id of the loan type matched on a.loan_type_id (inner join, so applications
--        without a matching loan type are dropped);
--   id2: id of a loan type whose id equals a.customer_id (left join, so id2 is NULL
--        when there is no such loan type).
-- NOTE(review): joining _LoanTypes.id against customer_id looks unusual; presumably it is
-- deliberate (e.g. to exercise a nullable, duplicated `id` column) - confirm before changing.
ApplicationInfo := SELECT a.*, t.id AS id1, t2.id AS id2 FROM _Applications a
    JOIN _LoanTypes t ON t.id = a.loan_type_id
    LEFT JOIN _LoanTypes t2 ON t2.id = a.customer_id;
-- Query carrying the `test` hint: returns a single row `num` with the number of
-- ApplicationInfo rows whose id is greater than zero.
/*+test */
ApplicationInfoTest := SELECT COUNT(1) AS num FROM ApplicationInfo WHERE id > 0;

