[HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native row writer (#12245)
geserdugarov authored Dec 18, 2024
1 parent a1411a6 commit 63e014a
Showing 5 changed files with 140 additions and 46 deletions.
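In user terms, the change makes the scenario below fail fast instead of silently producing MOR log files in a COW table. A minimal sketch condensing the new TestInsertTable case further down, assuming a Spark session with Hudi support; the table name, location, and the session-level SET are illustrative, not part of the commit:

// COW table with a SIMPLE bucket index and the Spark native row writer disabled
spark.sql("set hoodie.datasource.write.operation = bulk_insert")
spark.sql(
  """create table demo_tbl (id long, name string, ts int, par string) using hudi
    | tblproperties (
    |   primaryKey = 'id,name', type = 'cow', preCombineField = 'ts',
    |   hoodie.index.type = 'BUCKET', hoodie.index.bucket.engine = 'SIMPLE',
    |   hoodie.bucket.index.num.buckets = '4', hoodie.bucket.index.hash.field = 'id,name',
    |   hoodie.datasource.write.row.writer.enable = 'false')
    | partitioned by (par) location '/tmp/demo_tbl'""".stripMargin)
// 1st bulk insert creates bucket 1 of partition 'main'
spark.sql("insert into demo_tbl values (5, 'a1,1', 1, 'main')")
// 2nd bulk insert targets the same bucket and now throws HoodieNotSupportedException
spark.sql("insert into demo_tbl values (9, 'a3,3', 2, 'main')")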
@@ -19,7 +19,9 @@
package org.apache.hudi.table;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
@@ -47,6 +49,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> implements BulkInsertP
protected final List<String> indexKeyFields;
protected final List<Boolean> doAppend = new ArrayList<>();
protected final List<String> fileIdPfxList = new ArrayList<>();
protected boolean isAppendAllowed;

public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {

@@ -59,12 +62,20 @@ public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, bo
this.sortColumnNames = null;
}
this.preserveHoodieMetadata = preserveHoodieMetadata;
// Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner` are restricted, otherwise AppendHandleFactory would produce MOR log files
this.isAppendAllowed = !table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
}

@Override
public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
if (!doAppend.get(idx)) {
return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
} else if (isAppendAllowed) {
return Option.of(new AppendHandleFactory());
} else {
throw new HoodieNotSupportedException("Multiple bulk inserts into COW with simple bucket and disabled Spark native row writer is not supported, "
+ "please, use upsert operation, overwrite mode (already written data will be lost), or turn on Spark native row writer.");
}
}

@Override
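The new branch order in getWriteHandleFactory, restated as a standalone sketch (hypothetical helper, plain Scala, not Hudi code): a bucket that already holds data needs an append handle, and only MOR can absorb appended log files, so the COW case now fails instead of writing them.

// sketch only: the strings stand in for the factories chosen above
def writeHandleFor(appendToExistingBucket: Boolean, isCopyOnWrite: Boolean): String =
  if (!appendToExistingBucket) "SingleFileHandleCreateFactory" // fresh bucket: create a new base file
  else if (!isCopyOnWrite) "AppendHandleFactory"               // MOR: append to the bucket's log file
  else throw new UnsupportedOperationException(                // COW cannot append without log files
    "multiple bulk inserts into COW with a simple bucket index are not supported")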
@@ -124,7 +124,6 @@ public void testExtractRecordKeys() {
String[] s3 = KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);

// keys with ':' are not supported
String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);

@@ -26,6 +26,7 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
@@ -40,13 +41,17 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.exception.ExceptionUtil.getRootCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;

public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientTestHarness {

@@ -86,6 +91,7 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context, 1);

final HoodieSparkTable table = HoodieSparkTable.create(config, context);
// we call BulkInsertInternalPartitionerFactory.get() directly, which behaves as if the Spark native row writer were disabled
BulkInsertPartitioner partitioner = BulkInsertInternalPartitionerFactory.get(table, config);
JavaRDD<HoodieRecord> repartitionRecords =
(JavaRDD<HoodieRecord>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(javaRDD), 1);
@@ -104,27 +110,32 @@ public void testSimpleBucketPartitioner(String tableType, boolean partitionSort)
}, false).collect();
}

// first writes, it will create new bucket files based on the records
// 1st write, will create new bucket files based on the records
getHoodieWriteClient(config).startCommitWithTime("0");
List<WriteStatus> writeStatues = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
Map<String, WriteStatus> writeStatuesMap = new HashMap<>();
writeStatues.forEach(ws -> writeStatuesMap.put(ws.getFileId(), ws));
List<WriteStatus> writeStatuses = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
Map<String, WriteStatus> writeStatusesMap = new HashMap<>();
writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));

// second writes the same records, all records should be mapped to the same bucket files
getHoodieWriteClient(config).startCommitWithTime("1");
List<WriteStatus> writeStatues2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
// 2nd write of the same records, all records should be mapped to the same bucket files for MOR,
// for COW with disabled Spark native row writer, the 2nd bulk insert should fail with an exception
try {
List<WriteStatus> writeStatuses2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
writeStatuses2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatusesMap.get(ws.getFileId()).getTotalRecords()));
} catch (Exception ex) {
assertEquals("COPY_ON_WRITE", tableType);
Throwable rootExceptionCause = getRootCause(ex);
assertInstanceOf(HoodieNotSupportedException.class, rootExceptionCause);
assertLinesMatch(Collections.singletonList("Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"),
Collections.singletonList(rootExceptionCause.getMessage()));
}
}

private static final List<Object> TABLE_TYPES = Arrays.asList(
"COPY_ON_WRITE",
"MERGE_ON_READ"
);

private static Iterable<Object> tableTypes() {
return TABLE_TYPES;
}

// table type, partitionSort
private static Iterable<Object[]> configParams() {
List<Object[]> opts = new ArrayList<>();
@@ -43,6 +43,7 @@ import java.io.File
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.util.matching.Regex

class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -230,6 +231,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}

protected def checkExceptionMatch(sql: String)(errorMsgRegex: String): Unit = {
var hasException = false
try {
spark.sql(sql)
} catch {
case e: Throwable if getRootCause(e).getMessage.matches(errorMsgRegex) =>
hasException = true

case f: Throwable =>
fail("Exception should match pattern: " + errorMsgRegex + ", error message: " + getRootCause(f).getMessage, f)
}
assertResult(true)(hasException)
}

def dropTypeLiteralPrefix(value: Any): Any = {
value match {
case s: String =>
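A usage sketch of the new helper, mirroring its call in the TestInsertTable change below (the table name here is a placeholder): the assertion passes only when the statement throws and the root cause message matches the whole regex.

// expects the bulk insert to fail with the new HoodieNotSupportedException message
checkExceptionMatch("insert into demo_tbl values (24, 'cd', 1, 'main')")(
  "Multiple bulk insert.*COW.*Spark native row writer.*not supported.*")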
@@ -1665,47 +1665,105 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
)

spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, 1000, "2021-01-05"),
| (3, "a3", 30, 3000, "2021-01-07")
""".stripMargin)
// for COW with disabled Spark native row writer, multiple bulk inserts are restricted
if (tableType != "cow" && bulkInsertAsRow != "false") {
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, 1000, "2021-01-05"),
| (3, "a3", 30, 3000, "2021-01-07")
""".stripMargin)

checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 20.0, 2000, "2021-01-06"),
Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 20.0, 2000, "2021-01-06"),
Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)

// there are two files in partition(dt = '2021-01-05')
checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(2)
)
// there are two files in partition(dt = '2021-01-05')
checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(2)
)

// would generate 6 other files in partition(dt = '2021-01-05')
spark.sql(
s"""
| insert into $tableName values
| (4, 'a1,1', 10, 1000, "2021-01-05"),
| (5, 'a1,1', 10, 1000, "2021-01-05"),
| (6, 'a1,1', 10, 1000, "2021-01-05"),
| (7, 'a1,1', 10, 1000, "2021-01-05"),
| (8, 'a1,1', 10, 1000, "2021-01-05"),
| (10, 'a3,3', 30, 3000, "2021-01-05")
""".stripMargin)

checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(8)
)
// would generate 6 other files in partition(dt = '2021-01-05')
spark.sql(
s"""
| insert into $tableName values
| (4, 'a1,1', 10, 1000, "2021-01-05"),
| (5, 'a1,1', 10, 1000, "2021-01-05"),
| (6, 'a1,1', 10, 1000, "2021-01-05"),
| (7, 'a1,1', 10, 1000, "2021-01-05"),
| (8, 'a1,1', 10, 1000, "2021-01-05"),
| (10, 'a3,3', 30, 3000, "2021-01-05")
""".stripMargin)

checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
Seq(8)
)
}
}
}
}
}
}

test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET and disabled Spark native row writer") {
withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
"hoodie.bulkinsert.shuffle.parallelism" -> "1") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id long,
| name string,
| ts int,
| par string
|) using hudi
| tblproperties (
| primaryKey = 'id,name',
| type = 'cow',
| preCombineField = 'ts',
| hoodie.index.type = 'BUCKET',
| hoodie.index.bucket.engine = 'SIMPLE',
| hoodie.bucket.index.num.buckets = '4',
| hoodie.bucket.index.hash.field = 'id,name',
| hoodie.datasource.write.row.writer.enable = 'false')
| partitioned by (par)
| location '${tmp.getCanonicalPath}'
""".stripMargin)

// Rows used below and their corresponding `bucketId`s when there are 4 buckets
// `id,name` `bucketId`
// 5,'a1,1' -> 1
// 6,'a6,6' -> 2
// 9,'a3,3' -> 1
// 13,'a13,13' -> 2
// 24,'cd' -> 0

// buckets 1 & 2 into partition 'main', bucket 1 into partition 'side'
spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')")
// bucket 1 into 'main', bucket 2 into 'side', the whole insert will fail due to the existing bucket 1 in 'main'
val causeRegex = "Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"
checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 'main'), (13, 'a13,13', 1, 'side')")(causeRegex)
checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(5), Seq(6), Seq(9))

// bucket 0 into 'main', no bucket into 'side', will also fail:
// bulk insert into a separate, not yet written bucket is also restricted when some other buckets have already been written
checkExceptionMatch(s"insert into $tableName values (24, 'cd', 1, 'main')")(causeRegex)
checkAnswer(spark.sql(s"select id from $tableName where par = 'main' order by id").collect())(Seq(5), Seq(6))

// for overwrite mode it's allowed to do multiple bulk inserts
spark.sql(s"insert overwrite $tableName values (9, 'a3,3', 3, 'main'), (13, 'a13,13', 2, 'side')")
// only data from the latest insert overwrite is available,
// because insert overwrite drops the whole table due to [HUDI-4704]
checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(9), Seq(13))
}
}
}

/**
* This test is to make sure that bulk insert doesn't create a bunch of tiny files if
* hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
