From 63e014a6c0fd783818f380a6a84be58990abdecc Mon Sep 17 00:00:00 2001
From: Geser Dugarov
Date: Wed, 18 Dec 2024 09:43:19 +0700
Subject: [PATCH] [HUDI-8394] Restrict multiple bulk inserts into COW with
 simple bucket and disabled Spark native row writer (#12245)

---
 .../BucketIndexBulkInsertPartitioner.java     |  15 ++-
 .../apache/hudi/keygen/TestKeyGenUtils.java   |   1 -
 ...tRDDSimpleBucketBulkInsertPartitioner.java |  33 +++--
 .../hudi/common/HoodieSparkSqlTestBase.scala  |  15 +++
 .../spark/sql/hudi/dml/TestInsertTable.scala  | 122 +++++++++++++-----
 5 files changed, 140 insertions(+), 46 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
index df50877a410e..6be3dc83da0a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.io.AppendHandleFactory;
 import org.apache.hudi.io.SingleFileHandleCreateFactory;
@@ -47,6 +49,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> implements BulkInsertP
   protected final List<String> indexKeyFields;
   protected final List<Boolean> doAppend = new ArrayList<>();
   protected final List<String> fileIdPfxList = new ArrayList<>();
+  protected boolean isAppendAllowed;
 
   public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {
@@ -59,12 +62,20 @@ public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, bo
       this.sortColumnNames = null;
     }
     this.preserveHoodieMetadata = preserveHoodieMetadata;
+    // Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner` are restricted, otherwise AppendHandleFactory would produce MOR log files
+    this.isAppendAllowed = !table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
   }
 
   @Override
   public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
-    return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
-        Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    if (!doAppend.get(idx)) {
+      return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    } else if (isAppendAllowed) {
+      return Option.of(new AppendHandleFactory());
+    } else {
+      throw new HoodieNotSupportedException("Multiple bulk inserts into COW with simple bucket and disabled Spark native row writer are not supported, " +
+          "please use upsert operation, overwrite mode (already written data will be lost), or turn on Spark native row writer.");
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 5cffef26e6e5..e506751a1a08 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -124,7 +124,6 @@ public void testExtractRecordKeys() {
     String[] s3 = KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
     Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);
 
-    // keys with ':' are not supported
     String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
     Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
index 0141d0d4cecd..017847eff555 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -40,13 +41,17 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.exception.ExceptionUtil.getRootCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 
 public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientTestHarness {
 
@@ -86,6 +91,7 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
 
     HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context, 1);
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    // we call BulkInsertInternalPartitionerFactory.get() directly, which behaves as if the Spark native row writer were disabled
     BulkInsertPartitioner partitioner = BulkInsertInternalPartitionerFactory.get(table, config);
     JavaRDD<HoodieRecord> repartitionRecords = (JavaRDD<HoodieRecord>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(javaRDD), 1);
 
@@ -104,16 +110,25 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
       }, false).collect();
     }
 
-    // first writes, it will create new bucket files based on the records
+    // 1st write will create new bucket files based on the records
     getHoodieWriteClient(config).startCommitWithTime("0");
-    List<WriteStatus> writeStatues = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
-    Map<String, WriteStatus> writeStatuesMap = new HashMap<>();
-    writeStatues.forEach(ws -> writeStatuesMap.put(ws.getFileId(), ws));
+    List<WriteStatus> writeStatuses = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
+    Map<String, WriteStatus> writeStatusesMap = new HashMap<>();
+    writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));
 
-    // second writes the same records, all records should be mapped to the same bucket files
     getHoodieWriteClient(config).startCommitWithTime("1");
-    List<WriteStatus> writeStatues2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
-    writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
+    // 2nd write of the same records: for MOR, all records should be mapped to the same bucket files,
+    // for COW with disabled Spark native row writer, the 2nd bulk insert should fail with an exception
+    try {
+      List<WriteStatus> writeStatuses2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
+      writeStatuses2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatusesMap.get(ws.getFileId()).getTotalRecords()));
+    } catch (Exception ex) {
+      assertEquals("COPY_ON_WRITE", tableType);
+      Throwable rootExceptionCause = getRootCause(ex);
+      assertInstanceOf(HoodieNotSupportedException.class, rootExceptionCause);
+      assertLinesMatch(Collections.singletonList("Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"),
+          Collections.singletonList(rootExceptionCause.getMessage()));
+    }
   }
 
   private static final List<String> TABLE_TYPES = Arrays.asList(
@@ -121,10 +136,6 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
       "MERGE_ON_READ"
   );
 
-  private static Iterable<String> tableTypes() {
-    return TABLE_TYPES;
-  }
-
   // table type, partitionSort
   private static Iterable<Object[]> configParams() {
     List<Object[]> opts = new ArrayList<>();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b30a2adb353f..778706702681 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -43,6 +43,7 @@ import java.io.File
 import java.util.TimeZone
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
+import scala.util.matching.Regex
 
 class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -230,6 +231,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  protected def checkExceptionMatch(sql: String)(errorMsgRegex: String): Unit = {
+    var hasException = false
+    try {
+      spark.sql(sql)
+    } catch {
+      case e: Throwable if getRootCause(e).getMessage.matches(errorMsgRegex) =>
+        hasException = true
+
+      case f: Throwable =>
+        fail("Exception should match pattern: " + errorMsgRegex + ", error message: " + getRootCause(f).getMessage, f)
+    }
+    assertResult(true)(hasException)
+  }
+
   def dropTypeLiteralPrefix(value: Any): Any = {
     value match {
       case s: String =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 693bdeb73600..7814d75a05e5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1665,47 +1665,105 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
           )
 
-          spark.sql(
-            s"""
-               | insert into $tableName values
-               | (1, 'a1', 10, 1000, "2021-01-05"),
-               | (3, "a3", 30, 3000, "2021-01-07")
-             """.stripMargin)
+          // for COW with disabled Spark native row writer, multiple bulk inserts are restricted
+          if (tableType != "cow" && bulkInsertAsRow != "false") {
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (1, 'a1', 10, 1000, "2021-01-05"),
+                 | (3, "a3", 30, 3000, "2021-01-07")
+               """.stripMargin)
 
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-            Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
-            Seq(3, "a3", 30.0, 3000, "2021-01-07")
-          )
+            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+              Seq(3, "a3", 30.0, 3000, "2021-01-07")
+            )
 
-          // there are two files in partition(dt = '2021-01-05')
-          checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
-            Seq(2)
-          )
+            // there are two files in partition(dt = '2021-01-05')
+            checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+              Seq(2)
+            )
 
-          // would generate 6 other files in partition(dt = '2021-01-05')
-          spark.sql(
-            s"""
-               | insert into $tableName values
-               | (4, 'a1,1', 10, 1000, "2021-01-05"),
-               | (5, 'a1,1', 10, 1000, "2021-01-05"),
-               | (6, 'a1,1', 10, 1000, "2021-01-05"),
-               | (7, 'a1,1', 10, 1000, "2021-01-05"),
-               | (8, 'a1,1', 10, 1000, "2021-01-05"),
-               | (10, 'a3,3', 30, 3000, "2021-01-05")
-             """.stripMargin)
-
-          checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
-            Seq(8)
-          )
+            // would generate 6 other files in partition(dt = '2021-01-05')
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (4, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (5, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (6, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (7, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (8, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (10, 'a3,3', 30, 3000, "2021-01-05")
+               """.stripMargin)
+
+            checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+              Seq(8)
+            )
+          }
         }
       }
     }
   }
 
+  test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET and disabled Spark native row writer") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id long,
+             |  name string,
+             |  ts int,
+             |  par string
+             |) using hudi
+             | tblproperties (
+             |  primaryKey = 'id,name',
+             |  type = 'cow',
+             |  preCombineField = 'ts',
+             |  hoodie.index.type = 'BUCKET',
+             |  hoodie.index.bucket.engine = 'SIMPLE',
+             |  hoodie.bucket.index.num.buckets = '4',
+             |  hoodie.bucket.index.hash.field = 'id,name',
+             |  hoodie.datasource.write.row.writer.enable = 'false')
+             | partitioned by (par)
+             | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+        // Rows used and their corresponding `bucketId`s when there are 4 buckets:
+        //   `id,name`    `bucketId`
+        //   5,'a1,1'     -> 1
+        //   6,'a6,6'     -> 2
+        //   9,'a3,3'     -> 1
+        //   13,'a13,13'  -> 2
+        //   24,'cd'      -> 0
+
+        // buckets 1 & 2 into partition 'main', bucket 1 into partition 'side'
+        spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')")
+        // bucket 1 into 'main', bucket 2 into 'side', the whole insert will fail because bucket 1 already exists in 'main'
+        val causeRegex = "Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"
+        checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 'main'), (13, 'a13,13', 1, 'side')")(causeRegex)
+        checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(5), Seq(6), Seq(9))
+
+        // bucket 0 into 'main', no bucket into 'side', will also fail:
+        // bulk insert into a separate, not yet written bucket is also restricted when some other buckets are already written
+        checkExceptionMatch(s"insert into $tableName values (24, 'cd', 1, 'main')")(causeRegex)
+        checkAnswer(spark.sql(s"select id from $tableName where par = 'main' order by id").collect())(Seq(5), Seq(6))
+
+        // for overwrite mode multiple bulk inserts are allowed
+        spark.sql(s"insert overwrite $tableName values (9, 'a3,3', 3, 'main'), (13, 'a13,13', 2, 'side')")
+        // only data from the latest insert overwrite is available,
+        // because insert overwrite drops the whole table due to [HUDI-4704]
+        checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(9), Seq(13))
+      }
+    }
+  }
+
   /**
    * This test is to make sure that bulk insert doesn't create a bunch of tiny files if
    * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns