Skip to content

Commit

Permalink
[HUDI-8844] Support secondary and expression index creation through a…
Browse files Browse the repository at this point in the history
…sync indexer and write configs (#12653)

- Added new write configs for secondary index (SI) and expression index (EI) that conform to the SQL syntax for CREATE INDEX.
- Use those configs to check whether a new SI/EI definition needs to be added to the index definition file.
- Initialize the SI/EI if a new one is being created through write configs.
- Add async indexer and Spark datasource tests to validate that the index gets built successfully.
  • Loading branch information
codope authored Jan 25, 2025
1 parent febf0c5 commit 739b79a
Show file tree
Hide file tree
Showing 27 changed files with 793 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
Expand All @@ -36,6 +37,8 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
Expand All @@ -44,11 +47,13 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataIndexException;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -63,13 +68,21 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
import static org.apache.hudi.index.expression.HoodieExpressionIndex.IDENTITY_TRANSFORM;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryOrExpressionIndex;
import static org.apache.hudi.table.action.commit.HoodieDeleteHelper.createDeleteRecord;

/**
Expand Down Expand Up @@ -460,4 +473,69 @@ public static <R> HoodieRecord<R> createNewTaggedHoodieRecord(HoodieRecord<R> ol
throw new HoodieIndexException("Unsupported record type: " + recordType);
}
}

/**
 * Registers a metadata index definition for a table.
 *
 * <p>Index definitions are stored in a user-specified path or, by default, in
 * .hoodie/.index_defs/index.json. The definition is passed to
 * {@code metaClient.buildIndexDefinition(...)}; only when that call reports a change is the
 * table config inspected. If the table config does not yet carry a relative index definition
 * path, the path is recorded and the table config is persisted.
 *
 * @param metaClient      meta client of the table the index is registered for
 * @param indexDefinition definition of the index being registered
 */
public static void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
  LOG.info("Registering index {} of using {}", indexDefinition.getIndexName(), indexDefinition.getIndexType());
  // Build HoodieIndexMetadata and add it to the index definition file; stop if nothing changed.
  if (!metaClient.buildIndexDefinition(indexDefinition)) {
    return;
  }
  String definitionFilePath = metaClient.getIndexDefinitionPath();
  boolean relativePathRecorded =
      metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
          && metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent();
  if (relativePathRecorded) {
    // Table config already points at the index definition file.
    return;
  }
  metaClient.getTableConfig().setValue(
      HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
      FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(definitionFilePath)));
  HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
}

/**
 * Validates a request to create a secondary or expression index and builds its
 * {@link HoodieIndexDefinition}.
 *
 * <p>The full index name is the user-supplied name prefixed with the secondary index prefix when
 * {@code indexType} equals {@code PARTITION_NAME_SECONDARY_INDEX}, and with the expression index
 * prefix otherwise. The index function is read from {@code options} under
 * {@code EXPRESSION_OPTION}, falling back to {@code IDENTITY_TRANSFORM}.
 *
 * @param metaClient      meta client of the table to index
 * @param userIndexName   index name as given by the user, without prefix
 * @param indexType       type of the index
 * @param columns         columns to index, keyed by column name; exactly one entry is accepted
 * @param options         index options, also stored on the resulting definition
 * @param tableProperties properties consulted by the eligibility check
 * @return the definition of the new index
 * @throws HoodieMetadataIndexException if an index with the full name is already listed in the
 *                                      table config's metadata partitions, or the request is not
 *                                      eligible for indexing
 * @throws Exception                    if the eligibility check fails with an error
 */
static HoodieIndexDefinition getSecondaryOrExpressionIndexDefinition(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns,
                                                                     Map<String, String> options, Map<String, String> tableProperties) throws Exception {
  final String partitionPrefix;
  if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
    partitionPrefix = PARTITION_NAME_SECONDARY_INDEX_PREFIX;
  } else {
    partitionPrefix = PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
  }
  String fullIndexName = partitionPrefix + userIndexName;

  if (indexExists(metaClient, fullIndexName)) {
    throw new HoodieMetadataIndexException("Index already exists: " + userIndexName);
  }
  checkArgument(columns.size() == 1, "Only one column can be indexed for functional or secondary index.");

  boolean eligible = isEligibleForSecondaryOrExpressionIndex(metaClient, indexType, tableProperties, columns);
  if (!eligible) {
    throw new HoodieMetadataIndexException("Not eligible for indexing: " + indexType + ", indexName: " + userIndexName);
  }

  // Without an explicit expression option the index uses the identity transform.
  String indexFunction = options.getOrDefault(EXPRESSION_OPTION, IDENTITY_TRANSFORM);
  return HoodieIndexDefinition.newBuilder()
      .withIndexName(fullIndexName)
      .withIndexType(indexType)
      .withIndexFunction(indexFunction)
      .withSourceFields(new ArrayList<>(columns.keySet()))
      .withIndexOptions(options)
      .build();
}

/**
 * Checks whether an index is already listed among the metadata partitions recorded in the
 * table config.
 *
 * @param metaClient meta client of the table to inspect
 * @param indexName  full index (metadata partition) name to look for
 * @return {@code true} if the table config's metadata partitions contain {@code indexName}
 */
static boolean indexExists(HoodieTableMetaClient metaClient, String indexName) {
  // The metadata partitions are a Set, so a direct membership test replaces the linear stream scan.
  return metaClient.getTableConfig().getMetadataPartitions().contains(indexName);
}

/**
 * Decides whether a secondary or expression index may be created on the given columns.
 *
 * <p>The column data types are validated against the table's Avro schema first. For a secondary
 * index, the record index must additionally be available: either its partition is already listed
 * in the table config's metadata partitions, or it is enabled through
 * {@code RECORD_INDEX_ENABLE_PROP} in {@code options}.
 *
 * @param metaClient meta client of the table to index
 * @param indexType  type of the index being created
 * @param options    properties in which the record index enable flag is looked up; the caller in
 *                   this class passes the table properties here
 * @param columns    columns to index, keyed by column name
 * @return {@code true} if the index can be created, {@code false} otherwise
 * @throws Exception if the table schema cannot be resolved
 */
private static boolean isEligibleForSecondaryOrExpressionIndex(HoodieTableMetaClient metaClient,
                                                               String indexType,
                                                               Map<String, String> options,
                                                               Map<String, Map<String, String>> columns) throws Exception {
  if (!validateDataTypeForSecondaryOrExpressionIndex(new ArrayList<>(columns.keySet()), new TableSchemaResolver(metaClient).getTableAvroSchema())) {
    return false;
  }
  // Only the secondary index has a further prerequisite.
  if (!indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
    return true;
  }
  // For secondary index, record index is a must:
  // either the record index partition is already present or record index is enabled.
  return metaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())
      || Boolean.parseBoolean(options.getOrDefault(RECORD_INDEX_ENABLE_PROP.key(), RECORD_INDEX_ENABLE_PROP.defaultValue().toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
Expand Down Expand Up @@ -108,10 +109,12 @@
import static org.apache.hudi.metadata.HoodieMetadataWriteUtils.createMetadataWriteConfig;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
Expand Down Expand Up @@ -269,7 +272,7 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp);
initializeFromFilesystem(initializationTime, metadataPartitionsToInit);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -337,10 +340,8 @@ private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
*
* @param initializationTime - Timestamp to use for the commit
* @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for this initialization
*/
private void initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
Option<String> inflightInstantTimestamp) throws IOException {
private void initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);

// FILES partition is always required and is initialized first
Expand Down Expand Up @@ -425,11 +426,13 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
partitionName = RECORD_INDEX.getPartitionPath();
break;
case EXPRESSION_INDEX:
Set<String> expressionIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
if (expressionIndexPartitionsToInit.isEmpty()) {
Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
if (expressionIndexPartitionsToInit.size() != 1) {
if (expressionIndexPartitionsToInit.size() > 1) {
LOG.warn("Skipping expression index initialization as only one expression index bootstrap at a time is supported for now. Provided: {}", expressionIndexPartitionsToInit);
}
continue;
}
ValidationUtils.checkState(expressionIndexPartitionsToInit.size() == 1, "Only one expression index at a time is supported for now");
partitionName = expressionIndexPartitionsToInit.iterator().next();
fileGroupCountAndRecordsPair = initializeExpressionIndexPartition(partitionName, instantTimeForPartition);
break;
Expand All @@ -444,7 +447,7 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
Set<String> secondaryIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
if (secondaryIndexPartitionsToInit.size() != 1) {
if (secondaryIndexPartitionsToInit.size() > 1) {
LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
Expand Down Expand Up @@ -613,20 +616,6 @@ HoodieIndexDefinition getIndexDefinition(String indexName) {
return HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataMetaClient);
}

/**
 * Collects the names of index definitions that belong to the given partition type and are not
 * yet listed among the table config's metadata partitions.
 *
 * @param partitionType metadata partition type whose partition path is used as the name prefix
 * @return matching index names, or an empty set when the table has no index metadata
 */
private Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType) {
  if (dataMetaClient.getIndexMetadata().isEmpty()) {
    return Collections.emptySet();
  }

  Set<String> completedMetadataPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
  String partitionPathPrefix = partitionType.getPartitionPath();
  return dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
      .map(HoodieIndexDefinition::getIndexName)
      // keep only indexes of this partition type that are not already built
      .filter(name -> name.startsWith(partitionPathPrefix) && !completedMetadataPartitions.contains(name))
      .collect(Collectors.toSet());
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition(String indexName) throws IOException {
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName);
Expand Down Expand Up @@ -1074,7 +1063,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionPaths);

// initialize partitions
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
initializeFromFilesystem(instantTime, partitionTypes);
}

/**
Expand Down Expand Up @@ -1172,7 +1161,7 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,

dataMetaClient.getTableConfig().getMetadataPartitions()
.stream()
.filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
.filter(partition -> partition.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX))
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,28 @@
* under the License.
*/

package org.apache.hudi.table.action.index.functional;
package org.apache.hudi.table.action.index;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.StoragePath;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public abstract class BaseHoodieIndexClient {

private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieIndexClient.class);

/**
 * No-argument constructor; performs no initialization.
 */
public BaseHoodieIndexClient() {
}

/**
 * Registers an index definition for a table.
 *
 * <p>Index definitions are stored in a user-specified path or, by default, in
 * .hoodie/.index_defs/index.json. The table config is touched only when
 * {@code metaClient.buildIndexDefinition(...)} reports that the definition changed and the
 * config does not yet carry a relative index definition path.
 *
 * @param metaClient      meta client of the table the index is registered for
 * @param indexDefinition definition of the index being registered
 */
public void register(HoodieTableMetaClient metaClient, HoodieIndexDefinition indexDefinition) {
  LOG.info("Registering index {} of using {}", indexDefinition.getIndexName(), indexDefinition.getIndexType());
  // Build HoodieIndexMetadata and add it to the index definition file.
  boolean definitionChanged = metaClient.buildIndexDefinition(indexDefinition);
  if (!definitionChanged) {
    return;
  }
  String indexMetaPath = metaClient.getIndexDefinitionPath();
  boolean configHasPath =
      metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key())
          && metaClient.getTableConfig().getRelativeIndexDefinitionPath().isPresent();
  if (!configHasPath) {
    // Record where the index definitions live and persist the table config.
    metaClient.getTableConfig().setValue(
        HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
        FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(indexMetaPath)));
    HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
  }
}

/**
 * Create a metadata index.
 *
 * @param metaClient      {@link HoodieTableMetaClient} of the table to index (presumably the data table; confirm against implementations).
 * @param indexName       name of the index to create.
 * @param indexType       type of the index to create.
 * @param columns         columns to index, each mapped to a map of string settings (meaning of the inner map is not visible here; confirm against implementations).
 * @param options         index options.
 * @param tableProperties table properties.
 * @throws Exception declared by the signature; the concrete failure causes depend on the implementation.
 */
public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options,
Map<String, String> tableProperties) throws Exception;

/**
 * Creates or updates the column stats index definition.
 *
 * @param metaClient     data table's {@link HoodieTableMetaClient} instance.
 * @param columnsToIndex list of columns to index.
 */
public abstract void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaClient, List<String> columnsToIndex);
Expand Down
Loading

0 comments on commit 739b79a

Please sign in to comment.