[HUDI-8850] Fix record merge mode related issues and improve test coverage in Spark SQL (#12725)

This PR fixes record merge mode issues and improves test coverage in Spark SQL:

- Table version 6 tables may not store the record merge mode in the table config. The file group reader now infers it when missing, so version 6 tables can be read.
- For INSERT INTO in Spark SQL, buildHoodieInsertConfig did not set the record merge mode correctly; that logic is removed, and the SQL writer now automatically picks up the correct record merge mode from the table config.
- For MERGE INTO in Spark SQL, the record merge mode check is fixed to avoid a NullPointerException when the mode is absent from the table config.
- More tests are added in TestMergeModeCommitTimeOrdering and TestMergeModeEventTimeOrdering covering different merge modes, on both table versions 6 and 8.
yihua authored Jan 30, 2025
1 parent b44e19c commit 27a950e
Showing 7 changed files with 448 additions and 247 deletions.
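
For context before the per-file diffs, a minimal Spark SQL sketch of the two merge modes the new tests exercise (a hypothetical example; table and column names are illustrative, only the `recordMergeMode` table property comes from this PR's tests):

```scala
// Create a table with event-time ordering (COMMIT_TIME_ORDERING is the alternative).
spark.sql(
  """
    |CREATE TABLE merge_mode_demo (id INT, name STRING, ts BIGINT) USING hudi
    |TBLPROPERTIES (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  recordMergeMode = 'EVENT_TIME_ORDERING'
    |)
    |""".stripMargin)

spark.sql("INSERT INTO merge_mode_demo VALUES (1, 'a', 100)")
spark.sql("INSERT INTO merge_mode_demo VALUES (1, 'b', 50)")

// Under EVENT_TIME_ORDERING the row with the larger ts, (1, 'a', 100), should win;
// under COMMIT_TIME_ORDERING the later write, (1, 'b', 50), would win instead.
spark.sql("SELECT id, name, ts FROM merge_mode_demo").show()
```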
HoodieFileGroupReader.java
@@ -30,13 +30,15 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
@@ -102,9 +104,17 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
this.start = start;
this.length = length;
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
recordMergeMode, tableConfig.getPayloadClass(),
mergeStrategyId, null);
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
}
readerContext.setRecordMerger(readerContext.getRecordMerger(
tableConfig.getRecordMergeMode(),
tableConfig.getRecordMergeStrategyId(),
recordMergeMode, mergeStrategyId,
props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
readerContext.setTablePath(tablePath);
@@ -122,7 +132,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
isSkipMerge, shouldUseRecordPosition, readStats);
}

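Read as plain code, the new pre-version-8 fallback amounts to the following sketch (illustrative Scala mirroring the Java hunk above; every call appears in the diff, and the trailing `null` is simply the argument the reader passes):

```scala
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}

// Pre-v8 tables may not persist a record merge mode, so infer it from the
// payload class and merge strategy (e.g. DefaultHoodieRecordPayload pairs with
// event-time ordering) instead of trusting the possibly-missing table config.
def resolveMergeBehavior(tableConfig: HoodieTableConfig): (RecordMergeMode, String) = {
  var mergeMode = tableConfig.getRecordMergeMode
  var strategyId = tableConfig.getRecordMergeStrategyId
  if (!tableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
    val inferred = HoodieTableConfig.inferCorrectMergingBehavior(
      mergeMode, tableConfig.getPayloadClass, strategyId, null)
    mergeMode = inferred.getLeft
    strategyId = inferred.getRight
  }
  (mergeMode, strategyId)
}
```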
ProvidesHoodieConfig.scala
@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,21 +257,7 @@ trait ProvidesHoodieConfig extends Logging {
Map()
}

val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID

if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
}

val defaultOpts = Map(
DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode,
DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key() -> recordMergeStrategy,
// NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
// for the table
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
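The deleted block above injected `DefaultHoodieRecordPayload`/`EVENT_TIME_ORDERING` defaults into every INSERT, which could disagree with a table created under a different merge mode. A hypothetical scenario the removal addresses (names are illustrative):

```scala
// A table declared with commit-time ordering. With the hardcoded defaults gone,
// the INSERT below should be configured with COMMIT_TIME_ORDERING from the table
// config rather than the event-time defaults the old code forced.
spark.sql(
  """
    |CREATE TABLE commit_time_demo (id INT, ts BIGINT) USING hudi
    |TBLPROPERTIES (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  recordMergeMode = 'COMMIT_TIME_ORDERING'
    |)
    |""".stripMargin)
spark.sql("INSERT INTO commit_time_demo VALUES (1, 100)")
```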
MergeIntoHoodieTableCommand.scala
@@ -28,14 +28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop

import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
@@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments}
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

import java.util.Base64

import scala.collection.JavaConverters._

/**
@@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
// Check has no effect if we don't have such fields in target table or we don't have insert actions
// Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT.
if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], HoodieTableConfig.RECORD_MERGE_MODE)
.equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) {
if (RecordMergeMode.EVENT_TIME_ORDERING.name()
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
HoodieTableConfig.RECORD_MERGE_MODE))) {
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
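The reordered comparison above is the usual constant-first null-safe pattern; a minimal sketch of why it matters here, assuming `getStringWithAltKeys` returns null for a table that never persisted a merge mode:

```scala
import org.apache.hudi.common.config.RecordMergeMode

val mergeMode: String = null // what the props lookup can yield for such a table
// mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name())    // NullPointerException
val isEventTime = RecordMergeMode.EVENT_TIME_ORDERING.name().equals(mergeMode) // false, no NPE
```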
HoodieSparkSqlTestBase.scala
@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.storage.HoodieStorage
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}

import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import org.slf4j.LoggerFactory
@@ -323,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
}
}

protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
)(f: => Unit): Unit = {
withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, true)): _*)(f)
}

protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)*
)(f: => Unit): Unit = {
try {
configNameValues.foreach { case (configName, configValue) =>
spark.sql(s"set $configName=$configValue")
configNameValues.foreach { case ((configName, configValue), condition) =>
if (condition) {
spark.sql(s"set $configName=$configValue")
}
}
f
} finally {
configNameValues.foreach { case (configName, _) =>
configNameValues.foreach { case ((configName, configValue), condition) =>
spark.sql(s"reset $configName")
}
}
@@ -375,7 +386,19 @@ object HoodieSparkSqlTestBase {
.getActiveTimeline.getInstantDetails(cleanInstant).get)
}

def validateTableConfig(storage: HoodieStorage,
basePath: String,
expectedConfigs: Map[String, String],
nonExistentConfigs: Seq[String]): Unit = {
val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
expectedConfigs.foreach(e => {
assertEquals(e._2, tableConfig.getString(e._1),
s"Table config ${e._1} should be ${e._2} but is ${tableConfig.getString(e._1)}")
})
nonExistentConfigs.foreach(e => assertFalse(
tableConfig.contains(e), s"$e should not be present in the table config"))
}

private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)

}
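
A hypothetical use of the new `validateTableConfig` helper (both config keys exist in `HoodieTableConfig`; the expected values are chosen for illustration):

```scala
// Assert the table persisted commit-time ordering and no payload class.
validateTableConfig(
  storage,
  basePath,
  Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> "COMMIT_TIME_ORDERING"),
  Seq(HoodieTableConfig.PAYLOAD_CLASS_NAME.key))
```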
TestMergeIntoTable2.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient

import org.apache.spark.sql.Row
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase

import org.slf4j.LoggerFactory

class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
@@ -1254,7 +1253,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ts BIGINT
|) using hudi
|TBLPROPERTIES (
| type = 'cow',
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| recordMergeMode = '$recordMergeMode'
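With `type = '$tableType'` parameterized, the same scenario can run against both table types; a sketch of the resulting loop (structure assumed, columns trimmed):

```scala
// Exercise both table types and both merge modes, as the widened tests do.
Seq("cow", "mor").foreach { tableType =>
  Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { recordMergeMode =>
    spark.sql(
      s"""
         |CREATE TABLE demo_${tableType}_${recordMergeMode.toLowerCase} (
         |  id INT, ts BIGINT
         |) USING hudi
         |TBLPROPERTIES (
         |  type = '$tableType',
         |  primaryKey = 'id',
         |  preCombineField = 'ts',
         |  recordMergeMode = '$recordMergeMode'
         |)
         |""".stripMargin)
  }
}
```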