[HUDI-8850] Fix record merge mode related issues and improve test coverage in Spark SQL #12725

Merged: 9 commits, Jan 30, 2025.

Changes from all commits:
HoodieFileGroupReader.java:

@@ -30,13 +30,15 @@
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
@@ -102,9 +104,17 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
     this.start = start;
     this.length = length;
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+    RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+    String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
+    if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
+          recordMergeMode, tableConfig.getPayloadClass(),
+          mergeStrategyId, null);
+      recordMergeMode = triple.getLeft();
+      mergeStrategyId = triple.getRight();
+    }
     readerContext.setRecordMerger(readerContext.getRecordMerger(
-        tableConfig.getRecordMergeMode(),
-        tableConfig.getRecordMergeStrategyId(),
+        recordMergeMode, mergeStrategyId,
         props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
             props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
     readerContext.setTablePath(tablePath);
@@ -122,7 +132,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-        tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
+        recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
         isSkipMerge, shouldUseRecordPosition, readStats);
   }
 
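For tables below version eight, the merge mode and strategy id stored in the table config may be absent or stale, so the reader now routes them through HoodieTableConfig.inferCorrectMergingBehavior, which falls back on the table's payload class; the same inferred recordMergeMode is then threaded into both the record merger and the record buffer, so the two cannot disagree. A rough sketch of that fallback, with simplified types; the payload-class checks and placeholder strategy ids below are assumptions for illustration, not the actual Hudi implementation:

```scala
// Sketch only: a simplified stand-in for HoodieTableConfig.inferCorrectMergingBehavior.
// The payload-class-to-mode mapping and the strategy-id placeholders are assumptions.
object InferMergingBehaviorSketch {
  def infer(storedMode: Option[String],
            payloadClass: String,
            storedStrategyId: Option[String]): (String, String) =
    (storedMode, storedStrategyId) match {
      // Both already set: trust the table config as-is.
      case (Some(mode), Some(strategyId)) => (mode, strategyId)
      // Otherwise derive the behavior from the payload class the table was created with.
      case _ if payloadClass.endsWith("OverwriteWithLatestAvroPayload") =>
        ("COMMIT_TIME_ORDERING", "<commit-time-strategy-id>")
      case _ =>
        ("EVENT_TIME_ORDERING", "<event-time-strategy-id>")
    }
}
```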
ProvidesHoodieConfig.scala:

@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
+import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,21 +257,7 @@ trait ProvidesHoodieConfig extends Logging {
       Map()
     }
 
-    val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
-    val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
-    val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-
-    if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
-      RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
-      tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
-    }
-
     val defaultOpts = Map(
-      DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
-      DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode,
-      DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key() -> recordMergeStrategy,
       // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
       // for the table
       HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
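This hunk drops the deduced payload class, merge mode, and merge strategy from the default write options, and stops clearing them out of the table config, so Spark SQL writes no longer override what the table was created with. The intended precedence, sketched with plain maps (assumed semantics, not combineOptions' real signature; the config key is shown for illustration):

```scala
// Assumed precedence after this change: later maps win on key conflicts,
// so the table config's merge settings survive unless the user overrides them.
def resolveOptions(defaults: Map[String, String],
                   tableConfig: Map[String, String],
                   userOpts: Map[String, String]): Map[String, String] =
  defaults ++ tableConfig ++ userOpts

// A table created with commit-time ordering keeps it, since no
// EVENT_TIME_ORDERING default is injected anymore.
val resolved = resolveOptions(
  defaults = Map.empty,
  tableConfig = Map("hoodie.record.merge.mode" -> "COMMIT_TIME_ORDERING"),
  userOpts = Map.empty)
```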
MergeIntoHoodieTableCommand.scala:

@@ -28,14 +28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
 import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
+
 import org.apache.avro.Schema
 import org.apache.spark.sql._
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
@@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
 import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments}
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
 import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
+
 import scala.collection.JavaConverters._
 
 /**
@@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
     // Check has no effect if we don't have such fields in target table or we don't have insert actions
     // Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT.
-    if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], HoodieTableConfig.RECORD_MERGE_MODE)
-      .equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) {
+    if (RecordMergeMode.EVENT_TIME_ORDERING.name()
+      .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
+        HoodieTableConfig.RECORD_MERGE_MODE))) {
       insertActions.foreach(action =>
         hoodieCatalogTable.preCombineKey.foreach(
           field => {
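The reordered comparison above is a null-safety fix: getStringWithAltKeys returns null when the table config carries no record merge mode, and the old code called .equals on that possibly-null result. Putting the constant first makes the check safe:

```scala
// Constant-first equals cannot throw, even when the config lookup returns null.
val configured: String = null // e.g. a table config without a record merge mode

// configured.equals("EVENT_TIME_ORDERING")               // would throw NullPointerException
val isEventTime = "EVENT_TIME_ORDERING".equals(configured) // false, no exception
```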
HoodieSparkSqlTestBase.scala:

@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
 
 import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -323,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     }
   }
 
-  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
+  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+                                         )(f: => Unit): Unit = {
+    withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, true)): _*)(f)
+  }
+
+  protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)*
+                                                      )(f: => Unit): Unit = {
     try {
-      configNameValues.foreach { case (configName, configValue) =>
-        spark.sql(s"set $configName=$configValue")
+      configNameValues.foreach { case ((configName, configValue), condition) =>
+        if (condition) {
+          spark.sql(s"set $configName=$configValue")
+        }
       }
       f
     } finally {
-      configNameValues.foreach { case (configName, _) =>
+      configNameValues.foreach { case ((configName, configValue), condition) =>
         spark.sql(s"reset $configName")
       }
     }
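withSparkSqlSessionConfig is now a thin wrapper over the new withSparkSqlSessionConfigWithCondition, which applies each session config only when its boolean is true, yet resets every listed key in the finally block regardless. A hypothetical use from a suite extending HoodieSparkSqlTestBase, with config keys chosen purely for illustration:

```scala
// Hypothetical test snippet; assumes `spark` from HoodieSparkSqlTestBase.
val runAgainstMor = true
withSparkSqlSessionConfigWithCondition(
  ("hoodie.file.group.reader.enabled" -> "true", true), // always applied
  ("hoodie.compact.inline" -> "true", runAgainstMor)    // applied only when the flag is true
) {
  spark.sql("select 1").collect() // both keys are reset afterwards, applied or not
}
```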
@@ -375,7 +386,19 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def validateTableConfig(storage: HoodieStorage,
+                          basePath: String,
+                          expectedConfigs: Map[String, String],
+                          nonExistentConfigs: Seq[String]): Unit = {
+    val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
+    expectedConfigs.foreach(e => {
+      assertEquals(e._2, tableConfig.getString(e._1),
+        s"Table config ${e._1} should be ${e._2} but is ${tableConfig.getString(e._1)}")
+    })
+    nonExistentConfigs.foreach(e => assertFalse(
+      tableConfig.contains(e), s"$e should not be present in the table config"))
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
-
 }
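The new validateTableConfig helper loads hoodie.properties back from storage and asserts both that expected entries were persisted and that legacy entries are absent, which is exactly what the merge-mode tests need. A hypothetical call after creating a table with event-time ordering (metaClient and basePath are placeholders; the key constants do exist in HoodieTableConfig):

```scala
// Hypothetical usage from a test; `metaClient` and `basePath` are placeholders.
validateTableConfig(
  metaClient.getStorage,
  basePath,
  Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> "EVENT_TIME_ORDERING"),
  Seq(HoodieTableConfig.PAYLOAD_CLASS_NAME.key))
```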
TestMergeIntoTable2.scala:

@@ -24,7 +24,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-
 import org.slf4j.LoggerFactory
 
 class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
@@ -1254,7 +1253,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
          | ts BIGINT
          |) using hudi
          |TBLPROPERTIES (
-         | type = 'cow',
+         | type = '$tableType',
          | primaryKey = 'id',
          | preCombineField = 'ts',
          | recordMergeMode = '$recordMergeMode'
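Parameterizing type = '$tableType' lets the same MERGE INTO assertions run against both storage types; presumably the enclosing test iterates over table types and merge modes, roughly:

```scala
// Assumed test scaffolding around the parameterized DDL above.
Seq("cow", "mor").foreach { tableType =>
  Seq("EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING").foreach { recordMergeMode =>
    // spark.sql(s"create table ... tblproperties (type = '$tableType',
    //   recordMergeMode = '$recordMergeMode')") and run the merge-into assertions
  }
}
```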