[HUDI-8163] Refactor UnMergedLogHandler with iterators #12608
base: master
Conversation
    .build();
scanner.scan(false);
Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
Is it possible to return an iterator from the getUnmergedLogFileRecords method as well, so the list add/get can be simplified?
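For illustration, a minimal sketch of the difference being suggested, with generic types standing in for the actual Hudi classes; the method name getUnmergedLogFileRecords comes from the comment above, everything else is hypothetical:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class UnmergedRecordsSketch {

  // Assumed current shape: drain the scanner's iterator into a list up front.
  static <T> List<T> getUnmergedLogFileRecordsAsList(Iterator<T> scannerIterator) {
    List<T> buffered = new ArrayList<>();
    while (scannerIterator.hasNext()) {
      buffered.add(scannerIterator.next());
    }
    return buffered;
  }

  // Suggested shape: hand the iterator straight to the caller so records are
  // pulled on demand and the intermediate list add/get bookkeeping disappears.
  static <T> Iterator<T> getUnmergedLogFileRecords(Iterator<T> scannerIterator) {
    return scannerIterator;
  }
}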
@@ -617,6 +617,8 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
/**
 * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
 * handle it.
 * TODO:
 * 1. what is the purpose of this method? should the HoodieRecord be added to a queue and consumed by an iterator?
 */
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
This should be the main entry point for handling the log blocks; each kind of log block would trigger the invocation of specific row-level handling methods.
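Roughly the dispatch structure this comment describes, as a sketch; the block-type constants and handler methods below are illustrative placeholders rather than the real Hudi API:

// Sketch: the entry method inspects the log block kind and routes it to a
// dedicated row-level handler. Names are illustrative only.
class LogBlockDispatcherSketch {

  enum BlockKind { DATA_BLOCK, DELETE_BLOCK, COMMAND_BLOCK }

  void processBlock(BlockKind kind) {
    switch (kind) {
      case DATA_BLOCK:
        handleDataRows();   // iterate records, read key/partition path per row
        break;
      case DELETE_BLOCK:
        handleDeletes();    // emit the deleted keys per row
        break;
      case COMMAND_BLOCK:
        handleRollback();   // invalidate previously queued blocks
        break;
    }
  }

  private void handleDataRows() { /* row-level data handling */ }
  private void handleDeletes() { /* row-level delete handling */ }
  private void handleRollback() { /* rollback handling */ }
}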
}

public Builder withRecordDeletionCallback(RecordDeletionCallback recordDeletionCallback) {
  this.recordDeletionCallback = recordDeletionCallback;
The recordDeletionCallback is the only consumer of deleted records from the delete avro block; we may need another iterator implementation specifically for it.
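A loose sketch of such a dedicated iterator, assuming the delete block can expose its keys as an iterator; all names here are hypothetical:

import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical: surface the deleted keys as an iterator instead of pushing them
// through a RecordDeletionCallback; K stands in for the real delete-key type.
class DeletedKeyIteratorSketch<K> implements Iterator<K> {
  private final Iterator<K> deleteBlockKeys;

  DeletedKeyIteratorSketch(Iterator<K> deleteBlockKeys) {
    this.deleteBlockKeys = deleteBlockKeys;
  }

  @Override
  public boolean hasNext() {
    return deleteBlockKeys.hasNext();
  }

  @Override
  public K next() {
    if (!hasNext()) {
      throw new NoSuchElementException("no more deleted keys");
    }
    return deleteBlockKeys.next();
  }
}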
while (recordIterator.hasNext()) {
  try {
    records.add(recordIterator.next());
  } catch (Exception e) {
The code is exactly the same if we collect all the records into the collection again, so maybe we should not remove those two callbacks and just add a new iterator implementation for the case that really needs it: the Flink streaming reader.
I kept the callbacks and added an iterator for the Flink usage.
This reverts commit 6354f79.
 * Returns an iterator over the log records.
 */
public Iterator<HoodieRecord<?>> iterator() {
  List<HoodieRecord<?>> records = new ArrayList<>();
Can we try to eliminate the list and return a nested iterator if possible?
Good idea, replaced with a nested ClosableIterator.
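A rough sketch of what a nested, lazily evaluated iterator can look like: the outer iterator walks the log blocks and an inner iterator yields the records of the current block, so no list is materialized. The types below are placeholders; the actual change uses Hudi's ClosableIterator rather than plain Iterator:

import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

class NestedBlockRecordIterator<B, R> implements Iterator<R>, AutoCloseable {
  private final Iterator<B> blocks;                 // outer: log blocks
  private final Function<B, Iterator<R>> recordsOf; // inner: records of one block
  private Iterator<R> current = Collections.emptyIterator();

  NestedBlockRecordIterator(Iterator<B> blocks, Function<B, Iterator<R>> recordsOf) {
    this.blocks = blocks;
    this.recordsOf = recordsOf;
  }

  @Override
  public boolean hasNext() {
    // advance to the next block whose inner iterator still has records
    while (!current.hasNext() && blocks.hasNext()) {
      current = recordsOf.apply(blocks.next());
    }
    return current.hasNext();
  }

  @Override
  public R next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  @Override
  public void close() {
    // a ClosableIterator would also release block readers/resources here
  }
}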
    .build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
Nice refactoring. Can we also fix the usage of HoodieUnMergedLogRecordScanner for Flink's MergeOnReadInputFormat#getUnMergedLogFileIterator?
Adjusted the usage within BoundedMemoryRecords in MergeOnReadInputFormat#getUnMergedLogFileIterator
The BoundedMemoryRecords can be eliminated.
@Override
public HoodieRecord<?> next() {
  if (!hasNext()) {
Maybe we remove this check, because hasNext should always be invoked before next.
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
try (ClosableIterator<HoodieRecord<?>> logRecordIterator = scanner.iterator()) {
  while (logRecordIterator.hasNext()) {
    queue.insertRecord(logRecordIterator.next());
We can remove the whole producer and queue, I think.
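As a sketch of that simplification, the reader can pull from the scanner's iterator directly instead of having a producer thread fill a bounded in-memory queue; the class below is illustrative only:

import java.util.Iterator;

class DirectIteratorReaderSketch<T> {
  private final Iterator<T> records; // e.g. the result of scanner.iterator()

  DirectIteratorReaderSketch(Iterator<T> records) {
    this.records = records;
  }

  // The caller (e.g. a Flink split reader) pulls one record at a time on demand,
  // so there is no producer, no executor and no queue to drain.
  T nextRecordOrNull() {
    return records.hasNext() ? records.next() : null;
  }
}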
@@ -181,9 +172,6 @@ public static HoodieMergedLogRecordScanner logScanner(
 * Utility to read and buffer the records in the unMerged log record scanner.
 */
public static class BoundedMemoryRecords {
  // Executor that runs the above producers in parallel
BoundedMemoryRecords can be eliminated too, I guess.
Yes, it can be removed; there's a post-processing step that drains the queue. Will fix.
    flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
        HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
    .withInstantRange(split.getInstantRange())
    .withRecordMerger(merger)
Not sure whether the merger is necessary, because for HoodieUnMergedLogRecordScanner the merge never happens.
@@ -372,7 +372,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
  }
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
Is this change related to the fix?
Yes. Without this, whatever log instants (currentInstantLogBlocks) are created during scan() get processed immediately in processQueuedBlocksForInstant.
The v2 scan exposes an interface to skip this processing, but I didn't want to migrate from v1 to v2 as part of this change, hence I added this logic to skip it in v1.
Change Logs
Refactor the code to deprecate the usage of callbacks and replace them with iterators.
Impact
Describe any public API or user-facing feature change or any performance impact.
Improves performance by using an iterator to produce records on demand.
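As an illustration of the on-demand consumption pattern, a sketch only; the class names follow the diff snippets above, while the import paths are my assumption:

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.collection.ClosableIterator;

class UnmergedScanUsageSketch {
  static long countLogRecords(HoodieUnMergedLogRecordScanner scanner) {
    long count = 0;
    // try-with-resources so the underlying readers are released when iteration ends
    try (ClosableIterator<HoodieRecord<?>> it = scanner.iterator()) {
      while (it.hasNext()) {
        HoodieRecord<?> logRecord = it.next(); // a real caller would process the record here
        count++;
      }
    }
    return count;
  }
}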
Risk level (write none, low medium or high below)
low.
Contributor's checklist