>>>pipeline_explain.json
[ {
  "id" : "access:Click",
  "name" : "Click",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Click" ],
  "annotations" : [ {
    "name" : "stream-root",
    "description" : "Click"
  }, {
    "name" : "base-table",
    "description" : "Click"
  } ],
  "plan" : "LogicalProject(url=[$0], timestamp=[$1], userid=[$2])\n  LogicalTableScan(table=[[default_catalog, default_database, Click]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`Click`"
}, {
  "id" : "access:Recommendation",
  "name" : "Recommendation",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Recommendation" ],
  "annotations" : [ {
    "name" : "parameters",
    "description" : "url"
  }, {
    "name" : "base-table",
    "description" : "Recommendation"
  } ],
  "plan" : "LogicalProject(url=[$0], rec=[$1], frequency=[$2])\n  LogicalFilter(condition=[=($0, ?0)])\n    LogicalTableScan(table=[[default_catalog, default_database, Recommendation]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`Recommendation`\nWHERE `url` = ?"
}, {
  "id" : "access:Trending",
  "name" : "Trending",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Trending" ],
  "annotations" : [ {
    "name" : "base-table",
    "description" : "Trending"
  } ],
  "plan" : "LogicalProject(url=[$0], total=[$1])\n  LogicalTableScan(table=[[default_catalog, default_database, Trending]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`Trending`"
}, {
  "id" : "access:VisitAfter",
  "name" : "VisitAfter",
  "type" : "query",
  "stage" : "postgres",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.VisitAfter" ],
  "annotations" : [ {
    "name" : "base-table",
    "description" : "VisitAfter"
  } ],
  "plan" : "LogicalProject(beforeURL=[$0], afterURL=[$1], timestamp=[$2])\n  LogicalTableScan(table=[[default_catalog, default_database, VisitAfter]])\n",
  "sql" : "SELECT *\nFROM `default_catalog`.`default_database`.`VisitAfter`"
}, {
  "id" : "default_catalog.default_database.Click",
  "name" : "Click",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Click__base" ],
  "annotations" : [ {
    "name" : "stream-root",
    "description" : "Click"
  } ],
  "plan" : "LogicalWatermarkAssigner(rowtime=[timestamp], watermark=[-($1, 1000:INTERVAL SECOND)])\n  LogicalTableScan(table=[[default_catalog, default_database, Click]])\n",
  "sql" : "CREATE TEMPORARY TABLE `Click__schema` (\n  `url` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n  `userid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL\n)\nWITH (\n  'connector' = 'datagen'\n);\nCREATE TABLE `Click` (\n  PRIMARY KEY (`url`, `userid`, `timestamp`) NOT ENFORCED,\n  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND\n)\nWITH (\n  'format' = 'flexible-json',\n  'path' = '${DATA_PATH}/click.jsonl',\n  'source.monitor-interval' = '10 sec',\n  'connector' = 'filesystem'\n)\nLIKE `Click__schema`",
  "timestamp" : "timestamp",
  "schema" : [ {
    "name" : "url",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "timestamp",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  }, {
    "name" : "userid",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  } ],
  "primary_key" : [ "url", "userid", "timestamp" ],
  "row_count" : "~1e8"
}, {
  "id" : "default_catalog.default_database.Click__base",
  "name" : "Click",
  "type" : "import",
  "stage" : "flink",
  "documentation" : "",
  "connector" : {
    "format" : "flexible-json",
    "path" : "${DATA_PATH}/click.jsonl",
    "source.monitor-interval" : "10 sec",
    "connector" : "filesystem"
  }
}, {
  "id" : "default_catalog.default_database.Recommendation",
  "name" : "Recommendation",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.VisitAfter" ],
  "annotations" : [ {
    "name" : "sort",
    "description" : "[0 ASC-nulls-first, 2 DESC-nulls-last]"
  } ],
  "plan" : "LogicalAggregate(group=[{0, 1}], frequency=[COUNT()])\n  LogicalProject(url=[$0], rec=[$1])\n    LogicalTableScan(table=[[default_catalog, default_database, VisitAfter]])\n",
  "sql" : "CREATE VIEW `Recommendation` AS  SELECT beforeURL AS url, afterURL AS rec,\n    count(1) AS frequency FROM VisitAfter\n    GROUP BY beforeURL, afterURL ORDER BY url ASC, frequency DESC;\n",
  "timestamp" : "-",
  "schema" : [ {
    "name" : "url",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "rec",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "frequency",
    "type" : "BIGINT NOT NULL"
  } ],
  "primary_key" : [ "url", "rec" ],
  "row_count" : "~2e7"
}, {
  "id" : "default_catalog.default_database.Trending",
  "name" : "Trending",
  "type" : "state",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Click" ],
  "annotations" : [ {
    "name" : "sort",
    "description" : "[1 DESC-nulls-last, 0 ASC-nulls-first]"
  } ],
  "plan" : "LogicalAggregate(group=[{0}], total=[COUNT()])\n  LogicalProject(url=[$0])\n    LogicalTableScan(table=[[default_catalog, default_database, Click]])\n",
  "sql" : "CREATE VIEW `Trending` AS  SELECT url, count(1) AS total\n    FROM Click\n    GROUP BY url ORDER BY total DESC, url ASC;\n",
  "timestamp" : "-",
  "schema" : [ {
    "name" : "url",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "total",
    "type" : "BIGINT NOT NULL"
  } ],
  "primary_key" : [ "url" ],
  "row_count" : "~1e7"
}, {
  "id" : "default_catalog.default_database.VisitAfter",
  "name" : "VisitAfter",
  "type" : "stream",
  "stage" : "flink",
  "documentation" : "",
  "inputs" : [ "default_catalog.default_database.Click" ],
  "annotations" : [ ],
  "plan" : "LogicalProject(beforeURL=[$0], afterURL=[$3], timestamp=[$4])\n  LogicalJoin(condition=[AND(=($2, $5), <($1, $4), >=($1, -($4, *(10, 60000:INTERVAL MINUTE))))], joinType=[inner])\n    LogicalTableScan(table=[[default_catalog, default_database, Click]])\n    LogicalTableScan(table=[[default_catalog, default_database, Click]])\n",
  "sql" : "CREATE VIEW `VisitAfter` AS  SELECT b.url AS beforeURL, a.url AS afterURL,\n    a.`timestamp` AS `timestamp`\n    FROM Click b JOIN Click a ON b.userid=a.userid AND\n        b.`timestamp` < a.`timestamp` AND\n        b.`timestamp` >= a.`timestamp` - INTERVAL 10 MINUTE;\n",
  "timestamp" : "timestamp",
  "schema" : [ {
    "name" : "beforeURL",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "afterURL",
    "type" : "VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\" NOT NULL"
  }, {
    "name" : "timestamp",
    "type" : "TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL"
  } ],
  "row_count" : "~3e7"
} ]
>>>pipeline_source.sqrl
-- Schema template for the Click source: declares the three NOT NULL columns
-- url, timestamp and userid. In this script it is referenced by the LIKE
-- clause of the `Click` table definition that follows.
-- NOTE(review): the 'datagen' connector looks like a placeholder, since
-- `Click` supplies its own 'connector' option -- confirm.
CREATE TEMPORARY TABLE `Click__schema` (
  `url` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `userid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL
)
WITH (
  'connector' = 'datagen'
);
-- Click event source. The columns come from `Click__schema` via LIKE; this
-- statement adds the key, the watermark and the connector options:
--   * the primary key (url, userid, timestamp) is declared NOT ENFORCED;
--   * the watermark trails `timestamp` by 1 second;
--   * data is read from ${DATA_PATH}/click.jsonl through the 'filesystem'
--     connector in 'flexible-json' format, with a 10 sec monitor interval.
CREATE TABLE `Click` (
  PRIMARY KEY (`url`, `userid`, `timestamp`) NOT ENFORCED,
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
)
WITH (
  'format' = 'flexible-json',
  'path' = '${DATA_PATH}/click.jsonl',
  'source.monitor-interval' = '10 sec',
  'connector' = 'filesystem'
)
LIKE `Click__schema`;
-- Number of Click rows per url, ordered by total descending with url
-- ascending as the tie-breaker.
Trending := SELECT url, count(1) AS total
    FROM Click
    GROUP BY url ORDER BY total DESC, url ASC;
-- Self-join of Click: pairs each click (alias a) with clicks by the same
-- userid (alias b) that happened strictly before it and no more than
-- 10 minutes earlier (the 10-minute boundary is inclusive).
-- Emits b's url as beforeURL, a's url as afterURL, and a's timestamp.
VisitAfter := SELECT b.url AS beforeURL, a.url AS afterURL,
    a.`timestamp` AS `timestamp`
    FROM Click b JOIN Click a ON b.userid=a.userid AND
        b.`timestamp` < a.`timestamp` AND
        b.`timestamp` >= a.`timestamp` - INTERVAL 10 MINUTE;
-- Counts how often each (beforeURL, afterURL) pair occurs in VisitAfter and
-- exposes the result as (url, rec, frequency), ordered by url ascending and
-- then frequency descending.
-- Presumably the query_by_all(url) hint is what makes url a query parameter:
-- the access query for this table in the explain section of this file
-- filters on `url` = ? -- confirm against the SQRL hint documentation.
-- NOTE(review): rows that tie on (url, frequency) have no defined relative
-- order, because rec is not part of the ORDER BY.
/*+query_by_all(url) */
Recommendation := SELECT beforeURL AS url, afterURL AS rec,
    count(1) AS frequency FROM VisitAfter
    GROUP BY beforeURL, afterURL ORDER BY url ASC, frequency DESC;
-- Per-url click totals, largest total first. Grouping yields one row per
-- url, so the url tie-breaker makes the row order fully determined.
-- NOTE(review): the test hint presumably marks this as a test-only table;
-- it has no entry in the explain section of this file -- confirm.
/*+test */
RankTest := SELECT
        url,
        COUNT(1) AS total
    FROM Click
    GROUP BY url
    ORDER BY
        total DESC,
        url ASC;

