[HUDI-8163] Refactor UnMergedLogHandler with iterators #12608

Open · wants to merge 15 commits into master
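
For context, this PR moves consumers of HoodieUnMergedLogRecordScanner from a push-style callback to a pull-style iterator. A minimal sketch of the migration, based only on the builder and scanner calls visible in this diff (scannerBuilder and process are placeholders, not names from the PR):

    // Before: records are pushed into a caller-supplied callback during scan().
    HoodieUnMergedLogRecordScanner callbackScanner = scannerBuilder
        .withLogRecordScannerCallback(records::add)   // 'records' is a caller-side buffer
        .build();
    callbackScanner.scan(false);

    // After: the caller pulls records on demand and controls the iterator's lifetime.
    try (ClosableIterator<HoodieRecord<?>> it = scannerBuilder.build().iterator()) {
      while (it.hasNext()) {
        process(it.next());   // 'process' is a hypothetical consumption step
      }
    }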
@@ -175,13 +175,12 @@ public static HoodieData<HoodieRecord> getExpressionIndexRecordsUsingBloomFilter
public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext,
HoodieTableMetaClient metaClient, Schema schema,
HoodieWriteConfig dataWriteConfig, boolean isBaseFile) {
List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
Iterator<? extends HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
: getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
return toRows(records, schema, dataWriteConfig, sqlContext, paths[0].toString());
}

private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
List<HoodieRecord> records = new ArrayList<>();
private static Iterator<HoodieRecord<?>> getUnmergedLogFileRecords(List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
.withStorage(metaClient.getStorage())
.withBasePath(metaClient.getBasePath())
@@ -190,40 +189,36 @@ private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFile
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
.withReaderSchema(readerSchema)
.withTableMetaClient(metaClient)
.withLogRecordScannerCallback(records::add)
.build();
scanner.scan(false);
return records;
return scanner.iterator();
}

private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
List<HoodieRecord> records = new ArrayList<>();
HoodieRecordMerger recordMerger =
HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(),
metaClient.getTableConfig().getRecordMergeStrategyId());
try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
.getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath())) {
baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record) -> records.add((HoodieRecord) record));
return records;
} catch (IOException e) {
throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e);
}
private static Iterator<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
HoodieRecordMerger recordMerger =
HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(),
metaClient.getTableConfig().getRecordMergeStrategyId());
try {
HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
.getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath());
return baseFileReader.getRecordIterator(readerSchema);
} catch (IOException e) {
throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e);
}

private static List<Row> toRows(List<HoodieRecord> records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) {
}
private static List<Row> toRows(Iterator<? extends HoodieRecord> records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) {
StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
Function1<GenericRecord, Row> converterToRow = AvroConversionUtils.createConverterToRow(schema, structType);
List<Row> avroRecords = records.stream()
.map(r -> {
try {
return (GenericRecord) (r.getData() instanceof GenericRecord ? r.getData()
: ((HoodieRecordPayload) r.getData()).getInsertValue(schema, dataWriteConfig.getProps()).get());
} catch (IOException e) {
throw new HoodieIOException("Could not fetch record payload");
}
})
.map(converterToRow::apply)
.collect(Collectors.toList());
List<Row> avroRecords = new ArrayList<>();
records.forEachRemaining(record -> {
try {
GenericRecord genericRecord = (GenericRecord) (record.getData() instanceof GenericRecord ? record.getData()
: ((HoodieRecordPayload) record.getData()).getInsertValue(schema, dataWriteConfig.getProps()).get());
avroRecords.add(converterToRow.apply(genericRecord));
} catch (IOException e) {
throw new HoodieIOException("Could not fetch record payload", e);
}
});
return avroRecords;
}
}
@@ -797,7 +797,7 @@ public List<String> getValidBlockInstants() {
return validBlockInstants;
}

private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
protected Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
ClosableIterator<HoodieRecord> blockRecordsIterator;
if (keySpecOpt.isPresent()) {
@@ -25,16 +25,23 @@
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

import org.apache.avro.Schema;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

/**
@@ -111,6 +118,48 @@ public interface RecordDeletionCallback {
void apply(HoodieKey deletedKey);
}

/**
 * Returns a {@link ClosableIterator} over the log records, pulling them lazily block by block.
 * The caller is responsible for closing the returned iterator.
 */
public ClosableIterator<HoodieRecord<?>> iterator() {
return new ClosableIterator<HoodieRecord<?>>() {
private final Iterator<HoodieLogBlock> logBlockIterator = getCurrentInstantLogBlocks().iterator();
private ClosableIterator<HoodieRecord> recordIterator = null;

@Override
public boolean hasNext() {
try {
while ((recordIterator == null || !recordIterator.hasNext()) && logBlockIterator.hasNext()) {
HoodieLogBlock logBlock = logBlockIterator.next();
if (logBlock instanceof HoodieDataBlock) {
HoodieDataBlock dataBlock = (HoodieDataBlock) logBlock;
Pair<ClosableIterator<HoodieRecord>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty());
recordIterator = recordsIteratorSchemaPair.getLeft();
}
}
return recordIterator != null && recordIterator.hasNext();
} catch (Exception e) {
throw new HoodieException("Error while iterating over log records", e);
}
}

@Override
public HoodieRecord<?> next() {
if (!hasNext()) {

Contributor: Maybe we can remove this check, because hasNext should always be invoked before next.

throw new NoSuchElementException();
}
return recordIterator.next();
}

@Override
public void close() {
if (recordIterator != null) {
recordIterator.close();
}
}
};
}
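
Regarding the review note above about the guard in next(): a sketch of the leaner variant the reviewer suggests, assuming every caller invokes hasNext() before next(). Keeping the guard is arguably still safer, since the java.util.Iterator contract expects NoSuchElementException from an exhausted iterator.

    // Sketch only (reviewer's suggestion): rely on hasNext() having already positioned recordIterator.
    // Without the guard, an exhausted or empty iterator would surface a NullPointerException or the
    // delegate iterator's own exception instead of NoSuchElementException.
    @Override
    public HoodieRecord<?> next() {
      return recordIterator.next();
    }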

/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
@@ -30,12 +30,14 @@
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -51,6 +53,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
@@ -237,10 +240,14 @@ private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers(
) {
List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
producers.add(new FunctionBasedQueueProducer<>(queue -> {
HoodieUnMergedLogRecordScanner scanner =
scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
try (ClosableIterator<HoodieRecord<?>> logRecordIterator = scanner.iterator()) {
while (logRecordIterator.hasNext()) {
queue.insertRecord(logRecordIterator.next());

Contributor: We can remove the whole producers and queue, I think.

}
} catch (Exception e) {
throw new HoodieException("Error converting the record to avro", e);
}
return null;
}));
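
On the reviewer's point above that the producer and queue could go away entirely: a rough sketch of direct consumption, not part of this diff; emit is a placeholder for whatever downstream handling the Flink reader needs.

    // Hypothetical direct consumption of the scanner, with no BoundedInMemoryExecutor or queue.
    HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
    try (ClosableIterator<HoodieRecord<?>> logRecordIterator = scanner.iterator()) {
      while (logRecordIterator.hasNext()) {
        emit(logRecordIterator.next());   // 'emit' stands in for the downstream consumer
      }
    }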

@@ -19,14 +19,17 @@
package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -69,7 +72,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
* @param realReader Parquet Reader
*/
public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job,
RecordReader<NullWritable, ArrayWritable> realReader) {
super(split, job);
this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
// Iterator for consuming records from parquet file
@@ -105,16 +108,18 @@ private List<HoodieProducer<ArrayWritable>> getParallelProducers(
) {
return Arrays.asList(
new FunctionBasedQueueProducer<>(queue -> {
HoodieUnMergedLogRecordScanner scanner =
scannerBuilder.withLogRecordScannerCallback(record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp());
queue.insertRecord(aWritable);
})
.build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();

Contributor: Nice refactoring. Can we also fix the usage of HoodieUnMergedLogRecordScanner for Flink's MergeOnReadInputFormat#getUnMergedLogFileIterator?

Author: Adjusted the usage within BoundedMemoryRecords in MergeOnReadInputFormat#getUnMergedLogFileIterator.

Contributor: BoundedMemoryRecords can be eliminated altogether.

try (ClosableIterator<HoodieRecord<?>> logRecordIterator = scanner.iterator()) {
while (logRecordIterator.hasNext()) {
HoodieRecord<?> record = logRecordIterator.next();
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp());
queue.insertRecord(aWritable);
}
} catch (Exception e) {
throw new HoodieException("Error converting the record to avro", e);
}
return null;
}),
new IteratorBasedQueueProducer<>(parquetRecordsIterator)
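
Following the thread above, one possible direction (a sketch only, not part of this diff): since the parquet side already feeds the queue through an IteratorBasedQueueProducer, the delta-log side could do the same by wrapping the scanner iterator with an on-the-fly conversion to ArrayWritable, replacing the FunctionBasedQueueProducer:

    // Hypothetical wrapper: maps each HoodieRecord to an ArrayWritable as it is pulled;
    // closing it closes the underlying scanner iterator.
    ClosableIterator<ArrayWritable> logWritableIterator = new ClosableIterator<ArrayWritable>() {
      private final ClosableIterator<HoodieRecord<?>> inner = scannerBuilder.build().iterator();

      @Override
      public boolean hasNext() {
        return inner.hasNext();
      }

      @Override
      public ArrayWritable next() {
        try {
          HoodieRecord<?> record = inner.next();
          GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData();
          return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp());
        } catch (Exception e) {
          throw new HoodieException("Error converting the record to avro", e);
        }
      }

      @Override
      public void close() {
        inner.close();
      }
    };
    // new IteratorBasedQueueProducer<>(logWritableIterator) could then replace the
    // FunctionBasedQueueProducer for the delta-log side.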