diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 385532917c498..63677ca7ff9ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -147,6 +147,14 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "When true, bucketized bloom filtering is enabled. "
           + "This reduces skew seen in sort based bloom index lookup");
 
+  public static final ConfigProperty<String> BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER = ConfigProperty
+      .key("hoodie.bloom.index.fileId.key.sort.partitioner")
+      .defaultValue("false")
+      .markAdvanced()
+      .withDocumentation("Only applies if index type is BLOOM. "
+          + "When true, fileId and key sort based partitioning is enabled. "
+          + "This reduces skew seen in bucket based bloom index lookup");
+
   public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = ConfigProperty
       .key("hoodie.simple.index.use.caching")
       .defaultValue("true")
@@ -620,6 +628,11 @@ public Builder bloomIndexBucketizedChecking(boolean bucketizedChecking) {
       return this;
     }
 
+    public Builder bloomIndexFileGroupIdKeySortPartitioner(boolean fileGroupIdKeySortPartitioner) {
+      hoodieIndexConfig.setValue(BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER, String.valueOf(fileGroupIdKeySortPartitioner));
+      return this;
+    }
+
     public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
       hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET, String.valueOf(keysPerBucket));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ddc0aa8bc46f1..ce2ffe47678d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2055,6 +2055,10 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean useBloomIndexFileGroupIdKeySortPartitioner() {
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORT_PARTITIONER);
+  }
+
   /**
    * Determines if the metadata bloom filter index is enabled.
   *
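Usage sketch (not part of the patch): a minimal example of how a writer could opt into the new fileId+key sort partitioning through the builders touched above. The config key and builder method come from this patch and default to false; the base path below is a hypothetical placeholder.

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    // Equivalent to setting hoodie.bloom.index.fileId.key.sort.partitioner=true on the writer.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")                        // hypothetical base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM)
            .bloomIndexFileGroupIdKeySortPartitioner(true)    // new builder method added by this patch
            .build())
        .build();
    // SparkHoodieBloomIndexHelper reads this back via writeConfig.useBloomIndexFileGroupIdKeySortPartitioner().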
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 41f17da3bb366..833a827dc91f4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -168,6 +168,11 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor
           .repartitionAndSortWithinPartitions(partitioner)
           .map(Tuple2::_2)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
+    } else if (config.useBloomIndexFileGroupIdKeySortPartitioner()) {
+      keyLookupResultRDD = fileComparisonsRDD.map(fileGroupAndRecordKey -> fileGroupAndRecordKey)
+          .sortBy(fileGroupAndRecordKey -> fileGroupAndRecordKey._1
+              + "+" + fileGroupAndRecordKey._2, true, targetParallelism
+          ).mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
     } else {
       keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
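For intuition, a standalone sketch (not part of the patch; plain Spark on toy data) of the composite sort used in the new branch: each (fileId, recordKey) pair is ordered by the string fileId + "+" + recordKey before the bloom check runs. Class, variable names, and sample values here are illustrative only.

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class FileIdKeySortSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "fileid-key-sort-sketch");
        // Toy stand-in for fileComparisonsRDD: (fileId, recordKey) pairs to probe against bloom filters.
        JavaRDD<Tuple2<String, String>> fileComparisons = jsc.parallelize(Arrays.asList(
            new Tuple2<>("fg-2", "key-9"),
            new Tuple2<>("fg-1", "key-3"),
            new Tuple2<>("fg-1", "key-1")));
        int targetParallelism = 2;
        // Same shape as the new branch: sort by fileId + "+" + recordKey across targetParallelism partitions.
        fileComparisons
            .sortBy(pair -> pair._1 + "+" + pair._2, true, targetParallelism)
            .collect()
            .forEach(pair -> System.out.println(pair._1 + " -> " + pair._2));
        jsc.stop();
      }
    }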
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 496a922bb7d6b..054447afea075 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -82,20 +82,24 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
 
   private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
   private static final String TEST_NAME_WITH_PARAMS =
-      "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}";
+      "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}, useFileGroupIDKeySortPartitioner={4}";
   private static final Random RANDOM = new Random(0xDEED);
 
   public static Stream<Arguments> configParams() {
-    // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
+    // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner
     Object[][] data = new Object[][] {
-        {true, true, true, false},
-        {false, true, true, false},
-        {true, true, false, false},
-        {true, false, true, false},
-        {true, true, true, true},
-        {false, true, true, true},
-        {true, true, false, true},
-        {true, false, true, true}
+        {true, true, true, false, false},
+        {false, true, true, false, false},
+        {true, true, false, false, false},
+        {true, false, true, false, false},
+        {true, true, true, true, false},
+        {false, true, true, true, false},
+        {true, true, false, true, false},
+        {true, false, true, true, false},
+        {true, true, false, false, true},
+        {true, false, false, false, true},
+        {false, false, false, false, true},
+        {false, true, false, false, true}
     };
     return Stream.of(data).map(Arguments::of);
   }
@@ -120,7 +124,8 @@ public void tearDown() throws Exception {
   }
 
   private HoodieWriteConfig makeConfig(
-      boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, boolean useMetadataTable) {
+      boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
     // For the bloom index to use column stats and bloom filters from metadata table,
     // the following configs must be set to true:
     // "hoodie.bloom.index.use.metadata"
@@ -134,6 +139,7 @@ private HoodieWriteConfig makeConfig(
             .bloomIndexBucketizedChecking(bucketizedChecking)
             .bloomIndexKeysPerBucket(2)
             .bloomIndexUseMetadata(useMetadataTable)
+            .bloomIndexFileGroupIdKeySortPartitioner(useFileGroupIDKeySortPartitioner)
             .build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMetadataIndexBloomFilter(useMetadataTable)
@@ -146,9 +152,9 @@ private HoodieWriteConfig makeConfig(
   @MethodSource("configParams")
   public void testLoadInvolvedFiles(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
@@ -249,9 +255,9 @@ public void testLoadInvolvedFiles(
   @MethodSource("configParams")
   public void testRangePruning(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
 
     final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
@@ -352,12 +358,12 @@ public void testCheckUUIDsAgainstOneFile() throws Exception {
   @MethodSource("configParams")
   public void testTagLocationWithEmptyRDD(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) {
     // We have some records to be tagged (two different partitions)
     JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
 
@@ -373,7 +379,7 @@ public void testTagLocationWithEmptyRDD(
   @MethodSource("configParams")
   public void testTagLocationOnPartitionedTable(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
     // We have some records to be tagged (two different partitions)
     String rowKey1 = genRandomUUID();
     String rowKey2 = genRandomUUID();
@@ -398,7 +404,7 @@ public void testTagLocationOnPartitionedTable(
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
 
     // Also create the metadata and config
-    HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+    HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -473,7 +479,7 @@ public void testTagLocationOnPartitionedTable(
   @MethodSource("configParams")
   public void testTagLocationOnNonpartitionedTable(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
     // We have some records to be tagged (two different partitions)
     String rowKey1 = genRandomUUID();
     String rowKey2 = genRandomUUID();
@@ -497,7 +503,7 @@ public void testTagLocationOnNonpartitionedTable(
 
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -566,7 +572,7 @@ public void testTagLocationOnNonpartitionedTable(
   @MethodSource("configParams")
   public void testCheckExists(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
     // We have some records to be tagged (two different partitions)
 
     String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -593,7 +599,7 @@ public void testCheckExists(
 
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -681,7 +687,7 @@ public void testCheckExists(
   @MethodSource("configParams")
   public void testBloomFilterFalseError(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean useFileGroupIDKeySortPartitioner) throws Exception {
     // We have two hoodie records
     String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
         + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -707,7 +713,7 @@ public void testBloomFilterFalseError(
     // We do the tag
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, useFileGroupIDKeySortPartitioner);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);