From 4786ac5d96e047f2682ac524e0c0b84cf07d5489 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Tue, 28 Jan 2025 22:23:13 -0800
Subject: [PATCH] Improve tests

---
 .../hudi/common/HoodieSparkSqlTestBase.scala  |  10 +-
 .../dml/TestMergeModeCommitTimeOrdering.scala | 236 +++++++++++-------
 2 files changed, 152 insertions(+), 94 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 0b65721951d3..a3c653b2b46b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -380,11 +380,15 @@ object HoodieSparkSqlTestBase {
 
   def validateTableConfig(storage: HoodieStorage,
                           basePath: String,
-                          expectedConfigs: Map[String, String]): Unit = {
+                          expectedConfigs: Map[String, String],
+                          nonExistentConfigs: Seq[String]): Unit = {
     val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
     expectedConfigs.foreach(e => {
-      assertEquals(e._2, tableConfig.getString(e._1))
+      assertEquals(e._2, tableConfig.getString(e._1),
+        s"Table config ${e._1} should be ${e._2} but is ${tableConfig.getString(e._1)}")
     })
+    nonExistentConfigs.foreach(e => assertFalse(
+      tableConfig.contains(e), s"$e should not be present in the table config"))
   }
 
   private def checkMessageContains(e: Throwable, text: String): Boolean =
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index b9f7bb96a13a..a76ec65a991c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -28,31 +28,67 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
 
 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
-  // "cow,false"
-  // "cow,true", "mor,true", "mor,false"
-  // "cow,true", "mor,false", "mor,true"
-  Seq("cow,false", "mor,false").foreach { args =>
+  /*
+   * Each test case is a tuple of
+   * "tableType,setRecordMergeConfigs,setUpsertOperation,tableVersion",
+   * covering COW and MOR tables on table versions 6 and 8, with and
+   * without explicit record merge configs in the table properties, and
+   * with and without the UPSERT operation set explicitly through
+   * hoodie.spark.sql.insert.into.operation.
+   */
+  Seq("cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
+    "cow,true,false,6", "cow,true,true,6",
+    "mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
+    "mor,true,false,6", "mor,true,true,6").foreach { args =>
     val argList = args.split(',')
     val tableType = argList(0)
-    val setRecordMergeMode = argList(1).toBoolean
+    val setRecordMergeConfigs = argList(1).toBoolean
+    val setUpsertOperation = argList(2).toBoolean
+    val tableVersion = argList(3)
+    val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 && setRecordMergeConfigs)
     val storage = HoodieTestUtils.getDefaultStorage
-    val expectedMergeConfigs = Map(
-      HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
-      HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
-      HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
-    )
-    val mergeConfigClause = if (setRecordMergeMode) {
-      // with precombine, dedup automatically kick in
-      ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
-      //",\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+    val mergeConfigClause = if (setRecordMergeConfigs) {
+      // With the precombine field set, the UPSERT operation is used automatically.
+      if (tableVersion.toInt == 6) {
+        // Table version 6
+        s", payloadClass = '${classOf[OverwriteWithLatestAvroPayload].getName}'"
+      } else {
+        // Current table version (8)
+        ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+      }
     } else {
       // By default, the COMMIT_TIME_ORDERING is used if not specified by the user
       ""
     }
-    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode (explicitly set = $setRecordMergeMode)") {
+    val writeTableVersionClause = if (tableVersion.toInt == 6) {
+      s"hoodie.write.table.version = $tableVersion,"
+    } else {
+      ""
+    }
+    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+      Map(
+        HoodieTableConfig.VERSION.key -> "6",
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName)
+    } else {
+      Map(
+        HoodieTableConfig.VERSION.key -> "8",
+        HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+    }
+    val nonExistentConfigs = if (tableVersion.toInt == 6) {
+      Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key)
+    } else {
+      Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+    }
+
+    test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion," +
+      s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)") {
       withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
       ) {
         withRecordType()(withTempDir { tmp =>
+          if (setUpsertOperation) {
+            spark.sql(s"set hoodie.spark.sql.insert.into.operation=upsert")
+          }
           val tableName = generateTableName
           // Create table with COMMIT_TIME_ORDERING
           spark.sql(
@@ -64,13 +100,15 @@
             | ts long
             | ) using hudi
             | tblproperties (
+            | $writeTableVersionClause
             | type = '$tableType',
             | primaryKey = 'id'
             | $mergeConfigClause
             | )
             | location '${tmp.getCanonicalPath}'
        """.stripMargin)
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           // Insert initial records with ts=100
           spark.sql(
             s"""
@@ -83,93 +121,104 @@
           // Verify inserting records with the same ts value are visible (COMMIT_TIME_ORDERING)
           spark.sql(
             s"""
-             | insert into $tableName
-             | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
-             | union all
-             | select 2, 'B_equal', 70.0, 100
+              | insert into $tableName
+              | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+              | union all
+              | select 2, 'B_equal', 70.0, 100
        """.stripMargin)
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A_equal", 60.0, 100),
-            Seq(2, "B_equal", 70.0, 100)
-          )
+            (if (isUpsert) {
+              Seq(
+                Seq(1, "A_equal", 60.0, 100),
+                Seq(2, "B_equal", 70.0, 100))
+            } else {
+              Seq(
+                Seq(1, "A", 10.0, 100),
+                Seq(1, "A_equal", 60.0, 100),
+                Seq(2, "B", 20.0, 100),
+                Seq(2, "B_equal", 70.0, 100))
+            }): _*)
 
-          // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-              | update $tableName
-              | set price = 50.0, ts = 100
-              | where id = 1
+          if (isUpsert) {
+            // Verify that records updated with the same ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                | update $tableName
+                | set price = 50.0, ts = 100
+                | where id = 1
        """.stripMargin)
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A_equal", 50.0, 100),
-            Seq(2, "B_equal", 70.0, 100)
-          )
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A_equal", 50.0, 100),
+              Seq(2, "B_equal", 70.0, 100))
 
-          // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-              | insert into $tableName
-              | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
-              | union all
-              | select 2, 'B', 40.0, 99
+            // Verify that records inserted with a lower ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                | insert into $tableName
+                | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+                | union all
+                | select 2, 'B', 40.0, 99
        """.stripMargin)
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A", 30.0, 99),
-            Seq(2, "B", 40.0, 99)
-          )
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A", 30.0, 99),
+              Seq(2, "B", 40.0, 99))
 
-          // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-              | update $tableName
-              | set price = 50.0, ts = 98
-              | where id = 1
+            // Verify that records updated with a lower ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                | update $tableName
+                | set price = 50.0, ts = 98
+                | where id = 1
        """.stripMargin)
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A", 50.0, 98),
-            Seq(2, "B", 40.0, 99)
-          )
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A", 50.0, 98),
+              Seq(2, "B", 40.0, 99))
 
-          // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-              | insert into $tableName
-              | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
-              | union all
-              | select 2, 'B', 40.0, 101
+            // Verify that records inserted with a higher ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                | insert into $tableName
+                | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+                | union all
+                | select 2, 'B', 40.0, 101
        """.stripMargin)
-          // Verify records with ts=101 are visible
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A", 30.0, 101),
-            Seq(2, "B", 40.0, 101)
-          )
+            // Verify records with ts=101 are visible
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A", 30.0, 101),
+              Seq(2, "B", 40.0, 101)
+            )
 
-          // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-              | update $tableName
-              | set price = 50.0, ts = 102
-              | where id = 1
+            // Verify that records updated with a higher ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                | update $tableName
+                | set price = 50.0, ts = 102
+                | where id = 1
        """.stripMargin)
-          // Verify final state after all operations
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A", 50.0, 102),
-            Seq(2, "B", 40.0, 101)
-          )
+            // Verify final state after all operations
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A", 50.0, 102),
+              Seq(2, "B", 40.0, 101)
+            )
 
-          // Delete record
-          spark.sql(s"delete from $tableName where id = 1")
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
-          // Verify deletion
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(2, "B", 40.0, 101)
-          )
+            // Delete record
+            spark.sql(s"delete from $tableName where id = 1")
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            // Verify deletion
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(2, "B", 40.0, 101)
+            )
+            spark.sql(s"RESET hoodie.spark.sql.insert.into.operation")
+          }
         })
       }
     }
@@ -177,7 +226,8 @@
     // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW
     if ("mor".equals(tableType)) {
       test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
-        + s"(explicitly set = $setRecordMergeMode)") {
+        + s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs," +
+        s"setUpsertOperation=$setUpsertOperation)") {
        withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
          withRecordType()(withTempDir { tmp =>
            val tableName = generateTableName
@@ -197,7 +247,8 @@
            |  )
            | location '${tmp.getCanonicalPath}'
        """.stripMargin)
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
         // Insert initial records
         spark.sql(
@@ -211,7 +262,8 @@
            | select 5, 'E', 50.0, 100 union all
            | select 6, 'F', 60.0, 100
        """.stripMargin)
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
         // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
        spark.sql(
@@ -240,7 +292,8 @@
        """.stripMargin)
 
         // Verify state after merges
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(3, "C", 30.0, 100),
           Seq(4, "D2", 45.0, 101),
@@ -261,7 +314,8 @@
        """.stripMargin)
 
         // Verify final state
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
           Seq(3, "C", 30.0, 100),
           Seq(4, "D2", 45.0, 101),
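
--
A minimal usage sketch of the extended validateTableConfig helper, for reviewers.
This is illustrative only, not part of the patch: `basePath` is a hypothetical
table location, and the expected key/value pairs mirror the version-8 branch of
the test above ("COMMIT_TIME_ORDERING" is the value of COMMIT_TIME_ORDERING.name()).

    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.common.testutils.HoodieTestUtils
    import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig

    val storage = HoodieTestUtils.getDefaultStorage
    // Hypothetical base path of an existing version-8 Hudi table.
    val basePath = "/tmp/hudi_commit_time_ordering_table"

    // Asserts that each expected key has the given value in hoodie.properties,
    // and that none of the nonExistentConfigs keys are present at all.
    validateTableConfig(
      storage,
      basePath,
      expectedConfigs = Map(
        HoodieTableConfig.VERSION.key -> "8",
        HoodieTableConfig.RECORD_MERGE_MODE.key -> "COMMIT_TIME_ORDERING"),
      nonExistentConfigs = Seq(HoodieTableConfig.PRECOMBINE_FIELD.key))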