From 27a950ee22320d7b447fd3e31693ae9290dd5842 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 30 Jan 2025 02:07:55 -0800 Subject: [PATCH] [HUDI-8850] Fix record merge mode related issues and improve test coverage in Spark SQL (#12725) This PR fixes record merge mode issues and improves test coverage in Spark SQL: - Table version 6 does not store the record merge mode in the table config. The file group reader now infers it when missing, enabling reads of version 6 tables. - For INSERT INTO in Spark SQL, buildHoodieInsertConfig did not set the record merge mode configs correctly; that logic is now removed, and the SQL writer picks up the correct record merge mode from the table config automatically. - In MERGE INTO in Spark SQL, the record merge mode check is fixed to avoid a NullPointerException when the table config does not store the merge mode. - More tests are added in TestMergeModeCommitTimeOrdering and TestMergeModeEventTimeOrdering for different merge modes. The tests cover both table versions 6 and 8. Sketches of the reader fallback, the null-safe merge-mode check, and the test helper usage follow the patch. --- .../table/read/HoodieFileGroupReader.java | 16 +- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 18 +- .../command/MergeIntoHoodieTableCommand.scala | 13 +- .../hudi/common/HoodieSparkSqlTestBase.scala | 33 +- .../sql/hudi/dml/TestMergeIntoTable2.scala | 3 +- .../dml/TestMergeModeCommitTimeOrdering.scala | 349 ++++++++++++------ .../dml/TestMergeModeEventTimeOrdering.scala | 263 ++++++++----- 7 files changed, 448 insertions(+), 247 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index bf9653efbf35..e77bded8a77a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; @@ -37,6 +38,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.EmptyIterator; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.collection.Triple; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.HoodieStorage; @@ -102,9 +104,17 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, this.start = start; this.length = length; HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig(); + RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode(); + String mergeStrategyId = tableConfig.getRecordMergeStrategyId(); + if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) { + Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior( + recordMergeMode, tableConfig.getPayloadClass(), + mergeStrategyId, null); + recordMergeMode = triple.getLeft(); + mergeStrategyId = triple.getRight(); + } readerContext.setRecordMerger(readerContext.getRecordMerger( - tableConfig.getRecordMergeMode(), - tableConfig.getRecordMergeStrategyId(), + recordMergeMode, mergeStrategyId, props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY, props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
readerContext.setTablePath(tablePath); @@ -122,7 +132,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); this.readStats = new HoodieReadStats(); this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, - tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(), + recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition, readStats); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 7de0617b23b2..d7de38e9de9d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex} import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.toProperties -import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties} -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType} +import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties} +import org.apache.hudi.common.model.WriteOperationType import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME import org.apache.hudi.common.util.{ReflectionUtils, StringUtils} @@ -257,21 +257,7 @@ trait ProvidesHoodieConfig extends Logging { Map() } - val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName - val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name - val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID - - if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) && - RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) { - tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME) - tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE) - tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID) - } - val defaultOpts = Map( - DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName, - DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode, - DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key() -> recordMergeStrategy, // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified // for the table HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert), diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index f90fbef8e824..d6545911c1b2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -28,14
+28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA} +import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA} import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException} import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.util.JFunction.scalaFunction1Noop + import org.apache.avro.Schema import org.apache.spark.sql._ -import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper} @@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig} import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis -import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments} +import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference} import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ import org.apache.spark.sql.types.{BooleanType, StructField, StructType} import java.util.Base64 + import scala.collection.JavaConverters._ /** @@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode. // Check has no effect if we don't have such fields in target table or we don't have insert actions // Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT. 
- if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], HoodieTableConfig.RECORD_MERGE_MODE) - .equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) { + if (RecordMergeMode.EVENT_TIME_ORDERING.name() + .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], + HoodieTableConfig.RECORD_MERGE_MODE))) { insertActions.foreach(action => hoodieCatalogTable.preCombineKey.foreach( field => { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala index 16187ba3fd92..0df0f89922bd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala @@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord} import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.table.timeline.TimelineMetadataUtils +import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.ExceptionUtil.getRootCause import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex +import org.apache.hudi.storage.HoodieStorage import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest} import org.apache.hadoop.fs.Path @@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta import org.apache.spark.sql.types.StructField import org.apache.spark.util.Utils import org.joda.time.DateTimeZone +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} import org.slf4j.LoggerFactory @@ -323,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { } } - protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = { + protected def withSparkSqlSessionConfig(configNameValues: (String, String)* + )(f: => Unit): Unit = { + withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, true)): _*)(f) + } + + protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)* + )(f: => Unit): Unit = { try { - configNameValues.foreach { case (configName, configValue) => - spark.sql(s"set $configName=$configValue") + configNameValues.foreach { case ((configName, configValue), condition) => + if (condition) { + spark.sql(s"set $configName=$configValue") + } } f } finally { - configNameValues.foreach { case (configName, _) => + configNameValues.foreach { case ((configName, configValue), condition) => spark.sql(s"reset $configName") } } @@ -375,7 +386,19 @@ object HoodieSparkSqlTestBase { .getActiveTimeline.getInstantDetails(cleanInstant).get) } + def validateTableConfig(storage: HoodieStorage, + basePath: String, + expectedConfigs: Map[String, String], + nonExistentConfigs: Seq[String]): Unit = { + val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath) + expectedConfigs.foreach(e => { + assertEquals(e._2, tableConfig.getString(e._1), + s"Table config ${e._1} should be ${e._2} but is ${tableConfig.getString(e._1)}") + 
}) + nonExistentConfigs.foreach(e => assertFalse( + tableConfig.contains(e), s"$e should not be present in the table config")) + } + private def checkMessageContains(e: Throwable, text: String): Boolean = e.getMessage.trim.contains(text.trim) - } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala index 1e504fecd0c2..bd8f7676e028 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala @@ -24,7 +24,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.Row import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase - import org.slf4j.LoggerFactory class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { @@ -1254,7 +1253,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { | ts BIGINT |) using hudi |TBLPROPERTIES ( - | type = 'cow', + | type = '$tableType', | primaryKey = 'id', | preCombineField = 'ts', | recordMergeMode = '$recordMergeMode' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala index a0af78797e60..67e7d992b193 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala @@ -17,15 +17,76 @@ package org.apache.spark.sql.hudi.dml +import org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING +import org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.HoodieTestUtils + import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { - Seq("mor").foreach { tableType => - // [HUDI-8850] For COW commit time ordering does not work. 
- // Seq("cow", "mor").foreach { tableType => - test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") { - withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0" + // TODO(HUDI-8938): add "mor,true,true,6" after the fix + Seq( + "cow,8,false,false", "cow,8,false,true", "cow,8,true,false", + "cow,6,true,false", "cow,6,true,true", + "mor,8,false,false", "mor,8,false,true", "mor,8,true,false", + "mor,6,true,false").foreach { args => + val argList = args.split(',') + val tableType = argList(0) + val tableVersion = argList(1) + val setRecordMergeConfigs = argList(2).toBoolean + val setUpsertOperation = argList(3).toBoolean + val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 && setRecordMergeConfigs) + val storage = HoodieTestUtils.getDefaultStorage + val mergeConfigClause = if (setRecordMergeConfigs) { + // with precombine field set, UPSERT operation is used automatically + if (tableVersion.toInt == 6) { + // Table version 6 + s", payloadClass = '${classOf[OverwriteWithLatestAvroPayload].getName}'" + } else { + // Current table version (8) + ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'" + } + } else { + // By default, the COMMIT_TIME_ORDERING is used if not specified by the user + "" + } + val writeTableVersionClause = if (tableVersion.toInt == 6) { + s"hoodie.write.table.version = $tableVersion," + } else { + "" + } + val expectedMergeConfigs = if (tableVersion.toInt == 6) { + Map( + HoodieTableConfig.VERSION.key -> "6", + HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName) + } else { + Map( + HoodieTableConfig.VERSION.key -> "8", + HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(), + HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName, + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID) + } + val nonExistentConfigs = if (tableVersion.toInt == 6) { + Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key) + } else { + if (setRecordMergeConfigs) { + Seq() + } else { + Seq(HoodieTableConfig.PRECOMBINE_FIELD.key) + } + } + + test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion," + + s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)") { + withSparkSqlSessionConfigWithCondition( + ("hoodie.merge.small.file.group.candidates.limit" -> "0", true), + ("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation), + // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6 + ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6) ) { withRecordType()(withTempDir { tmp => val tableName = generateTableName @@ -39,14 +100,15 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { | ts long | ) using hudi | tblproperties ( + | $writeTableVersionClause | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING' + | primaryKey = 'id' + | $mergeConfigClause | ) | location '${tmp.getCanonicalPath}' """.stripMargin) - + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) // Insert initial records with ts=100 spark.sql( s""" @@ -59,99 +121,119 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { // Verify inserting records with the same ts value are visible (COMMIT_TIME_ORDERING) spark.sql( s""" - | insert into 
$tableName - | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts - | union all - | select 2, 'B_equal', 70.0, 100 + | insert into $tableName + | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts + | union all + | select 2, 'B_equal', 70.0, 100 """.stripMargin) - + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "A_equal", 60.0, 100), - Seq(2, "B_equal", 70.0, 100) - ) + (if (isUpsert) { + // With UPSERT operation, there is no duplicate + Seq( + Seq(1, "A_equal", 60.0, 100), + Seq(2, "B_equal", 70.0, 100)) + } else { + // With INSERT operation, there are duplicates + Seq( + Seq(1, "A", 10.0, 100), + Seq(1, "A_equal", 60.0, 100), + Seq(2, "B", 20.0, 100), + Seq(2, "B_equal", 70.0, 100)) + }): _*) - // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING) - spark.sql( - s""" - | update $tableName - | set price = 50.0, ts = 100 - | where id = 1 + if (isUpsert) { + // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING) + spark.sql( + s""" + | update $tableName + | set price = 50.0, ts = 100 + | where id = 1 """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "A_equal", 50.0, 100), + Seq(2, "B_equal", 70.0, 100)) - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "A_equal", 50.0, 100), - Seq(2, "B_equal", 70.0, 100) - ) - - // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING) - spark.sql( - s""" - | insert into $tableName - | select 1 as id, 'A' as name, 30.0 as price, 99 as ts - | union all - | select 2, 'B', 40.0, 99 + // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING) + spark.sql( + s""" + | insert into $tableName + | select 1 as id, 'A' as name, 30.0 as price, 99 as ts + | union all + | select 2, 'B', 40.0, 99 """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "A", 30.0, 99), - Seq(2, "B", 40.0, 99) - ) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "A", 30.0, 99), + Seq(2, "B", 40.0, 99)) - // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING) - spark.sql( - s""" - | update $tableName - | set price = 50.0, ts = 98 - | where id = 1 + // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING) + spark.sql( + s""" + | update $tableName + | set price = 50.0, ts = 98 + | where id = 1 """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "A", 50.0, 98), - Seq(2, "B", 40.0, 99) - ) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "A", 50.0, 98), + Seq(2, "B", 40.0, 99)) - // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING) - spark.sql( - s""" - | insert into $tableName - | select 1 as id, 'A' as name, 30.0 as price, 101 as ts - | union all - | select 2, 'B', 40.0, 101 + // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING) + spark.sql( + s""" + | insert into $tableName + | select 1 as id, 'A' as name, 30.0 as price, 101 as ts + | union all + | select 2, 'B', 40.0, 101 """.stripMargin) - // Verify records with ts=101 are visible - checkAnswer(s"select id, name, price, ts from 
$tableName order by id")( - Seq(1, "A", 30.0, 101), - Seq(2, "B", 40.0, 101) - ) + // Verify records with ts=101 are visible + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "A", 30.0, 101), + Seq(2, "B", 40.0, 101) + ) - // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING) - spark.sql( - s""" - | update $tableName - | set price = 50.0, ts = 102 - | where id = 1 + // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING) + spark.sql( + s""" + | update $tableName + | set price = 50.0, ts = 102 + | where id = 1 """.stripMargin) - // Verify final state after all operations - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "A", 50.0, 102), - Seq(2, "B", 40.0, 101) - ) - - // Delete record - spark.sql(s"delete from $tableName where id = 1") + // Verify final state after all operations + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "A", 50.0, 102), + Seq(2, "B", 40.0, 101) + ) - // Verify deletion - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(2, "B", 40.0, 101) - ) + // Delete record + spark.sql(s"delete from $tableName where id = 1") + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + // Verify deletion + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(2, "B", 40.0, 101) + ) + } }) } } - test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table") { - withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") { + // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW + test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table " + + s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs," + + s"setUpsertOperation=$setUpsertOperation)") { + withSparkSqlSessionConfigWithCondition( + ("hoodie.merge.small.file.group.candidates.limit" -> "0", true), + ("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation), + // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6 + ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6) + ) { withRecordType()(withTempDir { tmp => val tableName = generateTableName // Create table with COMMIT_TIME_ORDERING @@ -164,82 +246,103 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase { | ts long | ) using hudi | tblproperties ( + | $writeTableVersionClause | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING' + | primaryKey = 'id' + | $mergeConfigClause | ) | location '${tmp.getCanonicalPath}' - """.stripMargin) + """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) // Insert initial records spark.sql( s""" | insert into $tableName - | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all - | select 0, 'X', 20.0, 100 union all - | select 2, 'B', 20.0, 100 union all - | select 3, 'C', 30.0, 100 union all - | select 4, 'D', 40.0, 100 union all - | select 5, 'E', 50.0, 100 union all - | select 6, 'F', 60.0, 100 - """.stripMargin) + | select 1 as id, 'A' as name, 10.0 as price, 100L as ts union all + | select 0, 'X', 20.0, 100L union all + | select 2, 'B', 20.0, 100L union all + | select 3, 'C', 30.0, 100L union all + | select 4, 'D', 40.0, 100L union all + | select 5, 'E', 50.0, 100L union 
all + | select 6, 'F', 60.0, 100L + """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + // TODO(HUDI-8840): enable MERGE INTO with deletes + val shouldTestMergeIntoDelete = setRecordMergeConfigs && tableVersion.toInt == 8 // Merge operation - delete with higher, lower and equal ordering field value, all should take effect. - spark.sql( - s""" - | merge into $tableName t - | using ( - | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all - | select 2, '', 55.0, 99 as ts union all - | select 0, '', 55.0, 100 as ts - | ) s - | on t.id = s.id - | when matched then delete - """.stripMargin) + if (shouldTestMergeIntoDelete) { + spark.sql( + s""" + | merge into $tableName t + | using ( + | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all + | select 2, '', 55.0, 99L as ts union all + | select 0, '', 55.0, 100L as ts + | ) s + | on t.id = s.id + | when matched then delete + """.stripMargin) + } // Merge operation - update with mixed ts values spark.sql( s""" | merge into $tableName t | using ( - | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all - | select 5, 'E2', 55.0, 99 as ts union all - | select 6, 'F2', 65.0, 100 as ts + | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all + | select 5, 'E2', 55.0, 99L as ts union all + | select 6, 'F2', 65.0, 100L as ts | ) s | on t.id = s.id | when matched then update set * - """.stripMargin) + """.stripMargin) // Verify state after merges + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + val nonDeletedRows: Seq[Seq[Any]] = if (shouldTestMergeIntoDelete) { + Seq() + } else { + Seq(Seq(0, "X", 20.0, 100), + Seq(1, "A", 10.0, 100), + Seq(2, "B", 20.0, 100)) + } checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(3, "C", 30.0, 100), - Seq(4, "D2", 45.0, 101), - Seq(5, "E2", 55.0, 99), - Seq(6, "F2", 65.0, 100) - ) + (nonDeletedRows ++ Seq( + Seq(3, "C", 30.0, 100), + Seq(4, "D2", 45.0, 101), + Seq(5, "E2", 55.0, 99), + Seq(6, "F2", 65.0, 100) + )): _*) // Insert new records through merge spark.sql( s""" | merge into $tableName t | using ( - | select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all - | select 8, 'E2', 55.0, 100 as ts + | select 7 as id, 'D2' as name, 45.0 as price, 100L as ts union all + | select 8, 'E2', 55.0, 100L as ts | ) s | on t.id = s.id | when not matched then insert * - """.stripMargin) + """.stripMargin) // Verify final state + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(3, "C", 30.0, 100), - Seq(4, "D2", 45.0, 101), - Seq(5, "E2", 55.0, 99), - Seq(6, "F2", 65.0, 100), - Seq(7, "D2", 45.0, 100), - Seq(8, "E2", 55.0, 100) - ) + (nonDeletedRows ++ Seq( + Seq(3, "C", 30.0, 100), + Seq(4, "D2", 45.0, 101), + Seq(5, "E2", 55.0, 99), + Seq(6, "F2", 65.0, 100), + Seq(7, "D2", 45.0, 100), + Seq(8, "E2", 55.0, 100) + )): _*) }) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala index f0ffbc3a38aa..69a5c83d6aea 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala @@ -18,16 +18,68 @@ package org.apache.spark.sql.hudi.dml import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING +import org.apache.hudi.common.model.DefaultHoodieRecordPayload +import org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { - Seq("cow", "mor").foreach { tableType => - test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") { - withSparkSqlSessionConfig( - "hoodie.merge.small.file.group.candidates.limit" -> "0", - DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true" + // TODO(HUDI-8938): add "mor,6,true", "mor,6,false" after the fix + Seq("cow,8,true", "cow,8,false", "cow,6,true", "cow,6,false", + "mor,8,true", "mor,8,false").foreach { args => + val argList = args.split(',') + val tableType = argList(0) + val tableVersion = argList(1) + val setRecordMergeConfigs = argList(2).toBoolean + val storage = HoodieTestUtils.getDefaultStorage + val mergeConfigClause = if (setRecordMergeConfigs) { + if (tableVersion.toInt == 6) { + // Table version 6 + s", payloadClass = '${classOf[DefaultHoodieRecordPayload].getName}'" + } else { + // Current table version (8) + ", hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'" + } + } else { + "" + } + val writeTableVersionClause = if (tableVersion.toInt == 6) { + s"hoodie.write.table.version = $tableVersion," + } else { + "" + } + val expectedMergeConfigs = if (tableVersion.toInt == 6) { + Map( + HoodieTableConfig.VERSION.key -> "6", + HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName, + HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts" + ) + } else { + Map( + HoodieTableConfig.VERSION.key -> "8", + HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts", + HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(), + HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName, + HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> EVENT_TIME_BASED_MERGE_STRATEGY_UUID) + } + val nonExistentConfigs = if (tableVersion.toInt == 6) { + Seq(HoodieTableConfig.RECORD_MERGE_MODE.key) + } else { + Seq() + } + + test(s"Test $tableType table with EVENT_TIME_ORDERING (tableVersion=$tableVersion," + + s"setRecordMergeConfigs=$setRecordMergeConfigs)") { + withSparkSqlSessionConfigWithCondition( + ("hoodie.merge.small.file.group.candidates.limit" -> "0", true), + (DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true", true), + // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6 + ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6) ) { withRecordType()(withTempDir { tmp => val tableName = generateTableName @@ -41,13 +93,16 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { | ts long | ) using hudi | tblproperties ( + | $writeTableVersionClause | type = '$tableType', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'EVENT_TIME_ORDERING' + | preCombineField = 'ts' + | $mergeConfigClause | ) | location '${tmp.getCanonicalPath}' """.stripMargin) 
+ validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) // Insert initial records with ts=100 spark.sql( @@ -67,6 +122,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { | select 2, 'B_equal', 70.0, 100 """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 60.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -80,6 +137,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { | where id = 1 """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 50.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -94,6 +153,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { | select 2, 'B', 40.0, 99 """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 50.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -107,6 +168,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { | where id = 1 """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "A_equal", 50.0, 100), Seq(2, "B_equal", 70.0, 100) @@ -164,110 +227,124 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase { // Delete record with no ts. spark.sql(s"delete from $tableName where id = 1") // Verify deletion + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(2, "B", 40.0, 101) ) }) } } - } - Seq("mor").foreach { tableType => - // [HUDI-8915]: COW MIT delete does not honor event time ordering. For update we have the coverage in - // "Test MergeInto with commit time/event time ordering coverage". - // Seq("cow", "mor").foreach { tableType => - test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType table") { - withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") { - withRecordType()(withTempDir { tmp => - val tableName = generateTableName - // Create table with EVENT_TIME_ORDERING - spark.sql( - s""" - | create table $tableName ( - | id int, - | name string, - | price double, - | ts long - | ) using hudi - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.record.merge.mode = 'EVENT_TIME_ORDERING' - | ) - | location '${tmp.getCanonicalPath}' + if ("mor".equals(tableType)) { + // [HUDI-8915]: COW MIT delete does not honor event time ordering. For update we have the coverage in + // "Test MergeInto with commit time/event time ordering coverage". 
+ // Seq("cow", "mor").foreach { tableType => + test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType table " + + s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs)") { + withSparkSqlSessionConfigWithCondition( + ("hoodie.merge.small.file.group.candidates.limit" -> "0", true), + // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6 + ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6) + ) { + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + // Create table with EVENT_TIME_ORDERING + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | tblproperties ( + | $writeTableVersionClause + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | $mergeConfigClause + | ) + | location '${tmp.getCanonicalPath}' """.stripMargin) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) - // Insert initial records with ts=100 - spark.sql( - s""" - | insert into $tableName - | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union all - | select 1, 'A', 10.0, 100L union all - | select 2, 'B', 20.0, 100L union all - | select 3, 'C', 30.0, 100L union all - | select 4, 'D', 40.0, 100L union all - | select 5, 'E', 50.0, 100L union all - | select 6, 'F', 60.0, 100L + // Insert initial records with ts=100 + spark.sql( + s""" + | insert into $tableName + | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union all + | select 1, 'A', 10.0, 100L union all + | select 2, 'B', 20.0, 100L union all + | select 3, 'C', 30.0, 100L union all + | select 4, 'D', 40.0, 100L union all + | select 5, 'E', 50.0, 100L union all + | select 6, 'F', 60.0, 100L """.stripMargin) - // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect. - spark.sql( - s""" - | merge into $tableName t - | using ( - | select 0 as id, 'B2' as name, 25.0 as price, 100L as ts union all - | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all - | select 2 as id, 'B2' as name, 25.0 as price, 99L as ts - | ) s - | on t.id = s.id - | when matched then delete + // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect. 
+ spark.sql( + s""" + | merge into $tableName t + | using ( + | select 0 as id, 'B2' as name, 25.0 as price, 100L as ts union all + | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all + | select 2 as id, 'B2' as name, 25.0 as price, 99L as ts + | ) s + | on t.id = s.id + | when matched then delete """.stripMargin) - // Merge operation - update with mixed ts values (only equal or higher ts should take effect) - spark.sql( - s""" - | merge into $tableName t - | using ( - | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all - | select 5, 'E2', 55.0, 99L as ts union all - | select 6, 'F2', 65.0, 100L as ts - | ) s - | on t.id = s.id - | when matched then update set * + // Merge operation - update with mixed ts values (only equal or higher ts should take effect) + spark.sql( + s""" + | merge into $tableName t + | using ( + | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all + | select 5, 'E2', 55.0, 99L as ts union all + | select 6, 'F2', 65.0, 100L as ts + | ) s + | on t.id = s.id + | when matched then update set * """.stripMargin) - // Verify state after merges - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(2, "B", 20.0, 100), - Seq(3, "C", 30.0, 100), - Seq(4, "D2", 45.0, 101), - Seq(5, "E", 50.0, 100), - Seq(6, "F2", 65.0, 100) - ) + // Verify state after merges + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(2, "B", 20.0, 100), + Seq(3, "C", 30.0, 100), + Seq(4, "D2", 45.0, 101), + Seq(5, "E", 50.0, 100), + Seq(6, "F2", 65.0, 100) + ) - // Insert new records through merge - spark.sql( - s""" - | merge into $tableName t - | using ( - | select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all - | select 8, 'H', 80.0, 99 as ts - | ) s - | on t.id = s.id - | when not matched then insert * + // Insert new records through merge + spark.sql( + s""" + | merge into $tableName t + | using ( + | select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all + | select 8, 'H', 80.0, 99 as ts + | ) s + | on t.id = s.id + | when not matched then insert * """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(2, "B", 20.0, 100), - Seq(3, "C", 30.0, 100), - Seq(4, "D2", 45.0, 101), - Seq(5, "E", 50.0, 100), - Seq(6, "F2", 65.0, 100), - Seq(7, "G", 70.0, 99), - Seq(8, "H", 80.0, 99) - ) - }) + validateTableConfig( + storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(2, "B", 20.0, 100), + Seq(3, "C", 30.0, 100), + Seq(4, "D2", 45.0, 101), + Seq(5, "E", 50.0, 100), + Seq(6, "F2", 65.0, 100), + Seq(7, "G", 70.0, 99), + Seq(8, "H", 80.0, 99) + ) + }) + } } } }
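The reader-side fix centers on the fallback in HoodieFileGroupReader: for tables below version 8, the persisted config may carry only a payload class, so the merge mode and strategy id are inferred rather than read directly. Below is a minimal Scala sketch of that fallback; the helper name resolveMergeBehavior is hypothetical, and the Triple type parameters are assumed from how the patched call site consumes getLeft/getRight.

import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
import org.apache.hudi.common.util.collection.Triple

// Hypothetical helper mirroring the reader change: resolve the effective
// merge mode and strategy id, inferring them for pre-version-8 tables whose
// config may not store the merge mode.
def resolveMergeBehavior(tableConfig: HoodieTableConfig): (RecordMergeMode, String) = {
  var mergeMode = tableConfig.getRecordMergeMode
  var strategyId = tableConfig.getRecordMergeStrategyId
  if (!tableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
    // Infer from the payload class when the mode was never persisted; the
    // null fourth argument mirrors the call in the patch.
    val triple: Triple[RecordMergeMode, String, String] =
      HoodieTableConfig.inferCorrectMergingBehavior(
        mergeMode, tableConfig.getPayloadClass, strategyId, null)
    mergeMode = triple.getLeft
    strategyId = triple.getRight
  }
  (mergeMode, strategyId)
}

The same resolved pair then feeds both the record merger and the record buffer, which is why the constructor change threads recordMergeMode through getRecordBuffer as well.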
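The MERGE INTO fix is an argument-order swap on equals. A version 6 table has no RECORD_MERGE_MODE entry in its config, so the looked-up value can be null (the assumption the NPE fix implies), and calling .equals on it throws. Putting the non-null enum constant on the left makes a missing value compare unequal instead. A distilled sketch, with a hypothetical helper name:

import org.apache.hudi.common.config.RecordMergeMode

// configuredMergeMode stands in for what getStringWithAltKeys returns for
// HoodieTableConfig.RECORD_MERGE_MODE; assumed null when the table config
// (e.g. a version 6 table) does not store the merge mode.
def isEventTimeOrdering(configuredMergeMode: String): Boolean =
  RecordMergeMode.EVENT_TIME_ORDERING.name().equals(configuredMergeMode)

With this ordering, isEventTimeOrdering(null) simply returns false, where the pre-patch expression (nullable.equals(constant)) would have thrown a NullPointerException.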
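The new validateTableConfig helper in HoodieSparkSqlTestBase asserts both the persisted and the absent table config entries. A usage sketch for a commit-time-ordering table written at version 6, matching the expectations the tests build above; basePath is a hypothetical location of such a table:

import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig

def validateVersionSixTable(basePath: String): Unit = {
  // A version 6 table persists the payload class, while neither the merge
  // mode nor the precombine field should appear in its table config.
  validateTableConfig(
    HoodieTestUtils.getDefaultStorage,
    basePath,
    Map(
      HoodieTableConfig.VERSION.key -> "6",
      HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
        classOf[OverwriteWithLatestAvroPayload].getName),
    Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key))
}

For a version 8 table the expected map instead carries RECORD_MERGE_MODE and RECORD_MERGE_STRATEGY_ID, as the expectedMergeConfigs branches in the tests show.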