Commit: Improve tests

yihua committed Jan 29, 2025
1 parent d7666bd commit 2cdb44e
Showing 2 changed files with 152 additions and 94 deletions.
HoodieSparkSqlTestBase.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -380,11 +380,15 @@ object HoodieSparkSqlTestBase {

   def validateTableConfig(storage: HoodieStorage,
                           basePath: String,
-                          expectedConfigs: Map[String, String]): Unit = {
+                          expectedConfigs: Map[String, String],
+                          nonExistentConfigs: Seq[String]): Unit = {
     val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
     expectedConfigs.foreach(e => {
-      assertEquals(e._2, tableConfig.getString(e._1))
+      assertEquals(e._2, tableConfig.getString(e._1),
+        s"Table config ${e._1} should be ${e._2} but is ${tableConfig.getString(e._1)}")
     })
+    nonExistentConfigs.foreach(e => assertFalse(
+      tableConfig.contains(e), s"$e should not be present in the table config"))
   }

   private def checkMessageContains(e: Throwable, text: String): Boolean =
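The hunk above widens validateTableConfig to also assert that certain keys are absent from hoodie.properties. For orientation, a minimal sketch of a call against the new signature; the base path is a hypothetical placeholder, and the keys are the ones the updated test below passes for a version-6 table:

    val storage = HoodieTestUtils.getDefaultStorage
    validateTableConfig(
      storage,
      "/tmp/hudi_test_table", // hypothetical base path
      Map(
        HoodieTableConfig.VERSION.key -> "6",
        HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
          classOf[OverwriteWithLatestAvroPayload].getName),
      Seq(
        HoodieTableConfig.RECORD_MERGE_MODE.key,
        HoodieTableConfig.PRECOMBINE_FIELD.key))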
TestMergeModeCommitTimeOrdering.scala
@@ -28,31 +28,67 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConf

 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {

-  // "cow,false"
-  // "cow,true", "mor,true", "mor,false"
-  // "cow,true", "mor,false", "mor,true"
-  Seq("cow,false", "mor,false").foreach { args =>
+  /*
+    "cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
+    "cow,true,false,6", "cow,true,true,6",
+    "mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
+    "mor,true,false,6", "mor,true,true,6"
+   */
+  //"cow,true,false,6",
+  Seq("cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
+    "cow,true,false,6", "cow,true,true,6",
+    "mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
+    "mor,true,false,6", "mor,true,true,6").foreach { args =>
     val argList = args.split(',')
     val tableType = argList(0)
-    val setRecordMergeMode = argList(1).toBoolean
+    val setRecordMergeConfigs = argList(1).toBoolean
+    val setUpsertOperation = argList(2).toBoolean
+    val tableVersion = argList(3)
+    val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 && setRecordMergeConfigs)
     val storage = HoodieTestUtils.getDefaultStorage
-    val expectedMergeConfigs = Map(
-      HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
-      HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
-      HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
-    )
-    val mergeConfigClause = if (setRecordMergeMode) {
-      // with precombine, dedup automatically kick in
-      ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
-      //",\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+    val mergeConfigClause = if (setRecordMergeConfigs) {
+      // with precombine field set, UPSERT operation is used automatically
+      if (tableVersion.toInt == 6) {
+        // Table version 6
+        s", payloadClass = '${classOf[OverwriteWithLatestAvroPayload].getName}'"
+      } else {
+        // Current table version (8)
+        ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+      }
     } else {
       // By default, the COMMIT_TIME_ORDERING is used if not specified by the user
       ""
     }
-    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode (explicitly set = $setRecordMergeMode)") {
+    val writeTableVersionClause = if (tableVersion.toInt == 6) {
+      s"hoodie.write.table.version = $tableVersion,"
+    } else {
+      ""
+    }
+    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+      Map(
+        HoodieTableConfig.VERSION.key -> "6",
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName)
+    } else {
+      Map(
+        HoodieTableConfig.VERSION.key -> "8",
+        HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+    }
+    val nonExistentConfigs = if (tableVersion.toInt == 6) {
+      Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key)
+    } else {
+      Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+    }
+
+    test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion,"
+      + s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)") {
       withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
       ) {
       withRecordType()(withTempDir { tmp =>
+        if (setUpsertOperation) {
+          spark.sql(s"set hoodie.spark.sql.insert.into.operation=upsert")
+        }
         val tableName = generateTableName
         // Create table with COMMIT_TIME_ORDERING
         spark.sql(
@@ -64,13 +100,15 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           | ts long
           | ) using hudi
           | tblproperties (
+          |  $writeTableVersionClause
           |  type = '$tableType',
           |  primaryKey = 'id'
           |  $mergeConfigClause
           | )
           | location '${tmp.getCanonicalPath}'
           """.stripMargin)
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
         // Insert initial records with ts=100
         spark.sql(
           s"""
@@ -83,101 +121,113 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
         // Verify inserting records with the same ts value are visible (COMMIT_TIME_ORDERING)
         spark.sql(
           s"""
-         | insert into $tableName
-         | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
-         | union all
-         | select 2, 'B_equal', 70.0, 100
+             | insert into $tableName
+             | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+             | union all
+             | select 2, 'B_equal', 70.0, 100
           """.stripMargin)
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+        validateTableConfig(
+          storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
         checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A_equal", 60.0, 100),
-          Seq(2, "B_equal", 70.0, 100)
-        )
+          (if (isUpsert) {
+            Seq(
+              Seq(1, "A_equal", 60.0, 100),
+              Seq(2, "B_equal", 70.0, 100))
+          } else {
+            Seq(
+              Seq(1, "A", 10.0, 100),
+              Seq(1, "A_equal", 60.0, 100),
+              Seq(2, "B", 20.0, 100),
+              Seq(2, "B_equal", 70.0, 100))
+          }): _*)

-        // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
-        spark.sql(
-          s"""
-         | update $tableName
-         | set price = 50.0, ts = 100
-         | where id = 1
+        if (isUpsert) {
+          // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 100
+               | where id = 1
           """.stripMargin)
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A_equal", 50.0, 100),
-          Seq(2, "B_equal", 70.0, 100)
-        )
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100))

-        // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-        spark.sql(
-          s"""
-         | insert into $tableName
-         | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
-         | union all
-         | select 2, 'B', 40.0, 99
+          // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+               | union all
+               | select 2, 'B', 40.0, 99
           """.stripMargin)

-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A", 30.0, 99),
-          Seq(2, "B", 40.0, 99)
-        )
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(1, "A", 30.0, 99),
+            Seq(2, "B", 40.0, 99))

-        // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-        spark.sql(
-          s"""
-         | update $tableName
-         | set price = 50.0, ts = 98
-         | where id = 1
+          // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 98
+               | where id = 1
           """.stripMargin)

-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A", 50.0, 98),
-          Seq(2, "B", 40.0, 99)
-        )
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(1, "A", 50.0, 98),
+            Seq(2, "B", 40.0, 99))

-        // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-        spark.sql(
-          s"""
-         | insert into $tableName
-         | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
-         | union all
-         | select 2, 'B', 40.0, 101
+          // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+               | union all
+               | select 2, 'B', 40.0, 101
           """.stripMargin)

-        // Verify records with ts=101 are visible
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A", 30.0, 101),
-          Seq(2, "B", 40.0, 101)
-        )
+          // Verify records with ts=101 are visible
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(1, "A", 30.0, 101),
+            Seq(2, "B", 40.0, 101)
+          )

-        // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-        spark.sql(
-          s"""
-         | update $tableName
-         | set price = 50.0, ts = 102
-         | where id = 1
+          // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 102
+               | where id = 1
           """.stripMargin)

-        // Verify final state after all operations
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "A", 50.0, 102),
-          Seq(2, "B", 40.0, 101)
-        )
+          // Verify final state after all operations
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(1, "A", 50.0, 102),
+            Seq(2, "B", 40.0, 101)
+          )

-        // Delete record
-        spark.sql(s"delete from $tableName where id = 1")
-        validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
-        // Verify deletion
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(2, "B", 40.0, 101)
-        )
+          // Delete record
+          spark.sql(s"delete from $tableName where id = 1")
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+          // Verify deletion
+          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+            Seq(2, "B", 40.0, 101)
+          )
+          spark.sql(s"RESET hoodie.spark.sql.insert.into.operation")
+        }
       })
     }
   }

   // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW
   if ("mor".equals(tableType)) {
     test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
-      + s"(explicitly set = $setRecordMergeMode)") {
+      + s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs,"
+      + s"setUpsertOperation=$setUpsertOperation)") {
       withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
         withRecordType()(withTempDir { tmp =>
           val tableName = generateTableName
Expand All @@ -197,7 +247,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
| )
| location '${tmp.getCanonicalPath}'
""".stripMargin)
validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)

// Insert initial records
spark.sql(
@@ -211,7 +262,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
             | select 5, 'E', 50.0, 100 union all
             | select 6, 'F', 60.0, 100
           """.stripMargin)
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)

           // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
           spark.sql(
@@ -240,7 +292,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           """.stripMargin)

           // Verify state after merges
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
             Seq(3, "C", 30.0, 100),
             Seq(4, "D2", 45.0, 101),
@@ -261,7 +314,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           """.stripMargin)

           // Verify final state
-          validateTableConfig(storage, tmp.getCanonicalPath, expectedMergeConfigs)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
             Seq(3, "C", 30.0, 100),
             Seq(4, "D2", 45.0, 101),
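As a reading aid for the widened test matrix above, the self-contained sketch below restates how each parameter string decodes. The object name and println are illustrative additions only; the parsing and the isUpsert derivation mirror the test code in this diff:

    object DecodeTestMatrix { // hypothetical name, for illustration only
      def main(args: Array[String]): Unit = {
        // Format: "tableType,setRecordMergeConfigs,setUpsertOperation,tableVersion"
        Seq("cow,false,false,8", "mor,true,true,6").foreach { params =>
          val argList = params.split(',')
          val tableType = argList(0) // "cow" or "mor"
          val setRecordMergeConfigs = argList(1).toBoolean // set merge mode/payload in tblproperties
          val setUpsertOperation = argList(2).toBoolean // force hoodie.spark.sql.insert.into.operation=upsert
          val tableVersion = argList(3) // Hudi table version: "6" or "8"
          // Upsert semantics apply when explicitly requested, or on version-8
          // tables once the merge configs also set a precombine field.
          val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 && setRecordMergeConfigs)
          println(s"$tableType v$tableVersion: upsert semantics expected = $isUpsert")
        }
      }
    }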
