From 6354f79ea771070b7b61f2e5959dd642425ff3b8 Mon Sep 17 00:00:00 2001
From: vamshi
Date: Thu, 9 Jan 2025 04:30:29 -0800
Subject: [PATCH 01/15] Replace callbacks with iterator

---
 .../utils/SparkMetadataWriterUtils.java       | 11 +++-
 .../log/AbstractHoodieLogRecordScanner.java   |  5 +-
 .../log/HoodieUnMergedLogRecordScanner.java   | 64 ++++++++++++-------
 .../metadata/HoodieTableMetadataUtil.java     | 34 +++++++---
 .../apache/hudi/table/format/FormatUtils.java | 15 +++--
 .../RealtimeUnmergedRecordReader.java         | 23 ++++---
 .../testutils/LogFileColStatsTestUtil.java    | 14 +++-
 7 files changed, 115 insertions(+), 51 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index eb466ee5252da..7fa52716dcf73 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -190,9 +190,16 @@ private static List getUnmergedLogFileRecords(List logFile
         .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
         .withReaderSchema(readerSchema)
         .withTableMetaClient(metaClient)
-        .withLogRecordScannerCallback(records::add)
         .build();
-    scanner.scan(false);
+    Iterator<HoodieRecord> recordIterator = scanner.iterator();
+    while (recordIterator.hasNext()) {
+      try {
+        records.add(recordIterator.next
+        ());
+      } catch (Exception e) {
+        throw new HoodieException("Error while inserting record into queue", e);
+      }
+    }
     return records;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 10aa50efe3e16..447238abf8eea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -617,6 +617,8 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
   /**
    * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
    * handle it.
+   * TODO:
+   * 1. what is the purpose of this method? should the HoodieRecord be added to a queue and consumed by an iterator?
*/ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpecOpt) throws Exception { checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(), @@ -675,6 +677,7 @@ private void processQueuedBlocksForInstant(Deque logBlocks, int processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt); break; case DELETE_BLOCK: + // TODO: same question as processDataBlock Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord); break; case CORRUPT_BLOCK: @@ -797,7 +800,7 @@ public List getValidBlockInstants() { return validBlockInstants; } - private Pair, Schema> getRecordsIterator( + protected Pair, Schema> getRecordsIterator( HoodieDataBlock dataBlock, Option keySpecOpt) throws IOException { ClosableIterator blockRecordsIterator; if (keySpecOpt.isPresent()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index c23c5f42f1923..8afa64abfc8fc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -25,15 +25,22 @@ import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -42,20 +49,14 @@ */ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { - private final LogRecordScannerCallback callback; - private final RecordDeletionCallback recordDeletionCallback; - private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback, Option instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); - this.callback = callback; - this.recordDeletionCallback = recordDeletionCallback; } /** @@ -76,22 +77,23 @@ public static HoodieUnMergedLogRecordScanner.Builder newBuilder() { return new Builder(); } + // TODO: check whether we can remove this method @Override protected void processNextRecord(HoodieRecord hoodieRecord) throws Exception { // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into queue of BoundedInMemoryExecutor. // Just call callback without merging - if (callback != null) { - callback.apply(hoodieRecord.copy()); - } +// if (callback != null) { +// callback.apply(hoodieRecord.copy()); +// } } @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { - if (recordDeletionCallback != null) { - recordDeletionCallback.apply(deleteRecord.getHoodieKey()); - } +// if (recordDeletionCallback != null) { +// recordDeletionCallback.apply(deleteRecord.getHoodieKey()); +// } } /** @@ -111,6 +113,32 @@ public interface RecordDeletionCallback { void apply(HoodieKey deletedKey); } + /** + * Returns an iterator over the log records. + */ + public Iterator> iterator() { + List> records = new ArrayList<>(); + try { + scan(); + + while (!getCurrentInstantLogBlocks().isEmpty()) { + HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast(); + if (lastBlock instanceof HoodieDataBlock) { + HoodieDataBlock dataBlock = (HoodieDataBlock) lastBlock; + Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); + try (ClosableIterator recordIterator = recordsIteratorSchemaPair.getLeft()) { + while (recordIterator.hasNext()) { + records.add(recordIterator.next()); + } + } + } + } + } catch (Exception e) { + throw new HoodieException("Error while iterating over log records", e); + } + return records.iterator(); + } + /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ @@ -185,16 +213,6 @@ public Builder withInstantRange(Option instantRange) { return this; } - public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { - this.callback = callback; - return this; - } - - public Builder withRecordDeletionCallback(RecordDeletionCallback recordDeletionCallback) { - this.recordDeletionCallback = recordDeletionCallback; - return this; - } - @Override public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; @@ -219,7 +237,7 @@ public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, - latestInstantTime, reverseReader, bufferSize, callback, recordDeletionCallback, instantRange, + latestInstantTime, reverseReader, bufferSize, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 184f11220dbaa..c8cdac5bf30e6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -127,6 +127,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1053,14 +1054,21 @@ public static Set getRecordKeys(List logFilePaths, HoodieTableMe .withLatestInstantTime(latestCommitTimestamp) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient); - if (includeValidKeys) { - builder.withLogRecordScannerCallback(record -> 
allRecordKeys.add(record.getRecordKey())); - } - if (includeDeletedKeys) { - builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())); + + Iterator> recordIterator = builder.build().iterator(); + while (recordIterator.hasNext()) { + try { + HoodieRecord record = recordIterator.next(); + if (includeValidKeys) { + allRecordKeys.add(record.getRecordKey()); + } + if (includeDeletedKeys) { + allRecordKeys.add(record.getRecordKey()); + } + } catch (Exception e) { + throw new HoodieException("Error while inserting record into queue", e); + } } - HoodieUnMergedLogRecordScanner scanner = builder.build(); - scanner.scan(); return allRecordKeys; } return Collections.emptySet(); @@ -1656,9 +1664,17 @@ public static List> getLogFileColumnRangeM .withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient) - .withLogRecordScannerCallback(records::add) .build(); - scanner.scan(); + + Iterator> recordIterator = scanner.iterator(); + while (recordIterator.hasNext()) { + try { + records.add(recordIterator.next()); + } catch (Exception e) { + throw new HoodieException("Error while inserting record into queue", e); + } + } + if (records.isEmpty()) { return Collections.emptyList(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 0628673b8e58e..56c96ca51be92 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.internal.schema.InternalSchema; @@ -237,10 +238,16 @@ private List>> getParallelProducers( ) { List>> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); + HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); + // Use the iterator to process records + Iterator> recordIterator = scanner.iterator(); + while (recordIterator.hasNext()) { + try { + queue.insertRecord(recordIterator.next()); + } catch (Exception e) { + throw new HoodieException("Error while inserting record into queue", e); + } + } return null; })); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 396705c151892..ac1cbf64c530f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; @@ -27,6 +28,7 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -105,16 +107,17 @@ private List> getParallelProducers( ) { return Arrays.asList( new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - scannerBuilder.withLogRecordScannerCallback(record -> { - // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); - queue.insertRecord(aWritable); - }) - .build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); + HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); + Iterator> recordIterator = scanner.iterator(); + while (recordIterator.hasNext()) { + try { + GenericRecord rec = (GenericRecord) recordIterator.next().toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + queue.insertRecord(aWritable); + } catch (Exception e) { + throw new HoodieException("Error while inserting record into queue", e); + } + } return null; }), new IteratorBasedQueueProducer<>(parquetRecordsIterator) diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java index 464ad5ddca1e4..c1ee48a7fb684 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -59,12 +60,21 @@ public static Option getLogFileColumnRangeMetadata(String filePath, HoodieT .withBufferSize(maxBufferSize) .withLatestInstantTime(latestCommitTime) .withReaderSchema(writerSchemaOpt.get()) - .withLogRecordScannerCallback(records::add) .build(); - scanner.scan(); + + Iterator> recordIterator = scanner.iterator(); + while (recordIterator.hasNext()) { + try { + records.add(recordIterator.next()); + } catch (Exception e) { + throw new HoodieException("Error while inserting record into queue", e); + } + } + if (records.isEmpty()) { return Option.empty(); } + Map> columnRangeMetadataMap = collectColumnRangeMetadata(records, fieldsToIndex, filePath, writerSchemaOpt.get()); List> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values()); From 64a97560d6f9cb9eee3979e9b54b358b586f095e Mon Sep 17 00:00:00 2001 From: vamshi Date: Tue, 14 Jan 2025 20:25:49 -0800 Subject: [PATCH 02/15] 
Revert "Replace callbacks with iterator" This reverts commit 6354f79ea771070b7b61f2e5959dd642425ff3b8. --- .../utils/SparkMetadataWriterUtils.java | 11 +--- .../log/AbstractHoodieLogRecordScanner.java | 5 +- .../log/HoodieUnMergedLogRecordScanner.java | 64 +++++++------------ .../metadata/HoodieTableMetadataUtil.java | 34 +++------- .../apache/hudi/table/format/FormatUtils.java | 15 ++--- .../RealtimeUnmergedRecordReader.java | 23 +++---- .../testutils/LogFileColStatsTestUtil.java | 14 +--- 7 files changed, 51 insertions(+), 115 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index 7fa52716dcf73..eb466ee5252da 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -190,16 +190,9 @@ private static List getUnmergedLogFileRecords(List logFile .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()) .withReaderSchema(readerSchema) .withTableMetaClient(metaClient) + .withLogRecordScannerCallback(records::add) .build(); - Iterator> recordIterator = scanner.iterator(); - while (recordIterator.hasNext()) { - try { - records.add(recordIterator.next - ()); - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } - } + scanner.scan(false); return records; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 447238abf8eea..10aa50efe3e16 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -617,8 +617,6 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA /** * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to * handle it. - * TODO: - * 1. what is the purpose of this method? should the HoodieRecord be added to a queue and consumed by an iterator? 
*/ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpecOpt) throws Exception { checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(), @@ -677,7 +675,6 @@ private void processQueuedBlocksForInstant(Deque logBlocks, int processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt); break; case DELETE_BLOCK: - // TODO: same question as processDataBlock Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord); break; case CORRUPT_BLOCK: @@ -800,7 +797,7 @@ public List getValidBlockInstants() { return validBlockInstants; } - protected Pair, Schema> getRecordsIterator( + private Pair, Schema> getRecordsIterator( HoodieDataBlock dataBlock, Option keySpecOpt) throws IOException { ClosableIterator blockRecordsIterator; if (keySpecOpt.isPresent()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8afa64abfc8fc..c23c5f42f1923 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -25,22 +25,15 @@ import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; -import org.apache.hudi.common.table.log.block.HoodieDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.ClosableIterator; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -49,14 +42,20 @@ */ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { + private final LogRecordScannerCallback callback; + private final RecordDeletionCallback recordDeletionCallback; + private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, + LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback, Option instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option hoodieTableMetaClientOption) { super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); + this.callback = callback; + this.recordDeletionCallback = recordDeletionCallback; } /** @@ -77,23 +76,22 @@ public static HoodieUnMergedLogRecordScanner.Builder newBuilder() { return new Builder(); } - // TODO: check whether we can remove this method @Override protected void processNextRecord(HoodieRecord hoodieRecord) throws Exception { // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into queue of BoundedInMemoryExecutor. // Just call callback without merging -// if (callback != null) { -// callback.apply(hoodieRecord.copy()); -// } + if (callback != null) { + callback.apply(hoodieRecord.copy()); + } } @Override protected void processNextDeletedRecord(DeleteRecord deleteRecord) { -// if (recordDeletionCallback != null) { -// recordDeletionCallback.apply(deleteRecord.getHoodieKey()); -// } + if (recordDeletionCallback != null) { + recordDeletionCallback.apply(deleteRecord.getHoodieKey()); + } } /** @@ -113,32 +111,6 @@ public interface RecordDeletionCallback { void apply(HoodieKey deletedKey); } - /** - * Returns an iterator over the log records. - */ - public Iterator> iterator() { - List> records = new ArrayList<>(); - try { - scan(); - - while (!getCurrentInstantLogBlocks().isEmpty()) { - HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast(); - if (lastBlock instanceof HoodieDataBlock) { - HoodieDataBlock dataBlock = (HoodieDataBlock) lastBlock; - Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); - try (ClosableIterator recordIterator = recordsIteratorSchemaPair.getLeft()) { - while (recordIterator.hasNext()) { - records.add(recordIterator.next()); - } - } - } - } - } catch (Exception e) { - throw new HoodieException("Error while iterating over log records", e); - } - return records.iterator(); - } - /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ @@ -213,6 +185,16 @@ public Builder withInstantRange(Option instantRange) { return this; } + public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { + this.callback = callback; + return this; + } + + public Builder withRecordDeletionCallback(RecordDeletionCallback recordDeletionCallback) { + this.recordDeletionCallback = recordDeletionCallback; + return this; + } + @Override public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) { this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan; @@ -237,7 +219,7 @@ public HoodieUnMergedLogRecordScanner build() { ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema, - latestInstantTime, reverseReader, bufferSize, instantRange, + latestInstantTime, reverseReader, bufferSize, callback, recordDeletionCallback, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c8cdac5bf30e6..184f11220dbaa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -127,7 +127,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -1054,21 +1053,14 @@ public static Set getRecordKeys(List logFilePaths, HoodieTableMe .withLatestInstantTime(latestCommitTimestamp) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient); - - Iterator> recordIterator = builder.build().iterator(); - while (recordIterator.hasNext()) { - try { - 
HoodieRecord record = recordIterator.next(); - if (includeValidKeys) { - allRecordKeys.add(record.getRecordKey()); - } - if (includeDeletedKeys) { - allRecordKeys.add(record.getRecordKey()); - } - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } + if (includeValidKeys) { + builder.withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey())); + } + if (includeDeletedKeys) { + builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey())); } + HoodieUnMergedLogRecordScanner scanner = builder.build(); + scanner.scan(); return allRecordKeys; } return Collections.emptySet(); @@ -1664,17 +1656,9 @@ public static List> getLogFileColumnRangeM .withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()) .withReaderSchema(writerSchemaOpt.get()) .withTableMetaClient(datasetMetaClient) + .withLogRecordScannerCallback(records::add) .build(); - - Iterator> recordIterator = scanner.iterator(); - while (recordIterator.hasNext()) { - try { - records.add(recordIterator.next()); - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } - } - + scanner.scan(); if (records.isEmpty()) { return Collections.emptyList(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 56c96ca51be92..0628673b8e58e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -36,7 +36,6 @@ import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.internal.schema.InternalSchema; @@ -238,16 +237,10 @@ private List>> getParallelProducers( ) { List>> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); - // Use the iterator to process records - Iterator> recordIterator = scanner.iterator(); - while (recordIterator.hasNext()) { - try { - queue.insertRecord(recordIterator.next()); - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } - } + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build(); + // Scan all the delta-log files, filling in the queue + scanner.scan(); return null; })); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index ac1cbf64c530f..396705c151892 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -19,7 +19,6 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.config.HoodieMemoryConfig; -import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; @@ -28,7 +27,6 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -107,17 +105,16 @@ private List> getParallelProducers( ) { return Arrays.asList( new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); - Iterator> recordIterator = scanner.iterator(); - while (recordIterator.hasNext()) { - try { - GenericRecord rec = (GenericRecord) recordIterator.next().toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); - queue.insertRecord(aWritable); - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } - } + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(record -> { + // convert Hoodie log record to Hadoop AvroWritable and buffer + GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + queue.insertRecord(aWritable); + }) + .build(); + // Scan all the delta-log files, filling in the queue + scanner.scan(); return null; }), new IteratorBasedQueueProducer<>(parquetRecordsIterator) diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java index c1ee48a7fb684..464ad5ddca1e4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -60,21 +59,12 @@ public static Option getLogFileColumnRangeMetadata(String filePath, HoodieT .withBufferSize(maxBufferSize) .withLatestInstantTime(latestCommitTime) .withReaderSchema(writerSchemaOpt.get()) + .withLogRecordScannerCallback(records::add) .build(); - - Iterator> recordIterator = scanner.iterator(); - while (recordIterator.hasNext()) { - try { - records.add(recordIterator.next()); - } catch (Exception e) { - throw new HoodieException("Error while inserting record into queue", e); - } - } - + scanner.scan(); if (records.isEmpty()) { return Option.empty(); } - Map> columnRangeMetadataMap = collectColumnRangeMetadata(records, fieldsToIndex, filePath, writerSchemaOpt.get()); List> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values()); From 6425371ce9694a53250ebeffb08405ef4cb73d79 Mon Sep 17 00:00:00 2001 From: vamshi Date: Tue, 14 Jan 2025 18:35:55 -0800 Subject: [PATCH 03/15] 
save --- .../utils/SparkMetadataWriterUtils.java | 16 --------- .../log/AbstractHoodieLogRecordScanner.java | 2 +- .../log/HoodieUnMergedLogRecordScanner.java | 33 +++++++++++++++++++ 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index eb466ee5252da..e86f167bcf72c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -180,22 +180,6 @@ public static List readRecordsAsRows(StoragePath[] paths, SQLContext sqlCon return toRows(records, schema, dataWriteConfig, sqlContext, paths[0].toString()); } - private static List getUnmergedLogFileRecords(List logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) { - List records = new ArrayList<>(); - HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() - .withStorage(metaClient.getStorage()) - .withBasePath(metaClient.getBasePath()) - .withLogFilePaths(logFilePaths) - .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue()) - .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()) - .withReaderSchema(readerSchema) - .withTableMetaClient(metaClient) - .withLogRecordScannerCallback(records::add) - .build(); - scanner.scan(false); - return records; - } - private static List getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { List records = new ArrayList<>(); HoodieRecordMerger recordMerger = diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 10aa50efe3e16..da22a548b504a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -797,7 +797,7 @@ public List getValidBlockInstants() { return validBlockInstants; } - private Pair, Schema> getRecordsIterator( + protected Pair, Schema> getRecordsIterator( HoodieDataBlock dataBlock, Option keySpecOpt) throws IOException { ClosableIterator blockRecordsIterator; if (keySpecOpt.isPresent()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index c23c5f42f1923..54833ffaf685a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -25,15 +25,22 @@ import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import 
org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -111,6 +118,32 @@ public interface RecordDeletionCallback { void apply(HoodieKey deletedKey); } + /** + * Returns an iterator over the log records. + */ + public Iterator> iterator() { + List> records = new ArrayList<>(); + try { + scan(); + + while (!getCurrentInstantLogBlocks().isEmpty()) { + HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast(); + if (lastBlock instanceof HoodieDataBlock) { + HoodieDataBlock dataBlock = (HoodieDataBlock) lastBlock; + Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); + try (ClosableIterator recordIterator = recordsIteratorSchemaPair.getLeft()) { + while (recordIterator.hasNext()) { + records.add(recordIterator.next()); + } + } + } + } + } catch (Exception e) { + throw new HoodieException("Error while iterating over log records", e); + } + return records.iterator(); + } + /** * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ From 8a46112d4c161f0b65ac6e3b85f2ca1e31695d52 Mon Sep 17 00:00:00 2001 From: vamshi Date: Tue, 14 Jan 2025 20:04:28 -0800 Subject: [PATCH 04/15] Review comments --- .../utils/SparkMetadataWriterUtils.java | 61 +++++++++++-------- .../RealtimeUnmergedRecordReader.java | 30 +++++---- 2 files changed, 55 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index e86f167bcf72c..aa21fe435d300 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -175,39 +175,50 @@ public static HoodieData getExpressionIndexRecordsUsingBloomFilter public static List readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext, HoodieTableMetaClient metaClient, Schema schema, HoodieWriteConfig dataWriteConfig, boolean isBaseFile) { - List records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema) + Iterator records = isBaseFile ? 
getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema) : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema); return toRows(records, schema, dataWriteConfig, sqlContext, paths[0].toString()); } - private static List getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { - List records = new ArrayList<>(); - HoodieRecordMerger recordMerger = - HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), - metaClient.getTableConfig().getRecordMergeStrategyId()); - try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()) - .getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath())) { - baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record) -> records.add((HoodieRecord) record)); - return records; - } catch (IOException e) { - throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e); - } + private static Iterator> getUnmergedLogFileRecords(List logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) { + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(metaClient.getStorage()) + .withBasePath(metaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue()) + .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()) + .withReaderSchema(readerSchema) + .withTableMetaClient(metaClient) + .build(); + scanner.scan(false); + return scanner.iterator(); } - private static List toRows(List records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) { +private static Iterator getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { + HoodieRecordMerger recordMerger = + HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), + metaClient.getTableConfig().getRecordMergeStrategyId()); + try { + HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()) + .getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath()); + return baseFileReader.getRecordIterator(readerSchema); + } catch (IOException e) { + throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e); + } +} + private static List toRows(Iterator records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) { StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema); Function1 converterToRow = AvroConversionUtils.createConverterToRow(schema, structType); - List avroRecords = records.stream() - .map(r -> { - try { - return (GenericRecord) (r.getData() instanceof GenericRecord ? 
r.getData() - : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, dataWriteConfig.getProps()).get()); - } catch (IOException e) { - throw new HoodieIOException("Could not fetch record payload"); - } - }) - .map(converterToRow::apply) - .collect(Collectors.toList()); + List avroRecords = new ArrayList<>(); + records.forEachRemaining(record -> { + try { + GenericRecord genericRecord = (GenericRecord) (record.getData() instanceof GenericRecord ? record.getData() + : ((HoodieRecordPayload) record.getData()).getInsertValue(schema, dataWriteConfig.getProps()).get()); + avroRecords.add(converterToRow.apply(genericRecord)); + } catch (IOException e) { + throw new HoodieIOException("Could not fetch record payload", e); + } + }); return avroRecords; } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 396705c151892..ad6c508079a59 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.realtime; import org.apache.hudi.common.config.HoodieMemoryConfig; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; @@ -27,6 +28,9 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.exception.HoodieAvroSchemaException; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -69,7 +73,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader * @param realReader Parquet Reader */ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, - RecordReader realReader) { + RecordReader realReader) { super(split, job); this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file @@ -105,16 +109,20 @@ private List> getParallelProducers( ) { return Arrays.asList( new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - scannerBuilder.withLogRecordScannerCallback(record -> { - // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); - queue.insertRecord(aWritable); - }) - .build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); + HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); + Iterator> logRecordIterator = scanner.iterator(); + try { + while (logRecordIterator.hasNext()) { + HoodieRecord record = logRecordIterator.next(); + // convert Hoodie log record to Hadoop AvroWritable and buffer + GenericRecord rec = null; + rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), 
payloadProps).get().getData(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + queue.insertRecord(aWritable); + } + } catch (Exception e) { + throw new HoodieException("Error converting the record to avro", e); + } return null; }), new IteratorBasedQueueProducer<>(parquetRecordsIterator) From 49c86fa53447cb68433750eeb99c94a9b9e9d67a Mon Sep 17 00:00:00 2001 From: vamshi Date: Tue, 14 Jan 2025 20:32:04 -0800 Subject: [PATCH 05/15] fix cs --- .../hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index ad6c508079a59..b1109945beb4f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -28,9 +28,7 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; -import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; import org.apache.hudi.hadoop.fs.HadoopFSUtils; From 930e1e9856041e5f1bd552f1d28bcb402c943bc1 Mon Sep 17 00:00:00 2001 From: vamshi Date: Thu, 16 Jan 2025 22:25:29 -0800 Subject: [PATCH 06/15] return nested iterator --- .../log/HoodieUnMergedLogRecordScanner.java | 52 ++++++++++++------- .../apache/hudi/table/format/FormatUtils.java | 15 ++++-- .../RealtimeUnmergedRecordReader.java | 7 ++- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 54833ffaf685a..466fd6ba05597 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -39,9 +39,9 @@ import org.apache.avro.Schema; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.stream.Collectors; /** @@ -121,27 +121,43 @@ public interface RecordDeletionCallback { /** * Returns an iterator over the log records. 
*/ - public Iterator> iterator() { - List> records = new ArrayList<>(); - try { - scan(); - - while (!getCurrentInstantLogBlocks().isEmpty()) { - HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast(); - if (lastBlock instanceof HoodieDataBlock) { - HoodieDataBlock dataBlock = (HoodieDataBlock) lastBlock; - Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); - try (ClosableIterator recordIterator = recordsIteratorSchemaPair.getLeft()) { - while (recordIterator.hasNext()) { - records.add(recordIterator.next()); + public ClosableIterator> iterator() { + return new ClosableIterator>() { + private final Iterator logBlockIterator = getCurrentInstantLogBlocks().iterator(); + private ClosableIterator recordIterator = null; + + @Override + public boolean hasNext() { + try { + while ((recordIterator == null || !recordIterator.hasNext()) && logBlockIterator.hasNext()) { + HoodieLogBlock logBlock = logBlockIterator.next(); + if (logBlock instanceof HoodieDataBlock) { + HoodieDataBlock dataBlock = (HoodieDataBlock) logBlock; + Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); + recordIterator = recordsIteratorSchemaPair.getLeft(); } } + return recordIterator != null && recordIterator.hasNext(); + } catch (Exception e) { + throw new HoodieException("Error while iterating over log records", e); } } - } catch (Exception e) { - throw new HoodieException("Error while iterating over log records", e); - } - return records.iterator(); + + @Override + public HoodieRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return recordIterator.next(); + } + + @Override + public void close() { + if (recordIterator != null) { + recordIterator.close(); + } + } + }; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 0628673b8e58e..94f9f7024609d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -30,12 +30,14 @@ import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.internal.schema.InternalSchema; @@ -51,6 +53,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.conf.Configuration; import java.io.IOException; @@ -237,10 +240,14 @@ private List>> getParallelProducers( ) { List>> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - 
scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); + HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); + try (ClosableIterator> logRecordIterator = scanner.iterator()) { + while (logRecordIterator.hasNext()) { + queue.insertRecord(logRecordIterator.next()); + } + } catch (Exception e) { + throw new HoodieException("Error converting the record to avro", e); + } return null; })); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index b1109945beb4f..1198d627a6323 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; @@ -108,13 +109,11 @@ private List> getParallelProducers( return Arrays.asList( new FunctionBasedQueueProducer<>(queue -> { HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); - Iterator> logRecordIterator = scanner.iterator(); - try { + try (ClosableIterator> logRecordIterator = scanner.iterator()) { while (logRecordIterator.hasNext()) { HoodieRecord record = logRecordIterator.next(); // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = null; - rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); + GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData(); ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); queue.insertRecord(aWritable); } From 33c8c4a7c91df17cfc044c03b85af36c51c679fe Mon Sep 17 00:00:00 2001 From: vamshi Date: Fri, 17 Jan 2025 00:57:27 -0800 Subject: [PATCH 07/15] Scan before iterating --- .../hudi/common/table/log/HoodieUnMergedLogRecordScanner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 466fd6ba05597..22b0923de5256 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -122,6 +122,7 @@ public interface RecordDeletionCallback { * Returns an iterator over the log records. 
*/ public ClosableIterator> iterator() { + scan(); return new ClosableIterator>() { private final Iterator logBlockIterator = getCurrentInstantLogBlocks().iterator(); private ClosableIterator recordIterator = null; From ba9a9efe2bce230d632059962eaf48da1e81ea4b Mon Sep 17 00:00:00 2001 From: vamshi Date: Fri, 17 Jan 2025 01:01:34 -0800 Subject: [PATCH 08/15] fix cs --- .../utils/SparkMetadataWriterUtils.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index aa21fe435d300..fbd145a7caccd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -156,7 +156,7 @@ public static HoodieData getExpressionIndexRecordsUsingBloomFilter Dataset bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames()) // row.get(1) refers to partition path value and row.get(2) refers to file name. .groupByKey((MapFunction) row -> Pair.of(row.getString(1), row.getString(2)), Encoders.kryo(Pair.class)) - .flatMapGroups((FlatMapGroupsFunction) ((pair, iterator) -> { + .flatMapGroups((FlatMapGroupsFunction) ((pair, iterator) -> { String partition = pair.getLeft().toString(); String relativeFilePath = pair.getRight().toString(); String fileName = FSUtils.getFileName(relativeFilePath, partition); @@ -194,18 +194,20 @@ private static Iterator> getUnmergedLogFileRecords(List return scanner.iterator(); } -private static Iterator getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { - HoodieRecordMerger recordMerger = - HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), - metaClient.getTableConfig().getRecordMergeStrategyId()); - try { - HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()) - .getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath()); - return baseFileReader.getRecordIterator(readerSchema); - } catch (IOException e) { - throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e); + + private static Iterator getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { + HoodieRecordMerger recordMerger = + HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), + metaClient.getTableConfig().getRecordMergeStrategyId()); + try { + HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()) + .getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath()); + return baseFileReader.getRecordIterator(readerSchema); + } catch (IOException e) { + throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e); + } } -} + private static List toRows(Iterator records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) { StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema); Function1 converterToRow = 
AvroConversionUtils.createConverterToRow(schema, structType); From 5ca19a8f0fb4d36d43630e476bd16d6421adf96a Mon Sep 17 00:00:00 2001 From: vamshi Date: Fri, 17 Jan 2025 01:04:34 -0800 Subject: [PATCH 09/15] fix cs --- .../org/apache/hudi/client/utils/SparkMetadataWriterUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index fbd145a7caccd..07f8d237e75be 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -193,7 +193,6 @@ private static Iterator> getUnmergedLogFileRecords(List scanner.scan(false); return scanner.iterator(); } - private static Iterator getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { HoodieRecordMerger recordMerger = From cde5b65cb494a4ce2a43d1fde318f122082e653a Mon Sep 17 00:00:00 2001 From: vamshi Date: Fri, 17 Jan 2025 01:13:29 -0800 Subject: [PATCH 10/15] fix cs --- .../org/apache/hudi/client/utils/SparkMetadataWriterUtils.java | 2 +- .../src/main/java/org/apache/hudi/table/format/FormatUtils.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index 07f8d237e75be..bc0885a13c17e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -193,7 +193,7 @@ private static Iterator> getUnmergedLogFileRecords(List scanner.scan(false); return scanner.iterator(); } - + private static Iterator getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) { HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 94f9f7024609d..d0b9613378dab 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -53,7 +53,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; -import org.apache.flink.util.CloseableIterator; import org.apache.hadoop.conf.Configuration; import java.io.IOException; From f25f5bcb8a98d45fd319ea2d8fbca0dc47ea6f35 Mon Sep 17 00:00:00 2001 From: vamshi Date: Sun, 19 Jan 2025 18:03:49 -0800 Subject: [PATCH 11/15] remove queue --- .../log/HoodieUnMergedLogRecordScanner.java | 3 - .../apache/hudi/table/format/FormatUtils.java | 74 +++++-------------- 2 files changed, 17 insertions(+), 60 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 22b0923de5256..9dcaf5ac26e8d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -146,9 +146,6 @@ public boolean hasNext() { @Override public HoodieRecord next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } return recordIterator.next(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index d0b9613378dab..d1a2c15097910 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -27,17 +27,12 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; -import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; -import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; -import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.internal.schema.InternalSchema; @@ -45,7 +40,6 @@ import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.FlinkWriteClients; -import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -56,12 +50,10 @@ import org.apache.hadoop.conf.Configuration; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -180,9 +172,6 @@ public static HoodieMergedLogRecordScanner logScanner( * Utility to read and buffer the records in the unMerged log record scanner. 
*/ public static class BoundedMemoryRecords { - // Executor that runs the above producers in parallel - private final BoundedInMemoryExecutor, HoodieRecord, ?> executor; - // Iterator for the buffer consumer private final Iterator> iterator; @@ -198,63 +187,34 @@ public BoundedMemoryRecords( .collect(Collectors.toList()); HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger( split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID)); - HoodieUnMergedLogRecordScanner.Builder scannerBuilder = + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() .withStorage(HoodieStorageUtils.getStorage( split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf))) - .withBasePath(split.getTablePath()) - .withLogFilePaths(split.getLogPaths().get()) - .withReaderSchema(logSchema) - .withInternalSchema(internalSchema) - .withLatestInstantTime(split.getLatestCommit()) - .withReverseReader(false) - .withBufferSize( - flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), - HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE)) - .withInstantRange(split.getInstantRange()) - .withRecordMerger(merger); - - this.executor = new BoundedInMemoryExecutor<>( - StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), - getParallelProducers(scannerBuilder), - Option.empty(), - Function.identity(), - new DefaultSizeEstimator<>(), - Functions.noop()); - this.iterator = this.executor.getRecordIterator(); + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) + .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) + .withLatestInstantTime(split.getLatestCommit()) + .withReverseReader(false) + .withBufferSize( + flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), + HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE)) + .withInstantRange(split.getInstantRange()) + .withRecordMerger(merger) + .build(); - // Start reading and buffering - this.executor.startProducingAsync(); + this.iterator = scanner.iterator(); } public Iterator> getRecordsIterator() { return this.iterator; } - /** - * Setup log and parquet reading in parallel. Both write to central buffer. 
- */ - private List>> getParallelProducers( - HoodieUnMergedLogRecordScanner.Builder scannerBuilder - ) { - List>> producers = new ArrayList<>(); - producers.add(new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build(); - try (ClosableIterator> logRecordIterator = scanner.iterator()) { - while (logRecordIterator.hasNext()) { - queue.insertRecord(logRecordIterator.next()); - } - } catch (Exception e) { - throw new HoodieException("Error converting the record to avro", e); - } - return null; - })); - - return producers; - } - public void close() { - this.executor.shutdownNow(); + if (this.iterator instanceof ClosableIterator) { + ((ClosableIterator>) this.iterator).close(); + } } } From d02d371d69c39de9fc8d8cb010219f10f52b4b42 Mon Sep 17 00:00:00 2001 From: vamshi Date: Sun, 19 Jan 2025 19:27:48 -0800 Subject: [PATCH 12/15] save cs --- .../hudi/common/table/log/HoodieUnMergedLogRecordScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 9dcaf5ac26e8d..8f5decdbf871a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -40,8 +40,8 @@ import org.apache.avro.Schema; import java.util.Iterator; + import java.util.List; -import java.util.NoSuchElementException; import java.util.stream.Collectors; /** From 072c7dde17e0ac5c468f00cb404f7277e7777dee Mon Sep 17 00:00:00 2001 From: vamshi Date: Sat, 25 Jan 2025 13:10:05 -0800 Subject: [PATCH 13/15] save changes --- .../log/AbstractHoodieLogRecordScanner.java | 29 ++++++++++++------- .../log/HoodieUnMergedLogRecordScanner.java | 15 +++++++--- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index da22a548b504a..8674329e27089 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -247,12 +247,12 @@ protected final void scanInternal(Option keySpecOpt, boolean skipProces if (enableOptimizedLogBlocksScan) { scanInternalV2(keySpecOpt, skipProcessingBlocks); } else { - scanInternalV1(keySpecOpt); + scanInternalV1(keySpecOpt, skipProcessingBlocks); } } } - private void scanInternalV1(Option keySpecOpt) { + private void scanInternalV1(Option keySpecOpt, boolean skipProcessingBlocks) { currentInstantLogBlocks = new ArrayDeque<>(); progress = 0.0f; @@ -372,7 +372,7 @@ private void scanInternalV1(Option keySpecOpt) { } } // merge the last read block when all the blocks are done reading - if (!currentInstantLogBlocks.isEmpty()) { + if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) { // if there are no dups, we can take currentInstantLogBlocks as is. 
LOG.info("Merging the final data blocks"); processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); @@ -631,20 +631,27 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpec try (ClosableIterator recordIterator = recordsIteratorSchemaPair.getLeft()) { while (recordIterator.hasNext()) { - HoodieRecord completedRecord = recordIterator.next() - .wrapIntoHoodieRecordPayloadWithParams(recordsIteratorSchemaPair.getRight(), - hoodieTableMetaClient.getTableConfig().getProps(), - recordKeyPartitionPathFieldPair, - this.withOperationField, - this.partitionNameOverrideOpt, - populateMetaFields, - Option.empty()); + HoodieRecord completedRecord = getWrapIntoHoodieRecordPayloadWithParams(recordIterator, recordsIteratorSchemaPair, recordKeyPartitionPathFieldPair); processNextRecord(completedRecord); totalLogRecords.incrementAndGet(); } } } + protected HoodieRecord getWrapIntoHoodieRecordPayloadWithParams(ClosableIterator recordIterator, + Pair, Schema> recordsIteratorSchemaPair, + Option> recordKeyPartitionPathFieldPair) throws IOException { + return recordIterator + .next() + .wrapIntoHoodieRecordPayloadWithParams(recordsIteratorSchemaPair.getRight(), + hoodieTableMetaClient.getTableConfig().getProps(), + recordKeyPartitionPathFieldPair, + this.withOperationField, + this.partitionNameOverrideOpt, + populateMetaFields, + Option.empty()); + } + /** * Process next record. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8f5decdbf871a..272dfcc9768f4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -39,6 +39,7 @@ import org.apache.avro.Schema; +import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -119,12 +120,13 @@ public interface RecordDeletionCallback { } /** - * Returns an iterator over the log records. + * Returns a nested iterator over the log records. 
*/ public ClosableIterator> iterator() { - scan(); + scan(true); return new ClosableIterator>() { private final Iterator logBlockIterator = getCurrentInstantLogBlocks().iterator(); + Pair, Schema> recordsIteratorSchemaPair = null; private ClosableIterator recordIterator = null; @Override @@ -134,7 +136,7 @@ public boolean hasNext() { HoodieLogBlock logBlock = logBlockIterator.next(); if (logBlock instanceof HoodieDataBlock) { HoodieDataBlock dataBlock = (HoodieDataBlock) logBlock; - Pair, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); + recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty()); recordIterator = recordsIteratorSchemaPair.getLeft(); } } @@ -146,7 +148,12 @@ public boolean hasNext() { @Override public HoodieRecord next() { - return recordIterator.next(); + try { + // fetch next record and wrap into hoodie record + return getWrapIntoHoodieRecordPayloadWithParams(recordIterator, recordsIteratorSchemaPair, Option.empty()); + } catch (IOException e) { + throw new HoodieException("Error while wrapping into hoodie record", e); + } } @Override From 3df692e8ee64e023d73687b73fd8144888604719 Mon Sep 17 00:00:00 2001 From: vamshi Date: Mon, 27 Jan 2025 23:58:54 -0800 Subject: [PATCH 14/15] Replace BoundedMemoryRecords with iterator --- .../apache/hudi/table/format/FormatUtils.java | 72 +++++++------------ .../format/mor/MergeOnReadInputFormat.java | 63 ++++++++-------- 2 files changed, 57 insertions(+), 78 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index d1a2c15097910..d050024edcb89 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -168,54 +168,34 @@ public static HoodieMergedLogRecordScanner logScanner( .build(); } - /** - * Utility to read and buffer the records in the unMerged log record scanner. 
- */ - public static class BoundedMemoryRecords { - // Iterator for the buffer consumer - private final Iterator> iterator; + public static HoodieUnMergedLogRecordScanner getUnMergedLogRecordScanner(MergeOnReadInputSplit split, + Schema logSchema, + InternalSchema internalSchema, + Configuration hadoopConf, + org.apache.flink.configuration.Configuration flinkConf) { + List mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")) + .map(String::trim) + .distinct() + .collect(Collectors.toList()); - public BoundedMemoryRecords( - MergeOnReadInputSplit split, - Schema logSchema, - InternalSchema internalSchema, - Configuration hadoopConf, - org.apache.flink.configuration.Configuration flinkConf) { - List mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")) - .map(String::trim) - .distinct() - .collect(Collectors.toList()); - HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger( - split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID)); - HoodieUnMergedLogRecordScanner scanner = - HoodieUnMergedLogRecordScanner.newBuilder() - .withStorage(HoodieStorageUtils.getStorage( - split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf))) - .withBasePath(split.getTablePath()) - .withLogFilePaths(split.getLogPaths().get()) - .withReaderSchema(logSchema) - .withInternalSchema(internalSchema) - .withLatestInstantTime(split.getLatestCommit()) - .withReverseReader(false) - .withBufferSize( - flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), - HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE)) - .withInstantRange(split.getInstantRange()) - .withRecordMerger(merger) - .build(); + HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger( + split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID)); - this.iterator = scanner.iterator(); - } - - public Iterator> getRecordsIterator() { - return this.iterator; - } - - public void close() { - if (this.iterator instanceof ClosableIterator) { - ((ClosableIterator>) this.iterator).close(); - } - } + return HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(HoodieStorageUtils.getStorage( + split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf))) + .withBasePath(split.getTablePath()) + .withLogFilePaths(split.getLogPaths().get()) + .withReaderSchema(logSchema) + .withInternalSchema(internalSchema) + .withLatestInstantTime(split.getLatestCommit()) + .withReverseReader(false) + .withBufferSize( + flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), + HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE)) + .withInstantRange(split.getInstantRange()) + .withRecordMerger(merger) + .build(); } public static HoodieMergedLogRecordScanner logScanner( diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 53d97bd2440d8..0371237bff9fd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -432,42 +432,41 @@ private ClosableIterator getUnMergedLogFileIterator(MergeOnReadInputSpl final GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE)); - final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf); - final Iterator> recordsIterator = records.getRecordsIterator(); - - return new ClosableIterator() { - private RowData currentRecord; - - @Override - public boolean hasNext() { - while (recordsIterator.hasNext()) { - final HoodieAvroRecord hoodieRecord = (HoodieAvroRecord) recordsIterator.next(); - Option curAvroRecord = getInsertVal(hoodieRecord, tableSchema); - if (curAvroRecord.isPresent()) { - final IndexedRecord avroRecord = curAvroRecord.get(); - GenericRecord requiredAvroRecord = buildAvroRecordBySchema( - avroRecord, - requiredSchema, - requiredPos, - recordBuilder); - currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); - FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos()); - return true; + try (ClosableIterator> recordsIterator = FormatUtils.getUnMergedLogRecordScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf).iterator()) { + return new ClosableIterator() { + private RowData currentRecord; + + @Override + public boolean hasNext() { + while (recordsIterator.hasNext()) { + final HoodieAvroRecord hoodieRecord = (HoodieAvroRecord) recordsIterator.next(); + Option curAvroRecord = getInsertVal(hoodieRecord, tableSchema); + if (curAvroRecord.isPresent()) { + final IndexedRecord avroRecord = curAvroRecord.get(); + GenericRecord requiredAvroRecord = buildAvroRecordBySchema( + avroRecord, + requiredSchema, + requiredPos, + recordBuilder); + currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord); + FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos()); + return true; + } } + return false; } - return false; - } - @Override - public RowData next() { - return currentRecord; - } + @Override + public RowData next() { + return currentRecord; + } - @Override - public void close() { - records.close(); - } - }; + @Override + public void close() { + recordsIterator.close(); + } + }; + } } protected static Option getInsertVal(HoodieAvroRecord hoodieRecord, Schema tableSchema) { From 28c2982b3fa1cfce1cc0dc8c2f64c1610077ee87 Mon Sep 17 00:00:00 2001 From: vamshi Date: Mon, 3 Feb 2025 21:49:37 -0800 Subject: [PATCH 15/15] avoid passing merge --- .../org/apache/hudi/table/format/FormatUtils.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index d050024edcb89..ea8f760ee8366 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -20,16 +20,11 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieMemoryConfig; -import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.model.HoodieOperation; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordMerger; import 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.DefaultSizeEstimator; -import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; @@ -54,7 +49,6 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.stream.Collectors; /** * Utilities for format. @@ -173,14 +167,6 @@ public static HoodieUnMergedLogRecordScanner getUnMergedLogRecordScanner(MergeOn InternalSchema internalSchema, Configuration hadoopConf, org.apache.flink.configuration.Configuration flinkConf) { - List mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")) - .map(String::trim) - .distinct() - .collect(Collectors.toList()); - - HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger( - split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID)); - return HoodieUnMergedLogRecordScanner.newBuilder() .withStorage(HoodieStorageUtils.getStorage( split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf))) @@ -194,7 +180,6 @@ public static HoodieUnMergedLogRecordScanner getUnMergedLogRecordScanner(MergeOn flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE)) .withInstantRange(split.getInstantRange()) - .withRecordMerger(merger) .build(); }
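
Taken together, the series replaces the push-style LogRecordScannerCallback with a pull-style ClosableIterator over HoodieRecord: iterator() queues the delta log blocks via scan(true) so they are not merged eagerly, then flattens the queued blocks into records lazily, wrapping each record into its payload only when next() is called. Below is a standalone sketch of that blocks-to-records flattening pattern in plain Java; the block and record types are placeholders, not Hudi classes, so this is an illustration of the pattern rather than a copy of the patch.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Standalone illustration: flatten an iterator of blocks into an iterator of records. */
public class NestedBlockRecordIterator<B, R> implements Iterator<R> {
  private final Iterator<B> blockIterator;          // outer iterator, e.g. the queued log blocks
  private final Function<B, Iterator<R>> recordsOf; // lazily opens the records of one block
  private Iterator<R> recordIterator;               // inner iterator for the current block

  public NestedBlockRecordIterator(Iterator<B> blockIterator, Function<B, Iterator<R>> recordsOf) {
    this.blockIterator = blockIterator;
    this.recordsOf = recordsOf;
  }

  @Override
  public boolean hasNext() {
    // Advance past exhausted (or empty) blocks until a record is available or blocks run out.
    while ((recordIterator == null || !recordIterator.hasNext()) && blockIterator.hasNext()) {
      recordIterator = recordsOf.apply(blockIterator.next());
    }
    return recordIterator != null && recordIterator.hasNext();
  }

  @Override
  public R next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return recordIterator.next();
  }

  public static void main(String[] args) {
    Deque<List<String>> blocks = new ArrayDeque<>();
    blocks.add(Arrays.asList("r1", "r2"));
    blocks.add(Collections.emptyList());   // an empty block is skipped transparently
    blocks.add(Arrays.asList("r3"));
    Iterator<String> records =
        new NestedBlockRecordIterator<List<String>, String>(blocks.iterator(), List::iterator);
    records.forEachRemaining(System.out::println);  // prints r1, r2, r3
  }
}

Advancing with a while loop rather than a single if keeps hasNext() correct even when a block yields no records, and deferring recordsOf until a block is actually reached is what keeps the traversal lazy.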
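On the consumer side, call sites that previously registered withLogRecordScannerCallback(records::add) and then invoked scan() now just build the scanner and drain its iterator. The following is a minimal sketch of that usage, assuming the builder options shown in the patches; the class and method names (LogRecordCollector, collectLogRecords) are illustrative and not part of the change.

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;

public final class LogRecordCollector {

  /** Illustrative helper: drains the unmerged scanner's iterator into a list. */
  public static List<HoodieRecord> collectLogRecords(HoodieTableMetaClient metaClient,
                                                     List<String> logFilePaths,
                                                     Schema readerSchema,
                                                     String latestInstantTime) {
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(metaClient.getStorage())
        .withBasePath(metaClient.getBasePath().toString())
        .withLogFilePaths(logFilePaths)
        .withReaderSchema(readerSchema)
        .withLatestInstantTime(latestInstantTime)
        .withTableMetaClient(metaClient)
        .build();

    List<HoodieRecord> records = new ArrayList<>();
    // iterator() scans the delta log files internally and exposes the records lazily;
    // closing it releases the underlying block readers.
    try (ClosableIterator<HoodieRecord> iterator = scanner.iterator()) {
      while (iterator.hasNext()) {
        records.add(iterator.next());
      }
    } catch (Exception e) {
      throw new HoodieException("Error while reading unmerged log records", e);
    }
    return records;
  }
}

The Flink read path in the later patches drops the BoundedInMemoryExecutor producer/queue pair entirely and consumes (and closes) the same ClosableIterator directly, while the Hive realtime reader keeps its bounded queue but fills it from the iterator instead of a callback.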