-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8704] Instant time to completion time state upgrade migration for Flink StreamReadMonitoringFunction #12539
base: master
Are you sure you want to change the base?
Conversation
...nk/src/main/java/org/apache/hudi/state/upgrade/source/StreamReadMonitoringStateUpgrader.java
Outdated
Show resolved
Hide resolved
Make versions zero based
@danny0405 Apologies that this took so long, it is ready for review now. |
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path); | ||
} | ||
this.issuedInstant = retrievedStates.get(0); | ||
this.issuedOffset = retrievedStates.get(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I expected this be a simple change, check the issuedOffset
against the timeline for the first time of read, if the issuedOffset
is an instant time, then just switch to it's completion time or modification time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for the misunderstanding, i do not quite understand what you mean, I was under the impression that this would be a backward compatibility improvement when migrating from a Hudi 0.x table to a Hudi 1.0 table.
From what i understand:
version | attribute | available in state? |
---|---|---|
0.x | issuedInstant | true |
0.x | issuedOffset | false |
1.x | issuedInstant | true |
1.x | issuedOffset | true |
Hence, i was under the impression that we needed a way to add a issuedOffset
when upgrading from 0.x
, where issuedOffset
will be null in the state backend during first read after upgrade.
Change Logs
Instant time to completion time state upgrade migration for Flink StreamReadMonitoringFunction.
When performing a version upgrade, users are required to stop their job with savepoint, then restart their job with a newer Hudi-Flink-Bundle. To ensure forward compatibility, we added a state upgrader interface for StreamReadMonitoring function to allow forward compatibility for readers.
StreamReadMonitoringStateUpgrader
to handle issuedInstant (requestInstant) -> issuedInstant + issuedOffset (completionTime) upgradeStreamReadMonitoringStateUpgrader
to avoid confusion.Impact
None
Risk level (write none, low medium or high below)
None
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist