== Abstract Syntax Tree ==
LogicalProject(user_id=[$0])
+- LogicalFilter(condition=[>($1, TO_TIMESTAMP_LTZ(1000, 3))])
   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
      :- LogicalProject(user_id=[$0], ts=[$2])
      :  +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($2, 5000:INTERVAL SECOND)])
      :     +- LogicalProject(user_id=[$0], ts_ms=[$1], ts=[TO_TIMESTAMP_LTZ($1, 3)])
      :        +- LogicalTableScan(table=[[default_catalog, default_database, events]])
      +- LogicalTableScan(table=[[default_catalog, default_database, dim_users]])

== Optimized Physical Plan ==
Calc(select=[user_id])
+- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id, ts, user_id0, segment], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[user_id]])
   :  +- Calc(select=[user_id, CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts], where=[>(CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 1970-01-01 00:00:01:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])
   :     +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 5000:INTERVAL SECOND)])
   :        +- Calc(select=[user_id, TO_TIMESTAMP_LTZ(ts_ms, 3) AS ts])
   :           +- TableSourceScan(table=[[default_catalog, default_database, events]], fields=[user_id, ts_ms])
   +- Exchange(distribution=[hash[user_id]])
      +- TableSourceScan(table=[[default_catalog, default_database, dim_users]], fields=[user_id, segment])

== Optimized Execution Plan ==
Calc(select=[user_id])
+- Join(joinType=[InnerJoin], where=[(user_id = user_id0)], select=[user_id, ts, user_id0, segment], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[user_id]])
   :  +- Calc(select=[user_id, CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts], where=[(CAST(ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) > 1970-01-01 00:00:01)])
   :     +- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 5000:INTERVAL SECOND)])
   :        +- Calc(select=[user_id, TO_TIMESTAMP_LTZ(ts_ms, 3) AS ts])
   :           +- TableSourceScan(table=[[default_catalog, default_database, events]], fields=[user_id, ts_ms])
   +- Exchange(distribution=[hash[user_id]])
      +- TableSourceScan(table=[[default_catalog, default_database, dim_users]], fields=[user_id, segment])
