From 55d423f49c7bda7848fa0dc9fc2e3afafc06de6b Mon Sep 17 00:00:00 2001 From: lincoln lee Date: Tue, 24 Sep 2024 14:00:14 +0800 Subject: [PATCH] update tests --- .../stream/sql/NonDeterministicDagTest.xml | 3612 +++++++++++++++++ 1 file changed, 3612 insertions(+) create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml new file mode 100644 index 00000000000000..a0a00b51d832bd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml @@ -0,0 +1,3612 @@ + + + + + + UNIX_TIMESTAMP() - 180) as varchar) valid_uv +from T +group by a +]]> + + + ($1, -(UNIX_TIMESTAMP(), 180)))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]]) +]]> + + + (UNIX_TIMESTAMP() - 180)) IS TRUE AS $f2]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d]) +]]> + + + + + UNIX_TIMESTAMP() - 180) as varchar) valid_uv +from T +group by a +]]> + + + ($1, -(UNIX_TIMESTAMP(), 180)))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]]) +]]> + + + (UNIX_TIMESTAMP() - 180)) IS TRUE AS $f2]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d]) +]]> + + + + + UNIX_TIMESTAMP() - 180) as varchar) valid_uv +from cdc +group by a +]]> + + + ($1, -(UNIX_TIMESTAMP(), 180)))]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (UNIX_TIMESTAMP() - 180)) IS TRUE AS $f2]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c, b], metadata=[]]], fields=[a, c, b]) +]]> + + + + + UNIX_TIMESTAMP() - 180) as varchar) valid_uv +from cdc +group by a +]]> + + + ($1, -(UNIX_TIMESTAMP(), 180)))]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (UNIX_TIMESTAMP() - 180)) IS TRUE AS $f2]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c, b], metadata=[]]], fields=[a, c, b]) +]]> + + + + + 100 +) +]]> + + + ($1, 100)]) + LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +}))]) + +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + 100 +) +]]> + + + ($1, 100)]) + LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +}))]) + +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ndFunc(t2.b) +]]> + + + ($cor0.b, ndFunc($1)))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]]) +]]> + + + ndFunc(b0))], select=[a, b, a0, b0, c], upsertKey=[[0]]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + UNIX_TIMESTAMP() - 300 +]]> + + + ($1, -(UNIX_TIMESTAMP(), 300)))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]]) +]]> + + + (UNIX_TIMESTAMP() - 300))], select=[a, a, b, c], upsertKey=[[0]]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a], metadata=[]]], fields=[a]) +]]> + + + + + 100 +]]> + + + (ndFunc($1), 100))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]]) +]]> + + + 100)], select=[a, b, c, a], upsertKey=[[0]]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + UNIX_TIMESTAMP() - 300 +]]> + + + ($cor0.b, -(UNIX_TIMESTAMP(), 300)))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, dim_with_pk]]) +]]> + + + (UNIX_TIMESTAMP() - 300))], select=[a, b, a0, b0, c], upsertKey=[[0]]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b], metadata=[]]], fields=[a, b]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + t2.proctime - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (proctime0 - 5000:INTERVAL SECOND)))], select=[b, c, proctime, a, b0, proctime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey]) + :- Exchange(distribution=[hash[b]]) + : +- Calc(select=[b, c, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c])(reuse_id=[1]) + +- Exchange(distribution=[hash[b]]) + +- Calc(select=[a, b, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.proctime - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (proctime0 - 5000:INTERVAL SECOND)))], select=[a, c, proctime, a0, b, proctime0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.proctime - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (proctime0 - 5000:INTERVAL SECOND)))], select=[a, c, proctime, a0, b, proctime0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.op_ts - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) + +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) +]]> + + + (op_ts0 - 5000:INTERVAL SECOND)))], select=[a, b, op_ts, a0, c, op_ts0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + : +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, op_ts]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + +- Calc(select=[a, c, op_ts]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.op_ts - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) + +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) +]]> + + + (op_ts0 - 5000:INTERVAL SECOND)))], select=[a, b, op_ts, a0, c, op_ts0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + : +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, op_ts]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + +- Calc(select=[a, c, op_ts]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.op_ts - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) + +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) +]]> + + + (op_ts0 - 5000:INTERVAL SECOND)))], select=[a, b, op_ts, a0, c, op_ts0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + : +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + : +- DropUpdateBefore + : +- Calc(select=[a, b, op_ts]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + +- DropUpdateBefore + +- Calc(select=[a, c, op_ts]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.op_ts - INTERVAL '5' SECOND + ]]> + + + ($4, -($9, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) + +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_watermark]]) +]]> + + + (op_ts0 - 5000:INTERVAL SECOND)))], select=[a, b, op_ts, a0, c, op_ts0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + : +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + : +- DropUpdateBefore + : +- Calc(select=[a, b, op_ts]) + : +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_watermark, project=[a, b, c, op_ts], metadata=[]]], fields=[a, b, c, op_ts])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, CAST(op_ts AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS op_ts]) + +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + +- DropUpdateBefore + +- Calc(select=[a, c, op_ts]) + +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + UNIX_TIMESTAMP() - 300 +]]> + + + ($1, -(UNIX_TIMESTAMP(), 300))]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (UNIX_TIMESTAMP() - 300))]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c]) +]]> + + + + + UNIX_TIMESTAMP() - 300 +]]> + + + ($1, -(UNIX_TIMESTAMP(), 300))]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + (UNIX_TIMESTAMP() - 300))]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1 +) AS T1 +]]> + + + (PREV(A.$0, 0), 1)]], inputFields=[[a, b, c, d, proctime]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + (PREV(A.$0, 0), 1)}]) + +- Exchange(distribution=[hash[c]]) + +- Calc(select=[a, b, c, d, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a, b, c, d]) +]]> + + + + + 1 +) AS T1 +]]> + + + (PREV(A.$0, 0), 1)]], inputFields=[[a, b, c, d, proctime]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + (PREV(A.$0, 0), 1)}]) + +- Exchange(distribution=[hash[c]]) + +- Calc(select=[a, b, c, d, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a, b, c, d]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + ($2, 100)]) + +- LogicalProject(a=[$0], day=[$2], b=[$3], c=[$5]) + +- LogicalJoin(condition=[=($0, $6)], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')]) + : +- LogicalTableScan(table=[[default_catalog, default_database, src1]]) + +- LogicalProject(b=[$1], day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], c=[$2], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, src2]]) +]]> + + + 100)]) + +- Reused(reference_id=[2]) +]]> + + + + + + + + + + + + + + + + ($3, 100)]) + +- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)]) + +- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4]) + +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 100, b+d, b) as b, + case when d > 100 then json_value(c, '$.count') + else cast(b as string) || '#' end as c +from ( +select a, b, c, d from ( + select *, row_number() over(partition by a order by d desc) as rn + from ( + select a, d as b, c, ndFunc(b) as d from cdc + ) tmp +) tmp where rn = 1) tmp +]]> + + + ($0, 100), +($1, $3), $1)], c=[CASE(>($3, 100), JSON_VALUE($2, _UTF-16LE'$.count'), ||(CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'#'))]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3]) + +- LogicalFilter(condition=[=($4, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)]) + +- LogicalProject(a=[$0], b=[$3], c=[$2], d=[ndFunc($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100), (b + d), b) AS b, CASE((d > 100), JSON_VALUE(c, '$.count'), ||(CAST(b AS VARCHAR(2147483647)), '#')) AS c]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[d DESC], select=[a, b, c, d]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d AS b, c, ndFunc(b) AS d]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc]], fields=[a, b, c, d]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + t2.proctime - INTERVAL '5' SECOND + ]]> + + + ($3, -($8, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + + + (proctime0 - 5000:INTERVAL SECOND)))], select=[a, b, proctime, a0, c, proctime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime]) + : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, PROCTIME_MATERIALIZE(proctime) AS proctime]) + +- Reused(reference_id=[1]) +]]> + + + + + t2.proctime - INTERVAL '5' SECOND + ]]> + + + ($3, -($8, 5000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + + + (proctime0 - 5000:INTERVAL SECOND)))], select=[a, b, proctime, a0, c, proctime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime]) + : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, PROCTIME_MATERIALIZE(proctime) AS proctime]) + +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 100 +) +]]> + + + ($1, 100)]) + LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +})]) + +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + 100 +) +]]> + + + ($1, 100)]) + LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +})]) + +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + 100 +) +]]> + + + ($1, 100)]) + LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +})]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + ($1, 100)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_computed_col]]) +]]> + + + 100)]) + +- Reused(reference_id=[1]) +]]> + + + + + 100 +]]> + + + ($1, 100)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_computed_col]]) +]]> + + + 100)]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_computed_col, filter=[], project=[a, b], metadata=[]]], fields=[a, b]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ($1, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + 0) IS TRUE AS $f3]) + +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a, c, b], metadata=[]]], fields=[a, c, b]) +]]> + + + + + + + + ($1, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + + + 0) IS TRUE AS $f3]) + +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a, c, b], metadata=[]]], fields=[a, c, b]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +