Implement FileId + RecordKey based sort partitioning to reduce skew in the data
Vamsi committed Jan 30, 2025
1 parent 27a950e commit 666815b
Showing 4 changed files with 53 additions and 25 deletions.
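For orientation, a hypothetical usage sketch (not part of this commit) of how the new flag could be switched on through Spark datasource write options. Only hoodie.bloom.index.fileId.key.sort.partitioner is introduced by this change; the table name, key fields, base path, and the df dataset below are placeholders.

df.write().format("hudi")
    .option("hoodie.table.name", "my_table")                           // placeholder
    .option("hoodie.datasource.write.recordkey.field", "uuid")         // placeholder
    .option("hoodie.datasource.write.partitionpath.field", "ts_date")  // placeholder
    .option("hoodie.index.type", "BLOOM")
    // New in this commit; defaults to false.
    .option("hoodie.bloom.index.fileId.key.sort.partitioner", "true")
    .mode("append")
    .save(basePath);                                                    // placeholder path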
@@ -147,6 +147,14 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "When true, bucketized bloom filtering is enabled. "
+ "This reduces skew seen in sort based bloom index lookup");

public static final ConfigProperty<String> BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER = ConfigProperty
.key("hoodie.bloom.index.fileId.key.sort.partitioner")
.defaultValue("false")
.markAdvanced()
.withDocumentation("Only applies if index type is BLOOM. "
+ "When true, fileId and key sort based partitioning is enabled "
+ "This reduces skew seen in bucket based bloom index lookup");

public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = ConfigProperty
.key("hoodie.simple.index.use.caching")
.defaultValue("true")
@@ -620,6 +628,11 @@ public Builder bloomIndexBucketizedChecking(boolean bucketizedChecking) {
return this;
}

public Builder bloomIndexFileGroupIdKeySortPartitioner(boolean fileGroupIdKeySortPartitioner) {
hoodieIndexConfig.setValue(BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER, String.valueOf(fileGroupIdKeySortPartitioner));
return this;
}

public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET, String.valueOf(keysPerBucket));
return this;
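A minimal builder sketch, assuming the existing HoodieWriteConfig/HoodieIndexConfig builder chain (basePath and the table name below are placeholders); only bloomIndexFileGroupIdKeySortPartitioner(true) is new in this commit:

HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)          // placeholder base path
    .forTable("my_table")        // placeholder table name
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BLOOM)
        .bloomIndexFileGroupIdKeySortPartitioner(true)   // new builder method from this commit
        .build())
    .build();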
@@ -2055,6 +2055,10 @@ public boolean useBloomIndexBucketizedChecking() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}

public boolean useBloomIndexFileGroupIdKeySortPartitioner() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER);
}

/**
* Determines if the metadata bloom filter index is enabled.
*
@@ -168,6 +168,11 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
.repartitionAndSortWithinPartitions(partitioner)
.map(Tuple2::_2)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
} else if (config.useBloomIndexFileGroupIdKeySortPartitioner()) {
keyLookupResultRDD = fileComparisonsRDD.map(fileGroupAndRecordKey -> fileGroupAndRecordKey)
.sortBy(fileGroupAndRecordKey -> fileGroupAndRecordKey._1
+ "+" + fileGroupAndRecordKey._2, true, targetParallelism
).mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
} else {
keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
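The hunk above sorts each (fileGroupId, recordKey) pair by the concatenated string fileGroupId + "+" + recordKey, so Spark's range partitioner can place partition boundaries inside a hot file's key range instead of routing every candidate key for that file to a single task; the identity map() appears to be there only to turn the pair RDD into a plain JavaRDD so that sortBy is available. A standalone, hypothetical sketch of the skew effect (not Hudi code; all names below are made up):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SortSkewSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("sort-skew-sketch");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<Tuple2<String, String>> pairs = new ArrayList<>();
      // One "hot" file id carries most of the candidate record keys.
      for (int i = 0; i < 100_000; i++) {
        pairs.add(new Tuple2<>("file-hot", String.format("key-%06d", i)));
      }
      for (int i = 0; i < 100; i++) {
        pairs.add(new Tuple2<>("file-" + i, "key-" + i));
      }
      JavaRDD<Tuple2<String, String>> rdd = jsc.parallelize(pairs, 4);

      // Sorting by file id alone leaves every "file-hot" row in one partition.
      System.out.println("sort by fileId only, largest partition: "
          + maxPartitionSize(rdd.sortBy(t -> t._1, true, 4)));
      // The composite key lets range boundaries fall inside the hot file's key range.
      System.out.println("sort by fileId+recordKey, largest partition: "
          + maxPartitionSize(rdd.sortBy(t -> t._1 + "+" + t._2, true, 4)));
    }
  }

  // Counts rows per partition and returns the largest count, i.e. the skew of the layout.
  private static long maxPartitionSize(JavaRDD<Tuple2<String, String>> rdd) {
    return rdd.mapPartitions(it -> {
      long n = 0;
      while (it.hasNext()) {
        it.next();
        n++;
      }
      return Collections.singletonList(n).iterator();
    }).collect().stream().mapToLong(Long::longValue).max().orElse(0L);
  }
}

With the composite key the hot file's rows spread across roughly all four partitions, while the fileId-only sort keeps them on one.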
@@ -82,20 +82,24 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {

private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
private static final String TEST_NAME_WITH_PARAMS =
"[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}";
"[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}, useFileGroupIDKeySortPartitioner={4}";
private static final Random RANDOM = new Random(0xDEED);

public static Stream<Arguments> configParams() {
// rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
// rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner
Object[][] data = new Object[][] {
{true, true, true, false},
{false, true, true, false},
{true, true, false, false},
{true, false, true, false},
{true, true, true, true},
{false, true, true, true},
{true, true, false, true},
{true, false, true, true}
{true, true, true, false, false},
{false, true, true, false, false},
{true, true, false, false, false},
{true, false, true, false, false},
{true, true, true, true, false},
{false, true, true, true, false},
{true, true, false, true, false},
{true, false, true, true, false},
{true, true, false, false, true},
{true, false, false, false, true},
{false, false, false, false, true},
{false, true, false, false, true}
};
return Stream.of(data).map(Arguments::of);
}
@@ -120,7 +124,8 @@ public void tearDown() throws Exception {
}

private HoodieWriteConfig makeConfig(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, boolean useMetadataTable) {
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
// For the bloom index to use column stats and bloom filters from metadata table,
// the following configs must be set to true:
// "hoodie.bloom.index.use.metadata"
@@ -134,6 +139,7 @@ private HoodieWriteConfig makeConfig(
.bloomIndexBucketizedChecking(bucketizedChecking)
.bloomIndexKeysPerBucket(2)
.bloomIndexUseMetadata(useMetadataTable)
.bloomIndexFileGroupIdKeySortPartitioner(useFileGroupIDKeySortPartitioner)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMetadataIndexBloomFilter(useMetadataTable)
@@ -146,9 +152,9 @@
@MethodSource("configParams")
public void testLoadInvolvedFiles(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
@@ -249,9 +255,9 @@ public void testLoadInvolvedFiles(
@MethodSource("configParams")
public void testRangePruning(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());

final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
@@ -352,12 +358,12 @@ public void testCheckUUIDsAgainstOneFile() throws Exception {
@MethodSource("configParams")
public void testTagLocationWithEmptyRDD(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
// We have some records to be tagged (two different partitions)
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);

@@ -373,7 +379,7 @@ public void testTagLocationWithEmptyRDD(
@MethodSource("configParams")
public void testTagLocationOnPartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
// We have some records to be tagged (two different partitions)
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
@@ -398,7 +404,7 @@ public void testTagLocationOnPartitionedTable(
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));

// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -473,7 +479,7 @@ public void testTagLocationOnPartitionedTable(
@MethodSource("configParams")
public void testTagLocationOnNonpartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
// We have some records to be tagged (two different partitions)
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
@@ -497,7 +503,7 @@

// Also create the metadata and config
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -566,7 +572,7 @@ public void testTagLocationOnNonpartitionedTable(
@MethodSource("configParams")
public void testCheckExists(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
// We have some records to be tagged (two different partitions)

String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -593,7 +599,7 @@ public void testCheckExists(

// Also create the metadata and config
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -681,7 +687,7 @@ public void testCheckExists(
@MethodSource("configParams")
public void testBloomFilterFalseError(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
boolean useMetadataTable) throws Exception {
boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
// We have two hoodie records
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -707,7 +713,7 @@ public void testBloomFilterFalseError(
// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
HoodieWriteConfig config =
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
