diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index a3c653b2b46b..0df0f89922bd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -326,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     }
   }
 
-  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
+  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+                                         )(f: => Unit): Unit = {
+    withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, true)): _*)(f)
+  }
+
+  protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)*
+                                                      )(f: => Unit): Unit = {
     try {
-      configNameValues.foreach { case (configName, configValue) =>
-        spark.sql(s"set $configName=$configValue")
+      configNameValues.foreach { case ((configName, configValue), condition) =>
+        if (condition) {
+          spark.sql(s"set $configName=$configValue")
+        }
       }
       f
     } finally {
-      configNameValues.foreach { case (configName, _) =>
+      configNameValues.foreach { case ((configName, configValue), condition) =>
         spark.sql(s"reset $configName")
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index bdd56e77e113..f5cf45c5aeb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -28,18 +28,12 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConf
 
 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
-  /*
-    "cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
-    "cow,true,false,6", "cow,true,true,6",
-    "mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
-    "mor,true,false,6", "mor,true,true,6"
-   */
-  //"cow,true,false,6",
+  // TODO(HUDI-8938): add "mor,true,true,6" after the fix
   Seq(
     "cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
     "cow,true,false,6", "cow,true,true,6",
     "mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
-    "mor,true,false,6", "mor,true,true,6").foreach { args =>
+    "mor,true,false,6").foreach { args =>
     val argList = args.split(',')
     val tableType = argList(0)
     val setRecordMergeConfigs = argList(1).toBoolean
@@ -88,12 +82,13 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
     test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion," +
       s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)") {
-      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
+      withSparkSqlSessionConfigWithCondition(
+        ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+        ("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation),
+        // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
+        ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6),
       ) {
         withRecordType()(withTempDir { tmp =>
-          if (setUpsertOperation) {
-            spark.sql(s"set hoodie.spark.sql.insert.into.operation=upsert")
-          }
           val tableName = generateTableName
           // Create table with COMMIT_TIME_ORDERING
           spark.sql(
@@ -224,7 +219,6 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
             checkAnswer(s"select id, name, price, ts from $tableName order by id")(
               Seq(2, "B", 40.0, 101)
             )
-            spark.sql(s"RESET hoodie.spark.sql.insert.into.operation")
           }
         })
       }
@@ -234,7 +228,12 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
     test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table " +
       s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs," +
       s"setUpsertOperation=$setUpsertOperation)") {
-      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
+      withSparkSqlSessionConfigWithCondition(
+        ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+        ("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation),
+        // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
+        ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6),
+      ) {
         withRecordType()(withTempDir { tmp =>
           val tableName = generateTableName
           // Create table with COMMIT_TIME_ORDERING
@@ -272,18 +271,22 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           validateTableConfig(
             storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
+          // TODO(HUDI-8840): enable MERGE INTO with deletes
+          val shouldTestMergeIntoDelete = setRecordMergeConfigs && tableVersion.toInt == 8
           // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
-               | select 2, '', 55.0, 99L as ts union all
-               | select 0, '', 55.0, 100L as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then delete
+          if (shouldTestMergeIntoDelete) {
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
+                 | select 2, '', 55.0, 99L as ts union all
+                 | select 0, '', 55.0, 100L as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then delete
             """.stripMargin)
+          }
 
           // Merge operation - update with mixed ts values
           spark.sql(
@@ -301,11 +304,20 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           // Verify state after merges
           validateTableConfig(
             storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+          val nonDeletedRows: Seq[Seq[Any]] = if (shouldTestMergeIntoDelete) {
+            Seq()
+          } else {
+            Seq(Seq(0, "X", 20.0, 100),
+              Seq(1, "A", 10.0, 100),
+              Seq(2, "B", 20.0, 100))
+          }
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100)
+            (nonDeletedRows ++ Seq(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100)
+            )): _*,
           )
 
           // Insert new records through merge
@@ -324,12 +336,14 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           validateTableConfig(
             storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100),
-            Seq(7, "D2", 45.0, 100),
-            Seq(8, "E2", 55.0, 100)
+            (nonDeletedRows ++ Seq(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100),
+              Seq(7, "D2", 45.0, 100),
+              Seq(8, "E2", 55.0, 100)
+            )): _*
           )
         })
       }