Skip to content

Commit

Permalink
fix all uts
Browse files Browse the repository at this point in the history
  • Loading branch information
lincoln-lil committed Sep 24, 2024
1 parent 4764be8 commit 592ebf7
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 3,569 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
== Optimized Physical Plan ==
Calc(select=[a, b, c])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, rowtime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
Expand Down Expand Up @@ -377,7 +377,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
== Optimized Physical Plan ==
Calc(select=[a, b, c])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, rowtime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
: +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I])
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
+- Deduplicate(keep=[LastRow], key=[currency], order=[ROWTIME], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[currency], orderBy=[ROWTIME rowtime DESC], select=[currency, rate, rowtime], changelogMode=[I,UA,D])
+- Exchange(distribution=[hash[currency]], changelogMode=[I])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, ratesHistory, source: [CollectionTableSource(currency, rate, rowtime)]]], fields=[currency, rate, rowtime], changelogMode=[I])
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
+- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
+- Calc(select=[rowtime], changelogMode=[I,UB,UA,D])
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME], changelogMode=[I,UB,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime DESC], select=[a, rowtime], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
+- Calc(select=[a, rowtime], changelogMode=[I])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2454,7 +2454,7 @@ LogicalProject(c=[$2], EXPR$1=[$4])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, EXPR$1])
+- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
Expand All @@ -2464,7 +2464,7 @@ Calc(select=[c, EXPR$1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
]]>
Expand Down Expand Up @@ -2497,7 +2497,7 @@ LogicalProject(c=[$2], EXPR$1=[$4])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, EXPR$1])
+- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
Expand All @@ -2507,7 +2507,7 @@ Calc(select=[c, EXPR$1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
]]>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,68 +1626,80 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp

@TestTemplate
def testProctimeDedupOnCdcWithMetadataSinkWithPk(): Unit = {
// TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
assertThatThrownBy(
() =>
util.verifyExecPlanInsert(
"""
|insert into sink_with_pk
|SELECT a, metadata_3, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM cdc_with_meta
|)
|WHERE rowNum = 1
""".stripMargin))
.hasMessageContaining(
"StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
.isInstanceOf[TableException]
// now deduplicate query with updating will translate to retract rank
val callable: ThrowingCallable = () =>
util.verifyExecPlanInsert(
"""
|insert into sink_with_pk
|SELECT a, metadata_3, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM cdc_with_meta
|)
|WHERE rowNum = 1
""".stripMargin)

if (tryResolve) {
assertThatThrownBy(callable)
.hasMessageContaining(
"The column(s): $7(generated by non-deterministic function: PROCTIME ) can not satisfy the determinism requirement")
.isInstanceOf[TableException]
} else {
assertThatCode(callable).doesNotThrowAnyException()
}
}

@TestTemplate
def testProctimeDedupOnCdcWithMetadataSinkWithoutPk(): Unit = {
// TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
assertThatThrownBy(
() =>
util.verifyExecPlanInsert(
"""
|insert into sink_without_pk
|SELECT a, metadata_3, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM cdc_with_meta
|)
|WHERE rowNum = 1
""".stripMargin
))
.hasMessageContaining(
"StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
.isInstanceOf[TableException]
// now deduplicate query with updating will translate to retract rank
val callable: ThrowingCallable = () =>
util.verifyExecPlanInsert(
"""
|insert into sink_without_pk
|SELECT a, metadata_3, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
| FROM cdc_with_meta
|)
|WHERE rowNum = 1
""".stripMargin)

if (tryResolve) {
assertThatThrownBy(callable)
.hasMessageContaining(
"The column(s): $7(generated by non-deterministic function: PROCTIME ) can not satisfy the determinism requirement")
.isInstanceOf[TableException]
} else {
assertThatCode(callable).doesNotThrowAnyException()
}
}

@TestTemplate
def testRowtimeDedupOnCdcWithMetadataSinkWithPk(): Unit = {
// TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
assertThatThrownBy(
() =>
util.verifyExecPlanInsert(
"""
|insert into sink_with_pk
|SELECT a, b, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum
| FROM cdc_with_meta_and_wm
|)
|WHERE rowNum = 1
""".stripMargin
))
.hasMessageContaining(
"StreamPhysicalDeduplicate doesn't support consuming update and delete changes")
.isInstanceOf[TableException]
// now deduplicate query with updating will translate to retract rank
val callable: ThrowingCallable = () =>
util.verifyExecPlanInsert(
"""
|insert into sink_with_pk
|SELECT a, b, c
|FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum
| FROM cdc_with_meta_and_wm
|)
|WHERE rowNum = 1
""".stripMargin)

if (tryResolve) {
assertThatThrownBy(callable)
.hasMessageContaining(
"The metadata column(s): 'op_ts' in cdc source may cause wrong result or error on downstream operators")
.isInstanceOf[TableException]
} else {
assertThatCode(callable).doesNotThrowAnyException()
}
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl

@TestTemplate
def testProctimeWindowTVFWithDedupWhenCantMerge(): Unit = {
util.verifyRelPlan(
util.verifyExecPlan(
"""
|select c, count(a)
|from (
Expand Down

0 comments on commit 592ebf7

Please sign in to comment.