diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala index 47eed0a67fd1f..9bb859be10ba1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala @@ -116,7 +116,7 @@ class StreamPhysicalWindowAggregate( if (windowing.isInstanceOf[WindowAttachedWindowingStrategy] && windowing.isProctime) { throw new TableException( - "Non-mergeable processing time window tvf aggregation is invalid, is should fallback to group " + + "Non-mergeable processing time window tvf aggregation is invalid, should fallback to group " + "aggregation instead. This is a bug and should not happen. Please file an issue.") } new StreamExecWindowAggregate( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala index 9a72028086920..606b669fec929 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties import org.apache.flink.table.planner.plan.logical._ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCorrelate, FlinkLogicalIntersect, FlinkLogicalJoin, FlinkLogicalMatch, FlinkLogicalMinus, FlinkLogicalOverAggregate, FlinkLogicalRank, FlinkLogicalTableFunctionScan, FlinkLogicalUnion} +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalMatch, FlinkLogicalOverAggregate, FlinkLogicalRank, FlinkLogicalTableFunctionScan} import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED} import org.apache.flink.table.planner.typeutils.RowTypeUtils @@ -35,11 +35,11 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAtt import org.apache.calcite.plan.volcano.RelSubset import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.calcite.rel.core._ +import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor} import org.apache.calcite.rex._ -import org.apache.calcite.sql.`type`.SqlTypeFamily import org.apache.calcite.sql.SqlKind +import org.apache.calcite.sql.`type`.SqlTypeFamily import org.apache.calcite.util.{ImmutableBitSet, Util} import java.time.Duration @@ -332,7 +332,8 @@ object WindowUtil { /** * For rowtime window, return true if the given aggregate grouping contains window start and end. - * For proctime window, we should also check if it exists a neighbour windowTableFunctionCall. + * For proctime window, we should also check if it exists a neighbour windowTableFunctionCall and + * doesn't exist any [[RexCall]] on window time columns. * * If the window is a session window, we should also check if the partition keys are the same as * the group keys. See more at [[WindowUtil.validGroupKeyPartitionKey()]]. @@ -421,17 +422,14 @@ object WindowUtil { windowProperties: RelWindowProperties, fmq: FlinkRelMetadataQuery, agg: FlinkLogicalAggregate): Boolean = { - var existNeighbourWindowTableFunc = false val calcMatcher = new CalcWindowFunctionScanMatcher try { calcMatcher.go(agg.getInput(0)) } catch { - case r: Util.FoundOne => - r.getNode match { - case _: Some[_] => - existNeighbourWindowTableFunc = true - case _ => // do nothing - } + case _: Throwable => // do nothing + } + if (!calcMatcher.existNeighbourWindowTableFunc) { + return false } var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty && calcMatcher.calcNodes.exists(calc => calcContainsCallsOnWindowColumns(calc, fmq)) @@ -441,14 +439,13 @@ object WindowUtil { existCallOnWindowColumns = existCallOnWindowColumns || !agg.getAggCallList.forall { call => aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty } - - // proctime window can's support calculation on window columns before aggregation, - // and need to check if there is a neighbour windowTableFunctionCall - !existCallOnWindowColumns && existNeighbourWindowTableFunc + // proctime tvf window can't support calculation on window columns before aggregation + !existCallOnWindowColumns } private class CalcWindowFunctionScanMatcher extends RelVisitor { val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]() + var existNeighbourWindowTableFunc = false override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { node match { @@ -458,18 +455,19 @@ object WindowUtil { super.visit(calc, 0, parent) case scan: FlinkLogicalTableFunctionScan => if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) { - throw new Util.FoundOne(Some(0)) + existNeighbourWindowTableFunc = true + // stop visiting + throw new Util.FoundOne } case rss: RelSubset => val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal) // special case doesn't call super.visit for RelSubSet because it has no children visit(innerRel, 0, rss) - case _: FlinkLogicalAggregate | _: FlinkLogicalCorrelate | _: FlinkLogicalIntersect | - _: FlinkLogicalJoin | _: FlinkLogicalMatch | _: FlinkLogicalMinus | - _: FlinkLogicalOverAggregate | _: FlinkLogicalRank | _: FlinkLogicalUnion => - // proctime attribute comes from these operators can not be used directly for proctime - // window aggregate, so further traversal of child nodes is unnecessary - throw new Util.FoundOne(Option.empty) + case _: FlinkLogicalAggregate | _: FlinkLogicalMatch | _: FlinkLogicalOverAggregate | + _: FlinkLogicalRank | _: BiRel | _: SetOp => + // proctime attribute comes from these operators can't be used directly for proctime + // window aggregate, so further tree walk is unnecessary + throw new Util.FoundOne case _ => // continue to visit children super.visit(node, ordinal, parent)