Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed Jan 30, 2025
1 parent 1de418b commit 58d5534
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
}
}

protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
/**
 * Evaluates `f` with the given SQL session configs, delegating to
 * `withSparkSqlSessionConfigWithCondition` with every entry's condition fixed to `true`.
 *
 * @param configNameValues (config name, config value) pairs to pass through
 * @param f                the body to evaluate
 */
protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
)(f: => Unit): Unit = {
  // Pair each entry with an always-true condition so none of them is skipped.
  val unconditional = configNameValues.map(nameValue => (nameValue, true))
  withSparkSqlSessionConfigWithCondition(unconditional: _*)(f)
}

protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)*
)(f: => Unit): Unit = {
try {
configNameValues.foreach { case (configName, configValue) =>
spark.sql(s"set $configName=$configValue")
configNameValues.foreach { case ((configName, configValue), condition) =>
if (condition) {
spark.sql(s"set $configName=$configValue")
}
}
f
} finally {
configNameValues.foreach { case (configName, _) =>
configNameValues.foreach { case ((configName, configValue), condition) =>
spark.sql(s"reset $configName")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,12 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConf

class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {

/*
"cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
"cow,true,false,6", "cow,true,true,6",
"mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
"mor,true,false,6", "mor,true,true,6"
*/
//"cow,true,false,6",
// TODO(HUDI-8938): add "mor,true,true,6" after the fix
Seq(
"cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
"cow,true,false,6", "cow,true,true,6",
"mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
"mor,true,false,6", "mor,true,true,6").foreach { args =>
"mor,true,false,6").foreach { args =>
val argList = args.split(',')
val tableType = argList(0)
val setRecordMergeConfigs = argList(1).toBoolean
Expand Down Expand Up @@ -88,12 +82,13 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {

test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion,"
+ s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)") {
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
withSparkSqlSessionConfigWithCondition(
("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation),
// TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6),
) {
withRecordType()(withTempDir { tmp =>
if (setUpsertOperation) {
spark.sql(s"set hoodie.spark.sql.insert.into.operation=upsert")
}
val tableName = generateTableName
// Create table with COMMIT_TIME_ORDERING
spark.sql(
Expand Down Expand Up @@ -224,7 +219,6 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(2, "B", 40.0, 101)
)
spark.sql(s"RESET hoodie.spark.sql.insert.into.operation")
}
})
}
Expand All @@ -234,7 +228,12 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
+ s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs,"
+ s"setUpsertOperation=$setUpsertOperation)") {
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
withSparkSqlSessionConfigWithCondition(
("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation),
// TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6),
) {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create table with COMMIT_TIME_ORDERING
Expand Down Expand Up @@ -272,18 +271,22 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)

// TODO(HUDI-8840): enable MERGE INTO with deletes
val shouldTestMergeIntoDelete = setRecordMergeConfigs && tableVersion.toInt == 8
// Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
spark.sql(
s"""
| merge into $tableName t
| using (
| select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
| select 2, '', 55.0, 99L as ts union all
| select 0, '', 55.0, 100L as ts
| ) s
| on t.id = s.id
| when matched then delete
if (shouldTestMergeIntoDelete) {
spark.sql(
s"""
| merge into $tableName t
| using (
| select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
| select 2, '', 55.0, 99L as ts union all
| select 0, '', 55.0, 100L as ts
| ) s
| on t.id = s.id
| when matched then delete
""".stripMargin)
}

// Merge operation - update with mixed ts values
spark.sql(
Expand All @@ -301,11 +304,20 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
// Verify state after merges
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
val nonDeletedRows: Seq[Seq[Any]] = if (shouldTestMergeIntoDelete) {
Seq()
} else {
Seq(Seq(0, "X", 20.0, 100),
Seq(1, "A", 10.0, 100),
Seq(2, "B", 20.0, 100))
}
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(3, "C", 30.0, 100),
Seq(4, "D2", 45.0, 101),
Seq(5, "E2", 55.0, 99),
Seq(6, "F2", 65.0, 100)
(nonDeletedRows ++ Seq(
Seq(3, "C", 30.0, 100),
Seq(4, "D2", 45.0, 101),
Seq(5, "E2", 55.0, 99),
Seq(6, "F2", 65.0, 100)
)): _*,
)

// Insert new records through merge
Expand All @@ -324,12 +336,14 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(3, "C", 30.0, 100),
Seq(4, "D2", 45.0, 101),
Seq(5, "E2", 55.0, 99),
Seq(6, "F2", 65.0, 100),
Seq(7, "D2", 45.0, 100),
Seq(8, "E2", 55.0, 100)
(nonDeletedRows ++ Seq(
Seq(3, "C", 30.0, 100),
Seq(4, "D2", 45.0, 101),
Seq(5, "E2", 55.0, 99),
Seq(6, "F2", 65.0, 100),
Seq(7, "D2", 45.0, 100),
Seq(8, "E2", 55.0, 100)
)): _*
)
})
}
Expand Down

0 comments on commit 58d5534

Please sign in to comment.