[FLINK-36095][table-planner] KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering
lincoln-lil committed Aug 19, 2024
1 parent adb8368 commit c4fa1a2
Showing 9 changed files with 144 additions and 70 deletions.
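
Why the shuffle key matters: when an upsert stream is hash-partitioned by the lookup join key rather than by its own upsert key, the retraction (-U) and the new value (+U) of a single logical update can carry different join-key values, land on different subtasks, and be applied out of order downstream. A minimal standalone sketch of that effect, using hypothetical records and plain Java hashing rather than any Flink API:

// Standalone illustration (not Flink code): an update on primary key pk=1
// changes the join key from 10 to 11, producing a -U/+U pair.
import java.util.List;

public class ShuffleDisorderSketch {
    record Change(int pk, int joinKey, String op) {}

    static int channel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<Change> changelog =
                List.of(new Change(1, 10, "-U"), new Change(1, 11, "+U"));
        for (Change c : changelog) {
            // Partitioning by join key can split the -U/+U pair across
            // subtasks; partitioning by the upsert key (pk) keeps them together.
            System.out.printf(
                    "op=%s byJoinKey=channel-%d byUpsertKey=channel-%d%n",
                    c.op, channel(c.joinKey, parallelism), channel(c.pk, parallelism));
        }
    }
}

Partitioning by the upsert key keeps every change for the same logical row on one subtask, which is exactly what the diff below switches to.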
StreamExecLookupJoin.java
@@ -42,7 +42,6 @@
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
@@ -62,10 +61,11 @@

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.PARTITIONER_TRANSFORMATION;

@@ -84,6 +84,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin
"lookupKeyContainsPrimaryKey";

public static final String STATE_NAME = "lookupJoinState";
public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey";

@JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY)
private final boolean lookupKeyContainsPrimaryKey;
@@ -97,6 +98,11 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<StateMetadata> stateMetadataList;

@Nullable
@JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY)
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
private final int[] inputUpsertKey;

public StreamExecLookupJoin(
ReadableConfig tableConfig,
FlinkJoinType joinType,
@@ -111,6 +117,7 @@ public StreamExecLookupJoin(
@Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
@Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
ChangelogMode inputChangelogMode,
@Nullable int[] inputUpsertKey,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -130,6 +137,7 @@ public StreamExecLookupJoin(
asyncLookupOptions,
retryOptions,
inputChangelogMode,
inputUpsertKey,
// serialize state meta only when upsert materialize is enabled
upsertMaterialize
? StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME)
@@ -164,6 +172,7 @@ public StreamExecLookupJoin(
LookupJoinUtil.RetryLookupOptions retryOptions,
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) @Nullable
ChangelogMode inputChangelogMode,
@JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) @Nullable int[] inputUpsertKey,
@JsonProperty(FIELD_NAME_STATE) @Nullable List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@@ -187,11 +196,11 @@ public StreamExecLookupJoin(
description);
this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
this.upsertMaterialize = upsertMaterialize;
this.inputUpsertKey = inputUpsertKey;
this.stateMetadataList = stateMetadataList;
}

@Override
@SuppressWarnings("unchecked")
public Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
return createJoinTransformation(
@@ -246,39 +255,28 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
KeyedProcessOperator<RowData, RowData, RowData> operator =
new KeyedProcessOperator<>(keyedLookupJoinWrapper);

List<Integer> refKeys =
allLookupKeys.values().stream()
.filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)
.map(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index)
.collect(Collectors.toList());
RowDataKeySelector keySelector;

// use single parallelism for empty key shuffle
boolean singleParallelism = refKeys.isEmpty();
if (singleParallelism) {
// all lookup keys are constants, then use an empty key selector
keySelector = EmptyRowDataKeySelector.INSTANCE;
int[] shuffleKeys;
if (inputUpsertKey == null || inputUpsertKey.length == 0) {
// input has no upsert keys, so use all columns for the key selector
shuffleKeys = IntStream.range(0, inputRowType.getFieldCount()).toArray();
} else {
shuffleKeys = inputUpsertKey;
// make it a deterministic asc order
Collections.sort(refKeys);
keySelector =
KeySelectorUtil.getRowDataSelector(
classLoader,
refKeys.stream().mapToInt(Integer::intValue).toArray(),
InternalTypeInfo.of(inputRowType));
Arrays.sort(shuffleKeys);
}

RowDataKeySelector keySelector;
keySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, shuffleKeys, InternalTypeInfo.of(inputRowType));
final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
new KeyGroupStreamPartitioner<>(
keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
Transformation<RowData> partitionedTransform =
new PartitionTransformation<>(inputTransformation, partitioner);
createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config)
.fill(partitionedTransform);
if (singleParallelism) {
setSingletonParallelism(partitionedTransform);
} else {
partitionedTransform.setParallelism(inputTransformation.getParallelism(), false);
}
partitionedTransform.setParallelism(inputTransformation.getParallelism(), false);

OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
@@ -290,14 +288,6 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
false);
transform.setStateKeySelector(keySelector);
transform.setStateKeyType(keySelector.getProducedType());
if (singleParallelism) {
setSingletonParallelism(transform);
}
return transform;
}

private void setSingletonParallelism(Transformation<RowData> transformation) {
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
}
}
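
The core of the change above is the new shuffle-key derivation: the keyed lookup-join operator now partitions by the input's upsert key when one is known, and falls back to all input columns otherwise, sorting so the key order is deterministic. A condensed, self-contained sketch of just that decision (hypothetical inputs, not the planner's actual call sites):

import java.util.Arrays;
import java.util.stream.IntStream;

public class ShuffleKeySketch {
    // Mirrors the selection above: upsert key if known, else all columns.
    static int[] shuffleKeys(int[] inputUpsertKey, int fieldCount) {
        if (inputUpsertKey == null || inputUpsertKey.length == 0) {
            // No upsert key inferred upstream: shuffle by every column.
            return IntStream.range(0, fieldCount).toArray();
        }
        int[] keys = inputUpsertKey.clone();
        Arrays.sort(keys); // deterministic ascending order
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(shuffleKeys(null, 3)));             // [0, 1, 2]
        System.out.println(Arrays.toString(shuffleKeys(new int[] {2, 0}, 3))); // [0, 2]
    }
}

Note that the old special case for constant-only lookup keys (EmptyRowDataKeySelector plus forced singleton parallelism) is removed entirely, which is why the compiled-plan tests further down change the operator parallelism from 1 to 4.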
UpsertKeyUtil.java
@@ -23,6 +23,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;
import java.util.Set;

/**
@@ -42,8 +43,17 @@ public class UpsertKeyUtil {
*/
@Nonnull
public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
return smallestKey(upsertKeys).orElse(new int[0]);
}

/**
* Returns the smallest key of the given upsert keys, wrapped in a java {@link Optional}. Unlike
* {@link #getSmallestKey(Set)}, it returns an empty {@link Optional} if the input set is null or
* empty.
*/
public static Optional<int[]> smallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
if (null == upsertKeys || upsertKeys.isEmpty()) {
return new int[0];
return Optional.empty();
}
return upsertKeys.stream()
.map(ImmutableBitSet::toArray)
@@ -60,7 +70,6 @@ public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
}
}
return k2;
})
.get();
});
}
}
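
For context, a hypothetical call of the refactored utility, assuming Calcite's ImmutableBitSet and flink-table-planner on the classpath (the "smallest" key is the one with the fewest columns, with a deterministic tie-break):

import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;

import org.apache.calcite.util.ImmutableBitSet;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

public class UpsertKeyUtilExample {
    public static void main(String[] args) {
        Set<ImmutableBitSet> keys =
                Set.of(ImmutableBitSet.of(0, 2), ImmutableBitSet.of(1));
        // Existing entry point: falls back to an empty array.
        int[] smallest = UpsertKeyUtil.getSmallestKey(keys);    // -> [1]
        // New entry point: empty Optional for null/empty input.
        Optional<int[]> none = UpsertKeyUtil.smallestKey(null); // -> Optional.empty()
        System.out.println(Arrays.toString(smallest) + " " + none.isPresent());
    }
}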
StreamPhysicalLookupJoin.scala
@@ -18,20 +18,22 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream

import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil}
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil, UpsertKeyUtil}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil

import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexProgram

import java.util
import java.util.Optional

import scala.collection.JavaConverters._

@@ -111,8 +113,25 @@ class StreamPhysicalLookupJoin(
asyncOptions.orNull,
retryOptions.orNull,
inputChangelogMode,
getUpsertKey.orElse(null),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val upsertKey = getUpsertKey
super
.explainTerms(pw)
.itemIf("upsertKey", util.Arrays.toString(upsertKey.orElse(null)), upsertKey.isPresent)
}

private def getUpsertKey: Optional[Array[Int]] = {
// no need to call getUpsertKeysInKeyGroupRange here because there's no exchange before the
// lookup join; the exchange is only added inside the xxExecLookupJoin node itself.
val inputUpsertKeys = FlinkRelMetadataQuery
.reuseOrCreate(cluster.getMetadataQuery)
.getUpsertKeys(inputRel)
UpsertKeyUtil.smallestKey(inputUpsertKeys)
}
}
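
The Scala helper above is where inputUpsertKey originates, and explainTerms is what surfaces it in the plan text. A small sketch of how the Optional result turns into the upsertKey=[[0]] attribute seen in the XML expectations below (the rel writer brackets each item value; UpsertKeyUtil import assumed as in the previous example):

import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;

import org.apache.calcite.util.ImmutableBitSet;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

public class ExplainUpsertKeySketch {
    public static void main(String[] args) {
        // Pretend the metadata query reported a single upsert key {0} on the input.
        Set<ImmutableBitSet> reported = Set.of(ImmutableBitSet.of(0));
        Optional<int[]> upsertKey = UpsertKeyUtil.smallestKey(reported);
        // itemIf("upsertKey", Arrays.toString(...), present) renders as
        // upsertKey=[[0]] because the writer wraps the item value in brackets.
        System.out.println("upsertKey=[" + Arrays.toString(upsertKey.orElse(null)) + "]");
    }
}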
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "LookupJoin[6]",
"pact" : "Operator",
"contents" : "[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : 8,
"ship_strategy" : "HASH",
@@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "Sink: Sink1[7]",
"pact" : "Data Sink",
"contents" : "[7]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : 10,
"ship_strategy" : "FORWARD",
@@ -85,7 +85,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "LookupJoin[]",
"pact" : "Operator",
"contents" : "[]:LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, age], upsertMaterialize=[true])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "HASH",
@@ -96,7 +96,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
"type" : "Sink: Sink1[]",
"pact" : "Data Sink",
"contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -29,12 +29,12 @@ on t1.a = t2.a and ndFunc(t2.b) > 100]]>
<![CDATA[
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c])
advice[1]: [WARNING] There exists non deterministic function: 'ndFunc' in condition: '>(ndFunc($1), 100)' which may cause wrong result in update pipeline.
related rel plan:
LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], changelogMode=[I,UB,UA,D])
LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], where=[>(ndFunc(b), 100)], select=[a, b, c, a], upsertKey=[[0]], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, b, c], metadata=[]]], fields=[a, b, c], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])
@@ -72,7 +72,7 @@ No available advice...
<![CDATA[
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, c])
+- Calc(select=[a, b, c])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b])
+- LookupJoin(table=[default_catalog.default_database.dim_with_pk], joinType=[InnerJoin], lookup=[a=a], select=[a, c, a0, b], upsertKey=[[0]])
+- TableSourceScan(table=[[default_catalog, default_database, cdc, project=[a, c], metadata=[]]], fields=[a, c])
advice[1]: [WARNING] You might want to enable upsert materialization for look up join operator by configuring ('table.optimizer.non-deterministic-update.strategy' to 'TRY_RESOLVE') to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline.