[HUDI-8850] Fix determination of merge mode write config for INSERT INTO in Spark SQL
yihua committed Jan 28, 2025
1 parent: 4bea55a · commit: 44ef790
Showing 3 changed files with 141 additions and 104 deletions.
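
In short: when Spark SQL INSERT INTO built its write config, the merge settings were hardcoded to event-time defaults instead of being derived from the table's configuration. The first diff below replaces those hardcoded values with a call to HoodieTableConfig.inferCorrectMergingBehavior, and the tests are reworked to assert the deduced configs. A hypothetical before/after for a table created with commit-time ordering — this snippet is illustrative, not code from the commit; the values are taken from the removed code and the new test expectations:

    import org.apache.hudi.common.config.RecordMergeMode
    import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload}

    // Before the fix: INSERT INTO always seeded event-time defaults
    // (see the removed lines in the first diff below).
    val beforeFix = (RecordMergeMode.EVENT_TIME_ORDERING.name,
      classOf[DefaultHoodieRecordPayload].getCanonicalName)

    // After the fix: the settings are inferred from the table config; for a
    // COMMIT_TIME_ORDERING table the new tests expect these deduced values.
    val afterFix = (RecordMergeMode.COMMIT_TIME_ORDERING.name,
      classOf[OverwriteWithLatestAvroPayload].getName)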
File 1 of 3: trait ProvidesHoodieConfig

@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
+import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,16 +257,12 @@ trait ProvidesHoodieConfig extends Logging {
       Map()
     }
 
-    val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
-    val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
-    val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-
-    if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
-      RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
-      tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
-    }
+    val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+      tableConfig.getRecordMergeMode, tableConfig.getPayloadClass,
+      tableConfig.getRecordMergeStrategyId, preCombineField)
+    val recordMergeMode = inferredMergeConfigs.getLeft.name()
+    val deducedPayloadClassName = inferredMergeConfigs.getMiddle
+    val recordMergeStrategy = inferredMergeConfigs.getRight
 
     val defaultOpts = Map(
       DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
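
For intuition on the new call, a sketch of the inference for a table that persisted no merge settings; the expected outputs mirror expectedMergeConfigs in the new test file below, while the null arguments are an assumption about how unset table configs reach this method:

    import org.apache.hudi.common.config.RecordMergeMode
    import org.apache.hudi.common.model.{HoodieRecordMerger, OverwriteWithLatestAvroPayload}
    import org.apache.hudi.common.table.HoodieTableConfig

    // Sketch only: nothing persisted in hoodie.properties, no preCombine field.
    val inferred = HoodieTableConfig.inferCorrectMergingBehavior(
      null, // record merge mode (assumed null when unset)
      null, // payload class name
      null, // record merge strategy ID
      null) // preCombine field
    // Per the new tests, commit-time ordering is the deduced default:
    assert(inferred.getLeft == RecordMergeMode.COMMIT_TIME_ORDERING)
    assert(inferred.getMiddle == classOf[OverwriteWithLatestAvroPayload].getName)
    assert(inferred.getRight == HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)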
File 2 of 3: object HoodieSparkSqlTestBase

@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
 
 import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -375,7 +378,15 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def validateTableConfig(storage: HoodieStorage,
+                          basePath: String,
+                          expectedConfigs: Map[String, String]): Unit = {
+    val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
+    expectedConfigs.foreach(e => {
+      assertEquals(e._2, tableConfig.getString(e._1))
+    })
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
 
 }
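
A usage sketch for the new helper, mirroring how the test file below calls it; the base path here is a placeholder for a real table location:

    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.common.testutils.HoodieTestUtils
    import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig

    // Assert that hoodie.properties under the table's base path records the
    // expected merge mode (fails the test via assertEquals otherwise).
    val storage = HoodieTestUtils.getDefaultStorage
    validateTableConfig(storage, "/tmp/hudi_table", // hypothetical base path
      Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> "COMMIT_TIME_ORDERING"))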
File 3 of 3: class TestMergeModeCommitTimeOrdering

@@ -17,14 +17,38 @@
 
 package org.apache.spark.sql.hudi.dml
 
+import org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING
+import org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
 
 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
-  Seq("mor").foreach { tableType =>
-  // [HUDI-8850] For COW commit time ordering does not work.
-  // Seq("cow", "mor").foreach { tableType =>
-    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") {
+  // "cow,false"
+  // "cow,true", "mor,true", "mor,false"
+  Seq("cow,false", "cow,true", "mor,false", "mor,true").foreach { args =>
+    val argList = args.split(',')
+    val tableType = argList(0)
+    val setRecordMergeMode = argList(1).toBoolean
+    val storage = HoodieTestUtils.getDefaultStorage
+    val expectedMergeConfigs = Map(
+      HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+      HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+      HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+    )
+    val mergeConfigClause = if (setRecordMergeMode) {
+      // with precombine, dedup automatically kick in
+      // ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+      ",\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+    } else {
+      // By default, the COMMIT_TIME_ORDERING is used if not specified by the user
+      ""
+    }
+    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode (explicitly set = $setRecordMergeMode)") {
       withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
       ) {
         withRecordType()(withTempDir { tmp =>
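
To make the parameterization concrete, here is how mergeConfigClause splices into the CREATE TABLE statement in the hunks that follow (a standalone, runnable rendering, not code from the commit):

    // One of the four combinations: tableType = "mor", setRecordMergeMode = true.
    val setRecordMergeMode = true
    val mergeConfigClause = if (setRecordMergeMode) {
      ",\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
    } else {
      "" // merge mode omitted; COMMIT_TIME_ORDERING is expected to be inferred
    }
    // Prints the tblproperties clause the tests generate:
    println(
      s"""tblproperties (
         |  type = 'mor',
         |  primaryKey = 'id'$mergeConfigClause
         |)""".stripMargin)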
@@ -40,13 +64,12 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
              | ) using hudi
              | tblproperties (
              |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts',
-             |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
+             |  primaryKey = 'id'
+             |  $mergeConfigClause
              | )
              | location '${tmp.getCanonicalPath}'
        """.stripMargin)
-
+        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
         // Insert initial records with ts=100
         spark.sql(
           s"""
@@ -64,7 +87,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
              | union all
              | select 2, 'B_equal', 70.0, 100
        """.stripMargin)
-
+        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(1, "A_equal", 60.0, 100),
           Seq(2, "B_equal", 70.0, 100)
@@ -77,7 +100,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
              | set price = 50.0, ts = 100
              | where id = 1
        """.stripMargin)
-
+        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(1, "A_equal", 50.0, 100),
           Seq(2, "B_equal", 70.0, 100)
@@ -141,7 +164,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
         // Delete record
         spark.sql(s"delete from $tableName where id = 1")
-
+        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
         // Verify deletion
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(2, "B", 40.0, 101)
@@ -150,97 +173,104 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
         )
       })
     }
 
-    test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table") {
-      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
-        withRecordType()(withTempDir { tmp =>
-          val tableName = generateTableName
-          // Create table with COMMIT_TIME_ORDERING
-          spark.sql(
-            s"""
-               | create table $tableName (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long
-               | ) using hudi
-               | tblproperties (
-               |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'ts',
-               |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
-               | )
-               | location '${tmp.getCanonicalPath}'
+    // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW
+    if ("mor".equals(tableType)) {
+      test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
+        + s"(explicitly set = $setRecordMergeMode)") {
+        withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
+          withRecordType()(withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create table with COMMIT_TIME_ORDERING
+            spark.sql(
+              s"""
+                 | create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 | ) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id'
+                 |  $mergeConfigClause
+                 | )
+                 | location '${tmp.getCanonicalPath}'
        """.stripMargin)
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
 
-          // Insert initial records
-          spark.sql(
-            s"""
-               | insert into $tableName
-               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
-               | select 0, 'X', 20.0, 100 union all
-               | select 2, 'B', 20.0, 100 union all
-               | select 3, 'C', 30.0, 100 union all
-               | select 4, 'D', 40.0, 100 union all
-               | select 5, 'E', 50.0, 100 union all
-               | select 6, 'F', 60.0, 100
+            // Insert initial records
+            spark.sql(
+              s"""
+                 | insert into $tableName
+                 | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
+                 | select 0, 'X', 20.0, 100 union all
+                 | select 2, 'B', 20.0, 100 union all
+                 | select 3, 'C', 30.0, 100 union all
+                 | select 4, 'D', 40.0, 100 union all
+                 | select 5, 'E', 50.0, 100 union all
+                 | select 6, 'F', 60.0, 100
        """.stripMargin)
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
 
-          // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
-               | select 2, '', 55.0, 99 as ts union all
-               | select 0, '', 55.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then delete
+            // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
+                 | select 2, '', 55.0, 99 as ts union all
+                 | select 0, '', 55.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then delete
        """.stripMargin)
 
-          // Merge operation - update with mixed ts values
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
-               | select 5, 'E2', 55.0, 99 as ts union all
-               | select 6, 'F2', 65.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then update set *
+            // Merge operation - update with mixed ts values
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
+                 | select 5, 'E2', 55.0, 99 as ts union all
+                 | select 6, 'F2', 65.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then update set *
        """.stripMargin)
 
-          // Verify state after merges
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100)
-          )
+            // Verify state after merges
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100)
+            )
 
-          // Insert new records through merge
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               | select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
-               | select 8, 'E2', 55.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when not matched then insert *
+            // Insert new records through merge
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 | select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
+                 | select 8, 'E2', 55.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when not matched then insert *
        """.stripMargin)
 
-          // Verify final state
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100),
-            Seq(7, "D2", 45.0, 100),
-            Seq(8, "E2", 55.0, 100)
-          )
-        })
+            // Verify final state
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100),
+              Seq(7, "D2", 45.0, 100),
+              Seq(8, "E2", 55.0, 100)
+            )
+          })
+        }
       }
     }
   }
 }
