Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lincoln-lil committed Jul 12, 2024
1 parent f50397a commit 7f0b565
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class StreamPhysicalWindowAggregate(

if (windowing.isInstanceOf[WindowAttachedWindowingStrategy] && windowing.isProctime) {
throw new TableException(
"Non-mergeable processing time window tvf aggregation is invalid, is should fallback to group " +
"Non-mergeable processing time window tvf aggregation is invalid, should fallback to group " +
"aggregation instead. This is a bug and should not happen. Please file an issue.")
}
new StreamExecWindowAggregate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCorrelate, FlinkLogicalIntersect, FlinkLogicalJoin, FlinkLogicalMatch, FlinkLogicalMinus, FlinkLogicalOverAggregate, FlinkLogicalRank, FlinkLogicalTableFunctionScan, FlinkLogicalUnion}
import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalMatch, FlinkLogicalOverAggregate, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
Expand All @@ -35,11 +35,11 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAtt

import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.util.{ImmutableBitSet, Util}

import java.time.Duration
Expand Down Expand Up @@ -332,7 +332,8 @@ object WindowUtil {

/**
* For rowtime window, return true if the given aggregate grouping contains window start and end.
* For proctime window, we should also check if it exists a neighbour windowTableFunctionCall.
* For proctime window, we should also check that there exists a neighbour windowTableFunctionCall
* and that there is no [[RexCall]] on window time columns.
*
* If the window is a session window, we should also check if the partition keys are the same as
* the group keys. See more at [[WindowUtil.validGroupKeyPartitionKey()]].
Expand Down Expand Up @@ -421,17 +422,14 @@ object WindowUtil {
windowProperties: RelWindowProperties,
fmq: FlinkRelMetadataQuery,
agg: FlinkLogicalAggregate): Boolean = {
var existNeighbourWindowTableFunc = false
val calcMatcher = new CalcWindowFunctionScanMatcher
try {
calcMatcher.go(agg.getInput(0))
} catch {
case r: Util.FoundOne =>
r.getNode match {
case _: Some[_] =>
existNeighbourWindowTableFunc = true
case _ => // do nothing
}
case _: Throwable => // do nothing
}
if (!calcMatcher.existNeighbourWindowTableFunc) {
return false
}
var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
calcMatcher.calcNodes.exists(calc => calcContainsCallsOnWindowColumns(calc, fmq))
Expand All @@ -441,14 +439,13 @@ object WindowUtil {
existCallOnWindowColumns = existCallOnWindowColumns || !agg.getAggCallList.forall {
call => aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
}

// proctime window can's support calculation on window columns before aggregation,
// and need to check if there is a neighbour windowTableFunctionCall
!existCallOnWindowColumns && existNeighbourWindowTableFunc
// proctime tvf window can't support calculation on window columns before aggregation
!existCallOnWindowColumns
}

private class CalcWindowFunctionScanMatcher extends RelVisitor {
val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
var existNeighbourWindowTableFunc = false

override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
node match {
Expand All @@ -458,18 +455,19 @@ object WindowUtil {
super.visit(calc, 0, parent)
case scan: FlinkLogicalTableFunctionScan =>
if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
throw new Util.FoundOne(Some(0))
existNeighbourWindowTableFunc = true
// stop visiting
throw new Util.FoundOne
}
case rss: RelSubset =>
val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
// special case: don't call super.visit for RelSubset because it has no children
visit(innerRel, 0, rss)
case _: FlinkLogicalAggregate | _: FlinkLogicalCorrelate | _: FlinkLogicalIntersect |
_: FlinkLogicalJoin | _: FlinkLogicalMatch | _: FlinkLogicalMinus |
_: FlinkLogicalOverAggregate | _: FlinkLogicalRank | _: FlinkLogicalUnion =>
// proctime attribute comes from these operators can not be used directly for proctime
// window aggregate, so further traversal of child nodes is unnecessary
throw new Util.FoundOne(Option.empty)
case _: FlinkLogicalAggregate | _: FlinkLogicalMatch | _: FlinkLogicalOverAggregate |
_: FlinkLogicalRank | _: BiRel | _: SetOp =>
// proctime attributes coming from these operators can't be used directly for proctime
// window aggregate, so walking further down the tree is unnecessary
throw new Util.FoundOne
case _ =>
// continue to visit children
super.visit(node, ordinal, parent)
Expand Down

0 comments on commit 7f0b565

Please sign in to comment.