Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8163] Refactor UnMergedLogHandler with iterators #12608

Open
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

vamshipasunuru
Copy link

@vamshipasunuru vamshipasunuru commented Jan 9, 2025

Change Logs

Refactor code to deprecate usage of callbacks and replace with iterators

Impact

Describe any public API or user-facing feature change or any performance impact.

Improves performance by using iterator to produce records on-demand.

Risk level (write none, low medium or high below)

low.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable, TestHoodieRealtimeRecordReader#testUnMergedReader contains the test the covers newly added iterator
  • CI passed

@vamshipasunuru vamshipasunuru marked this pull request as draft January 9, 2025 12:57
@vamshipasunuru
Copy link
Author

@danny0405

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Jan 9, 2025
.build();
scanner.scan(false);
Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible we return an iterator too for method getUnmergedLogFileRecords to the list add/get can be simplified.

@@ -617,6 +617,8 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
* TODO:
* 1. what is the purpose of this method? should the HoodieRecord be added to a queue and consumed by an iterator?
*/
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the main entry for handling the log blocks, each kind of log block would trigger invocation of specific row-level handling methods.

}

public Builder withRecordDeletionCallback(RecordDeletionCallback recordDeletionCallback) {
this.recordDeletionCallback = recordDeletionCallback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recordDeletionCallback is only consumer for deleted reords from the delete avro block, we may need another iterator impl specifically for it.

while (recordIterator.hasNext()) {
try {
records.add(recordIterator.next());
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is exactly the same if we collect all the records in the collection again , maybe we do not remove those two callbacks and just add a new iterator impl for the case that really need it: the Flink streaming reader.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i kept the callbacks and added iterator for flink usage.

@vamshipasunuru vamshipasunuru marked this pull request as ready for review January 15, 2025 17:17
* Returns an iterator over the log records.
*/
public Iterator<HoodieRecord<?>> iterator() {
List<HoodieRecord<?>> records = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to eliminate the list and return a nested iterator if possible?

Copy link
Author

@vamshipasunuru vamshipasunuru Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, replaced with nested ClosableIterator.

.build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactoring, can we also fix the usage of HoodieUnMergedLogRecordScanner for Flink MergeOnReadInputFormat#getUnMergedLogFileIterator

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted the usage within BoundedMemoryRecords in MergeOnReadInputFormat#getUnMergedLogFileIterator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BoundedMemoryRecords can be eliminated.

@vamshipasunuru vamshipasunuru changed the title [Early feedback] [HUDI-8163] Refactor UnMergedLogHandler with iterators [HUDI-8163] Refactor UnMergedLogHandler with iterators Jan 17, 2025
@vamshipasunuru vamshipasunuru marked this pull request as draft January 17, 2025 08:03

@Override
public HoodieRecord<?> next() {
if (!hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we remove this check because hasNext should always be invoked before next.

HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
try (ClosableIterator<HoodieRecord<?>> logRecordIterator = scanner.iterator()) {
while (logRecordIterator.hasNext()) {
queue.insertRecord(logRecordIterator.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the whole producers and queue I think.

@@ -181,9 +172,6 @@ public static HoodieMergedLogRecordScanner logScanner(
* Utility to read and buffer the records in the unMerged log record scanner.
*/
public static class BoundedMemoryRecords {
// Executor that runs the above producers in parallel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BoundedMemoryRecords can be eliminated too I guess.

@vamshipasunuru
Copy link
Author

vamshipasunuru commented Jan 21, 2025 via email

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Jan 28, 2025
flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
.withInstantRange(split.getInstantRange())
.withRecordMerger(merger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether the merger is necessary because for HoodieUnMergedLogRecordScanner, the merge never happens.

@@ -372,7 +372,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related with the fix?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, without this whatever log instants (currentInstantLogBlocks) that are created during scan() are processed immediately in processQueuedBlocksForInstant.
The v2 scan exposes a interface to skip this processing, but I didn't want to migrate from v1 to v2 as part of this change, hence added this logic to skip in v1.

@hudi-bot
Copy link

hudi-bot commented Feb 4, 2025

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@vamshipasunuru vamshipasunuru marked this pull request as ready for review February 4, 2025 19:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:L PR with lines of changes in (300, 1000]
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants