Skip to content

Commit

Permalink
address review comments and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
codope committed Jan 20, 2025
1 parent 6245282 commit 7baa38c
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,13 @@ public static HoodieIndexDefinition getSecondaryOrExpressionIndexDefinition(Hood
throw new HoodieMetadataIndexException("Not eligible for indexing: " + indexType + ", indexName: " + userIndexName);
}

return new HoodieIndexDefinition(fullIndexName, indexType, options.getOrDefault(EXPRESSION_OPTION, IDENTITY_TRANSFORM),
new ArrayList<>(columns.keySet()), options);
return HoodieIndexDefinition.newBuilder()
.withIndexName(fullIndexName)
.withIndexType(indexType)
.withIndexFunction(options.getOrDefault(EXPRESSION_OPTION, IDENTITY_TRANSFORM))
.withSourceFields(new ArrayList<>(columns.keySet()))
.withIndexOptions(options)
.build();
}

public static boolean indexExists(HoodieTableMetaClient metaClient, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@
import static org.apache.hudi.metadata.HoodieMetadataWriteUtils.createMetadataWriteConfig;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
Expand All @@ -124,8 +124,6 @@
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.isNewExpressionIndexDefinitionRequired;
import static org.apache.hudi.metadata.MetadataPartitionType.isNewSecondaryIndexDefinitionRequired;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;

Expand Down Expand Up @@ -429,29 +427,9 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
partitionName = RECORD_INDEX.getPartitionPath();
break;
case EXPRESSION_INDEX:
Set<String> expressionIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
if (expressionIndexPartitionsToInit.isEmpty()) {
if (isNewExpressionIndexDefinitionRequired(dataWriteConfig.getMetadataConfig(), dataMetaClient)) {
String indexedColumn = dataWriteConfig.getMetadataConfig().getExpressionIndexColumn();
String indexName = dataWriteConfig.getMetadataConfig().getExpressionIndexName();
String indexType = dataWriteConfig.getMetadataConfig().getExpressionIndexType();
// Use a default index name if the indexed column is not specified
if (StringUtils.isNullOrEmpty(indexName) && StringUtils.nonEmpty(indexedColumn)) {
indexName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + indexedColumn;
}
// Build and register the new index definition
HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
.withIndexName(indexName)
.withIndexType(indexType)
.withSourceFields(Collections.singletonList(indexedColumn))
.withIndexOptions(dataWriteConfig.getMetadataConfig().getExpressionIndexOptions())
.build();
dataMetaClient.buildIndexDefinition(indexDefinition);
// Re-fetch the partitions after adding the new definition
expressionIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
} else {
continue;
}
Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
if (expressionIndexPartitionsToInit == null) {
continue;
}
ValidationUtils.checkState(expressionIndexPartitionsToInit.size() == 1, "Only one expression index at a time is supported for now");
partitionName = expressionIndexPartitionsToInit.iterator().next();
Expand All @@ -468,25 +446,7 @@ private void initializeFromFilesystem(String initializationTime, List<MetadataPa
partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
Set<String> secondaryIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
// if no secondary index partition found, check if new secondary index definition need to be added based on metadata write configs
if (secondaryIndexPartitionsToInit.isEmpty() && isNewSecondaryIndexDefinitionRequired(dataWriteConfig.getMetadataConfig(), dataMetaClient)) {
String indexedColumn = dataWriteConfig.getMetadataConfig().getSecondaryIndexColumn();
String indexName = dataWriteConfig.getMetadataConfig().getSecondaryIndexName();
// Use a default index name if the indexed column is not specified
if (StringUtils.isNullOrEmpty(indexName) && StringUtils.nonEmpty(indexedColumn)) {
indexName = PARTITION_NAME_SECONDARY_INDEX_PREFIX + indexedColumn;
}
// Build and register the new index definition
HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
.withIndexName(indexName)
.withIndexType(PARTITION_NAME_SECONDARY_INDEX)
.withSourceFields(Collections.singletonList(indexedColumn))
.build();
dataMetaClient.buildIndexDefinition(indexDefinition);
// Re-fetch the partitions after adding the new definition
secondaryIndexPartitionsToInit = getIndexPartitionsToInit(partitionType);
}
Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(partitionType, dataWriteConfig.getMetadataConfig(), dataMetaClient);
if (secondaryIndexPartitionsToInit.size() != 1) {
if (secondaryIndexPartitionsToInit.size() > 1) {
LOG.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit);
Expand Down Expand Up @@ -655,22 +615,7 @@ HoodieIndexDefinition getIndexDefinition(String indexName) {
return HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataMetaClient);
}

/**
 * Collects the names of index definitions that belong to the given partition type
 * and are not yet listed among the table's metadata partitions.
 *
 * @param partitionType metadata partition type whose partition path is used as the index name prefix
 * @return index names still to be initialized; empty when no index metadata is present
 */
private Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType) {
  if (dataMetaClient.getIndexMetadata().isEmpty()) {
    // No index metadata available on the data table, so nothing can be pending.
    return Collections.emptySet();
  }

  Set<String> pendingPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
      .map(definition -> definition.getIndexName())
      .filter(name -> name.startsWith(partitionType.getPartitionPath()))
      .collect(Collectors.toSet());
  // Drop every index whose partition is already recorded in the table config.
  pendingPartitions.removeAll(dataMetaClient.getTableConfig().getMetadataPartitions());
  return pendingPartitions;
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartition(String indexName) throws IOException {
// TODO: does index definition already exist at this point, in case we're coming via the indexer?
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName);
List<Pair<String, FileSlice>> partitionFileSlicePairs = getPartitionFileSlicePairs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ private void createRecordIndex(HoodieTableMetaClient metaClient, String userInde

@Override
public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
HoodieIndexDefinition indexDefinition = new HoodieIndexDefinition(PARTITION_NAME_COLUMN_STATS, PARTITION_NAME_COLUMN_STATS, PARTITION_NAME_COLUMN_STATS,
columnsToIndex, Collections.EMPTY_MAP);
HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
.withIndexName(PARTITION_NAME_COLUMN_STATS)
.withIndexType(PARTITION_NAME_COLUMN_STATS)
.withIndexFunction(PARTITION_NAME_COLUMN_STATS)
.withSourceFields(columnsToIndex)
.withIndexOptions(Collections.EMPTY_MAP)
.build();
LOG.info("Registering Or Updating the index " + PARTITION_NAME_COLUMN_STATS);
register(metaClient, indexDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,13 @@ private HoodieIndexDefinition getIndexDefinition(String indexName, String indexT
private HoodieIndexDefinition getIndexDefinition(String indexName, String indexType, String indexFunc, List<String> sourceFields,
Map<String, String> indexOptions) {
String fullIndexName = getIndexFullName(indexName, indexType);
return new HoodieIndexDefinition(fullIndexName, indexType, indexFunc, sourceFields, indexOptions);
return HoodieIndexDefinition.newBuilder()
.withIndexName(fullIndexName)
.withIndexType(indexType)
.withIndexFunction(indexFunc)
.withSourceFields(sourceFields)
.withIndexOptions(indexOptions)
.build();
}

private String getIndexFullName(String indexName, String indexType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,26 +362,27 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.key(METADATA_PREFIX + ".index.expression.column")
.noDefaultValue()
.markAdvanced()
.sinceVersion("1.0.0")
.sinceVersion("1.0.1")
.withDocumentation("Column for which expression index will be built.");

/**
 * Optional name of the expression index (advanced config, defaults to the empty string).
 * Per the documentation string below, an unset name falls back to the indexed column
 * name prefixed with {@code PARTITION_NAME_EXPRESSION_INDEX_PREFIX}.
 */
public static final ConfigProperty<String> EXPRESSION_INDEX_NAME = ConfigProperty
.key(METADATA_PREFIX + ".index.expression.name")
.defaultValue("")
.markAdvanced()
.sinceVersion("1.0.0")
.sinceVersion("1.0.1")
.withDocumentation("Name of the expression index. It is optional and default is the name of the column, prefixed by '" + PARTITION_NAME_EXPRESSION_INDEX_PREFIX + "'.");

/**
 * Index type used to build the expression index (advanced config, no default value).
 * The documentation string names {@code column_stats} and {@code bloom_filters} as the types.
 */
public static final ConfigProperty<String> EXPRESSION_INDEX_TYPE = ConfigProperty
.key(METADATA_PREFIX + ".index.expression.type")
.noDefaultValue()
.markAdvanced()
.sinceVersion("1.0.0")
.sinceVersion("1.0.1")
.withDocumentation("Index type i.e. column_stats or bloom_filters, for which expression index will be built e.g. date_format(ts).");
/**
 * Options for the expression index (advanced config, no default value).
 * The documentation string below shows the expected shape, e.g. an expression name plus a format.
 */
public static final ConfigProperty<String> EXPRESSION_INDEX_OPTIONS = ConfigProperty
.key(METADATA_PREFIX + ".index.expression.options")
.noDefaultValue()
.markAdvanced()
.sinceVersion("1.0.0")
.sinceVersion("1.0.1")
.withDocumentation("Options for the expression index, e.g. \"expr='from_unixtime', format='yyyy-MM-dd'\"");

public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class HoodieIndexDefinition implements Serializable {
public HoodieIndexDefinition() {
}

public HoodieIndexDefinition(String indexName, String indexType, String indexFunction, List<String> sourceFields,
Map<String, String> indexOptions) {
HoodieIndexDefinition(String indexName, String indexType, String indexFunction, List<String> sourceFields, Map<String, String> indexOptions) {
this.indexName = indexName;
this.indexType = indexType;
this.indexFunction = nonEmpty(indexFunction) ? indexFunction : EMPTY_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,6 @@ public String getIndexDefinitionPath() {
public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
String indexName = indexDefinition.getIndexName();
boolean isIndexDefnImmutable = !indexDefinition.getIndexName().equals(PARTITION_NAME_COLUMN_STATS); // only col stats is mutable.
if (isIndexDefnImmutable) {
checkState(
!indexMetadataOpt.isPresent() || (!indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName)),
"Index metadata is already present");
}
String indexMetaPath = getIndexDefinitionPath();
boolean updateIndexDefn = true;
if (indexMetadataOpt.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.MetadataPartitionType.isNewExpressionIndexDefinitionRequired;
import static org.apache.hudi.metadata.MetadataPartitionType.isNewSecondaryIndexDefinitionRequired;

/**
* A utility to convert timeline information to metadata table records.
Expand Down Expand Up @@ -2766,6 +2768,79 @@ private static void validatePayload(int type, Map<String, HoodieMetadataFileInfo
}
}

/**
 * Determines which expression index partitions still have to be initialized.
 * <p>
 * If no pending partition exists yet and the metadata config asks for a new expression index,
 * an index definition is built from the config, registered through the meta client, and the
 * pending partitions are looked up again.
 *
 * @param partitionType  metadata partition type whose partition path prefixes the index names
 * @param metadataConfig metadata config supplying the expression index column, name, type and options
 * @param dataMetaClient meta client of the data table; used to read and register index definitions
 * @return the pending expression index partitions, or {@code null} when none are pending and
 *         no new index definition is required (callers treat {@code null} as "skip")
 */
public static Set<String> getExpressionIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
  Set<String> pendingPartitions = getIndexPartitionsToInit(partitionType, dataMetaClient);
  if (!pendingPartitions.isEmpty()) {
    return pendingPartitions;
  }
  if (!isNewExpressionIndexDefinitionRequired(metadataConfig, dataMetaClient)) {
    return null;
  }

  String sourceColumn = metadataConfig.getExpressionIndexColumn();
  String resolvedName = metadataConfig.getExpressionIndexName();
  String resolvedType = metadataConfig.getExpressionIndexType();
  if (StringUtils.isNullOrEmpty(resolvedName) && StringUtils.nonEmpty(sourceColumn)) {
    // No explicit name was configured: derive one from the indexed column.
    resolvedName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + sourceColumn;
  } else if (StringUtils.nonEmpty(resolvedName) && !resolvedName.startsWith(PARTITION_NAME_EXPRESSION_INDEX_PREFIX)) {
    // A user supplied name lacks the expression_index_ prefix: add it.
    resolvedName = PARTITION_NAME_EXPRESSION_INDEX_PREFIX + resolvedName;
  }

  // Register the freshly built definition, then look up the pending partitions once more.
  dataMetaClient.buildIndexDefinition(HoodieIndexDefinition.newBuilder()
      .withIndexName(resolvedName)
      .withIndexType(resolvedType)
      .withSourceFields(Collections.singletonList(sourceColumn))
      .withIndexOptions(metadataConfig.getExpressionIndexOptions())
      .build());
  return getIndexPartitionsToInit(partitionType, dataMetaClient);
}

/**
 * Determines which secondary index partitions still have to be initialized.
 * <p>
 * If no pending partition exists yet and the metadata config asks for a new secondary index,
 * an index definition is built from the config, registered through the meta client, and the
 * pending partitions are looked up again.
 *
 * @param partitionType  metadata partition type whose partition path prefixes the index names
 * @param metadataConfig metadata config supplying the secondary index column and name
 * @param dataMetaClient meta client of the data table; used to read and register index definitions
 * @return the pending secondary index partitions (possibly empty)
 */
public static Set<String> getSecondaryIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient) {
  Set<String> pendingPartitions = getIndexPartitionsToInit(partitionType, dataMetaClient);
  boolean definitionNeeded = pendingPartitions.isEmpty() && isNewSecondaryIndexDefinitionRequired(metadataConfig, dataMetaClient);
  if (!definitionNeeded) {
    // Either something is already pending, or the write configs do not request a new index.
    return pendingPartitions;
  }

  String sourceColumn = metadataConfig.getSecondaryIndexColumn();
  String resolvedName = metadataConfig.getSecondaryIndexName();
  if (StringUtils.isNullOrEmpty(resolvedName) && StringUtils.nonEmpty(sourceColumn)) {
    // No explicit name was configured: derive one from the indexed column.
    resolvedName = PARTITION_NAME_SECONDARY_INDEX_PREFIX + sourceColumn;
  } else if (StringUtils.nonEmpty(resolvedName) && !resolvedName.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX)) {
    // A user supplied name lacks the secondary_index_ prefix: add it.
    resolvedName = PARTITION_NAME_SECONDARY_INDEX_PREFIX + resolvedName;
  }

  // Register the freshly built definition, then look up the pending partitions once more.
  dataMetaClient.buildIndexDefinition(HoodieIndexDefinition.newBuilder()
      .withIndexName(resolvedName)
      .withIndexType(PARTITION_NAME_SECONDARY_INDEX)
      .withSourceFields(Collections.singletonList(sourceColumn))
      .build());
  return getIndexPartitionsToInit(partitionType, dataMetaClient);
}

/**
 * Collects the names of index definitions that belong to the given partition type
 * and are not yet listed among the table's metadata partitions.
 *
 * @param partitionType  metadata partition type whose partition path is used as the index name prefix
 * @param dataMetaClient meta client of the data table providing index metadata and table config
 * @return index names still to be initialized; empty when no index metadata is present
 */
private static Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType, HoodieTableMetaClient dataMetaClient) {
  if (dataMetaClient.getIndexMetadata().isEmpty()) {
    // No index metadata available on the data table, so nothing can be pending.
    return Collections.emptySet();
  }

  Set<String> pendingPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
      .map(definition -> definition.getIndexName())
      .filter(name -> name.startsWith(partitionType.getPartitionPath()))
      .collect(Collectors.toSet());
  // Discard every index whose partition is already recorded in the table config.
  pendingPartitions.removeIf(dataMetaClient.getTableConfig().getMetadataPartitions()::contains);
  return pendingPartitions;
}

/**
* A class which represents a directory and the files and directories inside it.
* <p>
Expand Down
Loading

0 comments on commit 7baa38c

Please sign in to comment.