>>>pipeline_explain.json
[ {
  "id" : "access:Employees",
  "name" : "Employees",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Employees" ],
  "annotations" : [ {
    "name" : "parameters",
    "description" : "employeeid, name"
  }, {
    "name" : "base-table",
    "description" : "Employees"
  } ],
  "plan" : "LogicalProject(employeeid=[$0], name=[$1], email=[$2], updatedDate=[$3])\n  LogicalFilter(condition=[AND(OR(=($0, ?0), IS NULL(?0)), OR(=($1, ?1), IS NULL(?1)))])\n    LogicalTableScan(table=[[default_catalog, default_database, Employees]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`Employees`\nWHERE (`employeeid` = ? OR ? IS NULL) AND (`name` = ? OR ? IS NULL)"
}, {
  "id" : "access:Employees.allReports",
  "name" : "Employees.allReports",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "Returns all employees reporting to the given employee following down the reporting chain",
  "inputs" : [ "default_catalog.default_database.Employees", "default_catalog.default_database.Reporting" ],
  "annotations" : [ {
    "name" : "parameters",
    "description" : "employeeid"
  }, {
    "name" : "base-table",
    "description" : "Employees.allReports"
  } ],
  "plan" : "LogicalValues(tuples=[[]])\n",
  "sql" : "WITH RECURSIVE employee_hierarchy AS (\n    SELECT r.employeeid, r.managerid, 1 as level\n    FROM \"Reporting\" r\n    WHERE r.managerid = $?$0\n    UNION ALL\n    SELECT r.employeeid, r.managerid, eh.level + 1 as level\n    FROM \"Reporting\" r\n    INNER JOIN employee_hierarchy eh ON r.managerid = eh.employeeid\n  )\n  SELECT e.employeeid, e.name, eh.level\n  FROM employee_hierarchy eh\n  JOIN \"Employees\" e ON eh.employeeid = e.employeeid\n  ORDER BY eh.level, e.employeeid"
}, {
  "id" : "access:Employees.directReports",
  "name" : "Employees.directReports",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Employees", "default_catalog.default_database.Reporting" ],
  "annotations" : [ {
    "name" : "parameters",
    "description" : "employeeid"
  }, {
    "name" : "base-table",
    "description" : "Employees"
  } ],
  "plan" : "LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])\n  LogicalProject(employeeid=[$3], name=[$4], email=[$5], updatedDate=[$6])\n    LogicalFilter(condition=[=($1, ?0)])\n      LogicalJoin(condition=[=($0, $3)], joinType=[inner])\n        LogicalTableScan(table=[[default_catalog, default_database, Reporting]])\n        LogicalTableScan(table=[[default_catalog, default_database, Employees]])\n",
  "sql" : "CREATE VIEW `directReports` AS  SELECT e.* FROM Reporting r JOIN Employees e ON r.employeeid = e.employeeid\n                           WHERE r.managerid = ?               ORDER BY e.employeeid;\n"
}, {
  "id" : "default_catalog.default_database.Employees",
  "name" : "Employees",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.hr.Employees" ],
  "annotations" : [ {
    "name" : "mostRecentDistinct",
    "description" : "true"
  } ],
  "plan" : "LogicalProject(employeeid=[$0], name=[$1], email=[$2], updatedDate=[$3])\n  LogicalFilter(condition=[=($4, 1)])\n    LogicalProject(employeeid=[$0], name=[$1], email=[$2], updatedDate=[$3], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])\n      LogicalTableScan(table=[[default_catalog, hr, Employees]])\n",
  "sql" : "CREATE VIEW `Employees`\nAS\nSELECT `employeeid`, `name`, `email`, `updatedDate`\nFROM (SELECT `employeeid`, `name`, `email`, `updatedDate`, ROW_NUMBER() OVER (PARTITION BY `employeeid` ORDER BY `updatedDate` DESC NULLS LAST) AS `__sqrlinternal_rownum`\n  FROM `default_catalog`.`hr`.`Employees`) AS `t`\nWHERE `__sqrlinternal_rownum` = 1",
  "timestamp" : "updatedDate",
  "schema" : [ {
    "name" : "employeeid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "name",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "email",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "updatedDate",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "employeeid" ],
  "row_count" : "~2e7"
}, {
  "id" : "default_catalog.default_database.Reporting",
  "name" : "Reporting",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.hr.Reporting" ],
  "annotations" : [ {
    "name" : "mostRecentDistinct",
    "description" : "true"
  } ],
  "plan" : "LogicalProject(employeeid=[$0], managerid=[$1], updatedDate=[$2])\n  LogicalFilter(condition=[=($3, 1)])\n    LogicalProject(employeeid=[$0], managerid=[$1], updatedDate=[$2], __sqrlinternal_rownum=[ROW_NUMBER() OVER (PARTITION BY $0, $1 ORDER BY $2 DESC NULLS LAST)])\n      LogicalTableScan(table=[[default_catalog, hr, Reporting]])\n",
  "sql" : "CREATE VIEW `Reporting`\nAS\nSELECT `employeeid`, `managerid`, `updatedDate`\nFROM (SELECT `employeeid`, `managerid`, `updatedDate`, ROW_NUMBER() OVER (PARTITION BY `employeeid`, `managerid` ORDER BY `updatedDate` DESC NULLS LAST) AS `__sqrlinternal_rownum`\n  FROM `default_catalog`.`hr`.`Reporting`) AS `t`\nWHERE `__sqrlinternal_rownum` = 1",
  "timestamp" : "updatedDate",
  "schema" : [ {
    "name" : "employeeid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "managerid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "updatedDate",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "primary_key" : [ "employeeid", "managerid" ],
  "row_count" : "~2e7"
}, {
  "id" : "default_catalog.hr.Employees",
  "name" : "Employees",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.hr.Employees__base" ],
  "annotations" : [ ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[updatedDate], watermark=[-($3, 1:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, hr, Employees]])\n",
  "sql" : "CREATE TEMPORARY TABLE `Employees__schema` (\n  `employeeid` BIGINT NOT NULL,\n  `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `email` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n)\nWITH (\n  'connector' = 'filesystem',\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/employees.jsonl'\n);\nCREATE TABLE `Employees` (\n  WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND\n)\nWITH (\n  'source.monitor-interval' = '10 sec'\n)\nLIKE `Employees__schema`",
  "timestamp" : "updatedDate",
  "schema" : [ {
    "name" : "employeeid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "name",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "email",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "updatedDate",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.hr.Employees__base",
  "name" : "Employees",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/employees.jsonl",
    "source.monitor-interval" : "10 sec",
    "connector" : "filesystem"
  }
}, {
  "id" : "default_catalog.hr.Reporting",
  "name" : "Reporting",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.hr.Reporting__base" ],
  "annotations" : [ ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[updatedDate], watermark=[-($2, 1:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, hr, Reporting]])\n",
  "sql" : "CREATE TEMPORARY TABLE `Reporting__schema` (\n  `employeeid` BIGINT NOT NULL,\n  `managerid` BIGINT NOT NULL,\n  `updatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n)\nWITH (\n  'connector' = 'filesystem',\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/reporting.jsonl'\n);\nCREATE TABLE `Reporting` (\n  WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND\n)\nWITH (\n  'source.monitor-interval' = '10 sec'\n)\nLIKE `Reporting__schema`",
  "timestamp" : "updatedDate",
  "schema" : [ {
    "name" : "employeeid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "managerid",
    "type" : "BIGINT NOT NULL"
  }, {
    "name" : "updatedDate",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.hr.Reporting__base",
  "name" : "Reporting",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/reporting.jsonl",
    "source.monitor-interval" : "10 sec",
    "connector" : "filesystem"
  }
} ]
>>>pipeline_source.sqrl
-- Create the `hr` database if it does not exist yet and make it the current
-- database, so the unqualified source tables defined next are created inside it.
CREATE DATABASE IF NOT EXISTS `hr`;
USE `hr`;
-- Source table `Employees` (created in the current database, `hr`).
-- The base definition is taken from `employees.jsonl` through the LIKE clause;
-- this statement only layers two things on top of it:
--   * a watermark on `updatedDate` that trails the column value by
--     INTERVAL '0.001' SECOND (one millisecond);
--   * the connector option 'source.monitor-interval' = '10 sec'.
CREATE TABLE `Employees` (
    WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND
) WITH (
    'source.monitor-interval' = '10 sec'
) LIKE `employees.jsonl`;
-- Source table `Reporting` (created in the current database, `hr`).
-- The base definition is taken from `reporting.jsonl` through the LIKE clause;
-- this statement adds a watermark on `updatedDate` that trails the column value
-- by INTERVAL '0.001' SECOND and sets 'source.monitor-interval' = '10 sec',
-- mirroring the `Employees` source table.
CREATE TABLE `Reporting` (
    WATERMARK FOR `updatedDate` AS `updatedDate` - INTERVAL '0.001' SECOND
) WITH (
    'source.monitor-interval' = '10 sec'
) LIKE `reporting.jsonl`;
-- Switch to `default_catalog`.`default_database` (creating the database when it
-- is missing). Everything defined after this point is unqualified and therefore
-- lands here; the source tables above must be referenced as `hr.<table>`.
USE CATALOG `default_catalog`;
CREATE DATABASE IF NOT EXISTS `default_database`;
USE `default_database`;
-- Register the Java-implemented math functions from the package
-- com.datasqrl.flinkrunner.stdlib.math under fully qualified names in
-- `default_catalog`.`default_database`. IF NOT EXISTS makes each registration a
-- no-op when the function is already present.
-- NOTE(review): none of these functions is referenced by the table definitions
-- visible further down in this script.
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`binomial_distribution` AS 'com.datasqrl.flinkrunner.stdlib.math.binomial_distribution' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`cbrt` AS 'com.datasqrl.flinkrunner.stdlib.math.cbrt' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`copy_sign` AS 'com.datasqrl.flinkrunner.stdlib.math.copy_sign' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`expm1` AS 'com.datasqrl.flinkrunner.stdlib.math.expm1' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`exponential_distribution` AS 'com.datasqrl.flinkrunner.stdlib.math.exponential_distribution' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`hypot` AS 'com.datasqrl.flinkrunner.stdlib.math.hypot' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`log1p` AS 'com.datasqrl.flinkrunner.stdlib.math.log1p' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`next_after` AS 'com.datasqrl.flinkrunner.stdlib.math.next_after' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`normal_distribution` AS 'com.datasqrl.flinkrunner.stdlib.math.normal_distribution' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`poisson_distribution` AS 'com.datasqrl.flinkrunner.stdlib.math.poisson_distribution' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`scalb` AS 'com.datasqrl.flinkrunner.stdlib.math.scalb' LANGUAGE JAVA;
CREATE FUNCTION IF NOT EXISTS `default_catalog`.`default_database`.`ulp` AS 'com.datasqrl.flinkrunner.stdlib.math.ulp' LANGUAGE JAVA;
-- Deduplicated employee state: for every `employeeid` keep only the row of
-- hr.Employees with the greatest `updatedDate` (the explain plan implements this
-- as ROW_NUMBER() ... PARTITION BY employeeid ORDER BY updatedDate DESC, rank 1).
-- The query_by_any hint yields the `access:Employees` query whose arguments
-- `employeeid` and `name` are each optional: a filter is skipped when its
-- argument is NULL, as shown by the `(col = ? OR ? IS NULL)` predicates.
-- The hint must stay directly in front of the definition it annotates.
/*+ query_by_any(employeeid, name) */
Employees := DISTINCT hr.Employees ON employeeid ORDER BY updatedDate DESC;
-- Deduplicated reporting edges: for every (`employeeid`, `managerid`) pair keep
-- only the row of hr.Reporting with the greatest `updatedDate`.
-- NOTE(review): the no_query hint presumably suppresses a standalone query
-- endpoint for this table -- the explain output has no `access:Reporting` entry;
-- confirm against the SQRL hint documentation.
/*+ no_query */
Reporting := DISTINCT hr.Reporting ON employeeid, managerid ORDER BY updatedDate DESC;
-- Relationship on Employees: the employees that report directly to the parent row.
-- `this.employeeid` refers to the parent Employees row and is turned into the
-- query parameter `employeeid` (it appears as `?` in the generated SQL).
-- Returns full Employees rows (e.*) for every Reporting edge whose `managerid`
-- equals that parameter, ordered by `employeeid`.
-- Keep the statement text and layout unchanged: it is copied into the explain
-- output with its original whitespace.
Employees.directReports := SELECT e.* FROM Reporting r JOIN Employees e ON r.employeeid = e.employeeid
                           WHERE r.managerid = this.employeeid ORDER BY e.employeeid;
-- Relationship on Employees with an explicitly declared result type
-- (employeeid, name, level), computed by a recursive CTE:
--   * anchor member: Reporting edges whose `managerid` is the parent row's
--     `employeeid` (`this.employeeid`), tagged with level 1;
--   * recursive member: edges whose manager is an employee already in the
--     hierarchy, tagged with the parent's level + 1;
--   * final SELECT: joins the hierarchy to Employees to fetch the name and
--     orders by level, then employeeid.
-- The doc-string below is what appears as `documentation` in the explain output,
-- and the query body is copied there verbatim, so neither should be reworded.
-- NOTE(review): table names are double-quoted and the explain plan is an empty
-- LogicalValues on the postgres stage, so this body is presumably passed through
-- to the database unparsed -- confirm.
-- NOTE(review): UNION ALL with an increasing `level` has no cycle guard; a cycle
-- in Reporting (A manages B, B manages A) would keep the recursion from
-- terminating -- confirm the data is guaranteed to be acyclic.
/** Returns all employees reporting to the given employee following down the reporting chain
 */
Employees.allReports RETURNS (employeeid BIGINT NOT NULL, name STRING NOT NULL, level INT NOT NULL) :=
  WITH RECURSIVE employee_hierarchy AS (
    SELECT r.employeeid, r.managerid, 1 as level
    FROM "Reporting" r
    WHERE r.managerid = this.employeeid
    UNION ALL
    SELECT r.employeeid, r.managerid, eh.level + 1 as level
    FROM "Reporting" r
    INNER JOIN employee_hierarchy eh ON r.managerid = eh.employeeid
  )
  SELECT e.employeeid, e.name, eh.level
  FROM employee_hierarchy eh
  JOIN "Employees" e ON eh.employeeid = e.employeeid
  ORDER BY eh.level, e.employeeid;
-- Test table: counts the rows of the deduplicated Employees table and exposes
-- the result as `num_employees`.
-- NOTE(review): the `test` hint presumably marks this definition as a test case
-- that is left out of the regular pipeline -- the explain output contains no
-- entry for EmployeeCountTest; confirm against the SQRL hint documentation.
/*+test */
EmployeeCountTest := SELECT count(*) AS num_employees FROM Employees;

