diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7de0617b23b2..fa458c3db1a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
+import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,16 +257,12 @@ trait ProvidesHoodieConfig extends Logging {
       Map()
     }
 
-    val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
-    val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
-    val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-
-    if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
-      RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
-      tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
-    }
+    val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+      tableConfig.getRecordMergeMode, tableConfig.getPayloadClass,
+      tableConfig.getRecordMergeStrategyId, preCombineField)
+    val recordMergeMode = inferredMergeConfigs.getLeft.name()
+    val deducedPayloadClassName = inferredMergeConfigs.getMiddle
+    val recordMergeStrategy = inferredMergeConfigs.getRight
 
     val defaultOpts = Map(
       DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
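With this change, ProvidesHoodieConfig no longer hard-codes the EVENT_TIME_ORDERING defaults and clears them by hand; HoodieTableConfig.inferCorrectMergingBehavior resolves a consistent (record merge mode, payload class, merge strategy ID) triple from whatever subset of the three the table config carries, plus the precombine field. A minimal sketch of how a caller unpacks that triple, assuming a loaded HoodieTableConfig and a String precombine field; the helper name deduceMergeOptions is illustrative, not part of the codebase:

import org.apache.hudi.common.table.HoodieTableConfig

// Illustrative only: unpack the inferred merge behavior into the same three
// table-config keys that the tests below assert against.
def deduceMergeOptions(tableConfig: HoodieTableConfig,
                       preCombineField: String): Map[String, String] = {
  // Returns a (mergeMode, payloadClass, mergeStrategyId) triple that is
  // internally consistent even if the table config sets only some of them.
  val inferred = HoodieTableConfig.inferCorrectMergingBehavior(
    tableConfig.getRecordMergeMode, tableConfig.getPayloadClass,
    tableConfig.getRecordMergeStrategyId, preCombineField)
  Map(
    HoodieTableConfig.RECORD_MERGE_MODE.key -> inferred.getLeft.name(),
    HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> inferred.getMiddle,
    HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> inferred.getRight)
}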
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 16187ba3fd92..0b65721951d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
 
 import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -375,7 +378,15 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def validateTableConfig(storage: HoodieStorage,
+                          basePath: String,
+                          expectedConfigs: Map[String, String]): Unit = {
+    val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
+    expectedConfigs.foreach(e => {
+      assertEquals(e._2, tableConfig.getString(e._1))
+    })
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
-
 }
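The new validateTableConfig helper loads hoodie.properties through HoodieTableConfig.loadFromHoodieProps and asserts each expected key/value pair, so a test can verify which merge configs were actually persisted rather than only checking query results. A hypothetical call site (the base path is a placeholder; the expected values are the COMMIT_TIME_ORDERING triple used by the tests below):

import org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING
import org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig

val storage = HoodieTestUtils.getDefaultStorage
val basePath = "/tmp/hudi_commit_time_table" // placeholder path
validateTableConfig(storage, basePath, Map(
  HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
  HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
  HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID))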
- // Seq("cow", "mor").foreach { tableType => - test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") { + // "cow,false" + // "cow,true", "mor,true", "mor,false" + Seq("cow,false", "cow,true", "mor,false", "mor,true").foreach { args => + val argList = args.split(',') + val tableType = argList(0) + val setRecordMergeMode = argList(1).toBoolean + val storage = HoodieTestUtils.getDefaultStorage + val expectedMergeConfigs = Map( + HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(), + HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName, + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID + ) + val mergeConfigClause = if (setRecordMergeMode) { + // with precombine, dedup automatically kick in + // ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'" + ",\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'" + } else { + // By default, the COMMIT_TIME_ORDERING is used if not specified by the user + "" + } + test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode (explicitly set = $setRecordMergeMode)") { withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0" ) { withRecordType()(withTempDir { tmp => @@ -40,13 +64,12 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { | ) using hudi | tblproperties ( | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING' + | primaryKey = 'id' + | $mergeConfigClause | ) | location '${tmp.getCanonicalPath}' """.stripMargin) - + validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs) // Insert initial records with ts=100 spark.sql( s""" @@ -64,7 +87,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { | union all | select 2, 'B_equal', 70.0, 100 """.stripMargin) - + validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 60.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -77,7 +100,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { | set price = 50.0, ts = 100 | where id = 1 """.stripMargin) - + validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 50.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -141,7 +164,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { // Delete record spark.sql(s"delete from $tableName where id = 1") - + validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs) // Verify deletion checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(2, "B", 40.0, 101) @@ -150,97 +173,104 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { } } - test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table") { - withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") { - withRecordType()(withTempDir { tmp => - val tableName = generateTableName - // Create table with COMMIT_TIME_ORDERING - spark.sql( - s""" - | create table $tableName ( - | id int, - | name string, - | price double, - | ts long - | ) using hudi - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING' - | ) - | location '${tmp.getCanonicalPath}' + // 
+    if ("mor".equals(tableType)) {
+      test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table " +
+        s"(explicitly set = $setRecordMergeMode)") {
+        withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
+          withRecordType()(withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create table with COMMIT_TIME_ORDERING
+            spark.sql(
+              s"""
+                 | create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 | ) using hudi
+                 | tblproperties (
+                 |  type = '$tableType',
+                 |  primaryKey = 'id'
+                 |  $mergeConfigClause
+                 | )
+                 | location '${tmp.getCanonicalPath}'
+        """.stripMargin)
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
 
-          // Insert initial records
-          spark.sql(
-            s"""
-               | insert into $tableName
-               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
-               | select 0, 'X', 20.0, 100 union all
-               | select 2, 'B', 20.0, 100 union all
-               | select 3, 'C', 30.0, 100 union all
-               | select 4, 'D', 40.0, 100 union all
-               | select 5, 'E', 50.0, 100 union all
-               | select 6, 'F', 60.0, 100
-        """.stripMargin)
+            // Insert initial records
+            spark.sql(
+              s"""
+                 | insert into $tableName
+                 | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
+                 | select 0, 'X', 20.0, 100 union all
+                 | select 2, 'B', 20.0, 100 union all
+                 | select 3, 'C', 30.0, 100 union all
+                 | select 4, 'D', 40.0, 100 union all
+                 | select 5, 'E', 50.0, 100 union all
+                 | select 6, 'F', 60.0, 100
+        """.stripMargin)
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
 
-          // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
-               |   select 2, '', 55.0, 99 as ts union all
-               |   select 0, '', 55.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then delete
-        """.stripMargin)
+            // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
+                 |   select 2, '', 55.0, 99 as ts union all
+                 |   select 0, '', 55.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then delete
+        """.stripMargin)
 
-          // Merge operation - update with mixed ts values
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
-               |   select 5, 'E2', 55.0, 99 as ts union all
-               |   select 6, 'F2', 65.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then update set *
-        """.stripMargin)
+            // Merge operation - update with mixed ts values
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
+                 |   select 5, 'E2', 55.0, 99 as ts union all
+                 |   select 6, 'F2', 65.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then update set *
+        """.stripMargin)
 
-          // Verify state after merges
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100)
-          )
+            // Verify state after merges
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100)
+            )
 
-          // Insert new records through merge
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
-               |   select 8, 'E2', 55.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when not matched then insert *
-        """.stripMargin)
+            // Insert new records through merge
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
+                 |   select 8, 'E2', 55.0, 100 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when not matched then insert *
+        """.stripMargin)
 
-          // Verify final state
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100),
-            Seq(7, "D2", 45.0, 100),
-            Seq(8, "E2", 55.0, 100)
-          )
-        })
+            // Verify final state
+            validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100),
+              Seq(7, "D2", 45.0, 100),
+              Seq(8, "E2", 55.0, 100)
+            )
+          })
+        }
       }
     }
   }
 }
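Taken together, the updated tests pin down the COMMIT_TIME_ORDERING contract: the most recent commit wins regardless of the ordering field's value, so an update carrying ts = 99 still replaces a row written with ts = 100. A condensed sketch of that contract (not verbatim from the suite), assuming the harness helpers spark, generateTableName, withTempDir, and checkAnswer from HoodieSparkSqlTestBase:

withTempDir { tmp =>
  val tableName = generateTableName
  spark.sql(
    s"""
       | create table $tableName (id int, name string, price double, ts long)
       | using hudi
       | tblproperties (
       |  type = 'mor',
       |  primaryKey = 'id',
       |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
       | )
       | location '${tmp.getCanonicalPath}'
     """.stripMargin)
  spark.sql(s"insert into $tableName select 5, 'E', 50.0, 100")
  // The update carries a lower ts, but the later commit still wins.
  spark.sql(s"update $tableName set name = 'E2', price = 55.0, ts = 99 where id = 5")
  checkAnswer(s"select id, name, price, ts from $tableName")(
    Seq(5, "E2", 55.0, 99))
}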