Skip to content

Commit

Permalink
[HUDI-8413] Change the default merge mode to COMMIT_TIME_ORDERING (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua authored Jan 28, 2025
1 parent c9f4a0f commit 4bea55a
Show file tree
Hide file tree
Showing 28 changed files with 329 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.precombine.field")
.defaultValue("ts")
.noDefaultValue()
.withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
+ "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");

Expand All @@ -181,7 +181,7 @@ public class HoodieWriteConfig extends HoodieConfig {
// This ConfigProperty is also used in SQL options which expect String type
public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
.key("hoodie.write.record.merge.mode")
.defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
.noDefaultValue("COMMIT_TIME_ORDERING if ordering field is not set; EVENT_TIME_ORDERING if ordering field is set")
.sinceVersion("1.0.0")
.withDocumentation(RecordMergeMode.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ static void unsetInitialVersion(HoodieTableConfig tableConfig, Map<ConfigPropert

static void unsetRecordMergeMode(HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
Triple<RecordMergeMode, String, String> mergingConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(), tableConfig.getRecordMergeStrategyId());
HoodieTableConfig.inferCorrectMergingBehavior(
tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
tableConfig.getRecordMergeStrategyId(), tableConfig.getPreCombineField());
if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, mergingConfigs.getMiddle());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class DefaultSparkRecordMerger extends HoodieSparkRecordMerger {

@Override
public String getMergingStrategy() {
return HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ public <T> ConfigProperty<T> defaultValue(T value) {
}

public <T> ConfigProperty<T> defaultValue(T value, String docOnDefaultValue) {
Objects.requireNonNull(value);
Objects.requireNonNull(docOnDefaultValue);
ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, docOnDefaultValue, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet(), false);
return configProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand All @@ -35,6 +36,8 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.common.util.StringUtils.nonEmpty;

/**
* HoodieMerge defines how to merge two records. It is a stateless component.
* It can implement the merging logic of HoodieRecord of different engines
Expand All @@ -44,7 +47,7 @@
public interface HoodieRecordMerger extends Serializable {

// Uses event time ordering to determine which record is chosen
String DEFAULT_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
String EVENT_TIME_BASED_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";

// Always chooses the most recently written record
String COMMIT_TIME_BASED_MERGE_STRATEGY_UUID = "ce9acb64-bde0-424c-9b91-f6ebba25356d";
Expand Down Expand Up @@ -183,4 +186,23 @@ default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConf
*/
String getMergingStrategy();

static String getRecordMergeStrategyId(RecordMergeMode mergeMode,
String payloadClassName,
String recordMergeStrategyId) {
switch (mergeMode) {
case COMMIT_TIME_ORDERING:
return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
case EVENT_TIME_ORDERING:
return EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
case CUSTOM:
default:
if (nonEmpty(recordMergeStrategyId)) {
return recordMergeStrategyId;
}
if (nonEmpty(payloadClassName)) {
return PAYLOAD_BASED_MERGE_STRATEGY_UUID;
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,16 @@ default Comparable<?> getOrderingValue() {
return 0;
}

static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode) {
static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode, String payloadClassName) {
switch (mergeMode) {
//TODO: After we have merge mode working for writing, we should have a dummy payload that will throw exception when used
default:
case EVENT_TIME_ORDERING:
return DefaultHoodieRecordPayload.class.getName();
case COMMIT_TIME_ORDERING:
return OverwriteWithLatestAvroPayload.class.getName();
case CUSTOM:
default:
return payloadClassName;
}
}

Expand Down
Loading

0 comments on commit 4bea55a

Please sign in to comment.