Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8726] cover USE_TRANSITION_TIME for ckp conversion #12687

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,28 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;

public class CheckpointUtils {

public static final Set<String> DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList(
"org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do S3EventsHoodieIncrSource.class.getClassName just to ensure we are resilient to moving classes to a diff package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's not ideal because:
this general purpose utility method "shouldTargetCheckpointV2" is called everywhere, where not all the call site have access to class definition of them. It requires non trivial amount of code refactoring to move code to some common place where everyone can access.

As compensation, I added unit test validation for S3EventsHoodieIncrSource.class.getClassName and Gcs. This can also guard against class moving to other packages

"org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource"
));
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
Expand All @@ -49,14 +58,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

public static boolean targetCheckpointV2(int writeTableVersion) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
&& !DATASOURCES_MUST_USE_CKP_V1.contains(sourceClassName);
}

// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) {
if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
}
Expand All @@ -65,7 +75,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
}
if (checkpoint instanceof StreamerCheckpointV1) {
// V1 -> V2 translation
// TODO(yihua): handle USE_TRANSITION_TIME in V1
if (handlingMode.equals(USE_TRANSITION_TIME)) {
return new StreamerCheckpointV2(checkpoint);
}
// TODO(yihua): handle different ordering between requested and completion time
// TODO(yihua): handle timeline history / archived timeline
String instantTime = checkpoint.getCheckpointKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -95,7 +100,7 @@ public void testConvertToCheckpointV2ForCommitTime() {
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
Expand Down Expand Up @@ -126,7 +131,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);

Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

Expand Down Expand Up @@ -155,7 +160,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() {

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

Expand All @@ -168,7 +173,39 @@ public void testConvertCheckpointWithInitTimestamp() {
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());

checkpoint = new StreamerCheckpointV2(instantTime);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK);
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
}

@Test
public void testConvertCheckpointWithUseTransitionTime() {
String instantTime = "20231127010101";
String completionTime = "20231127020102";

// Mock active timeline
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(completionTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}

@ParameterizedTest
@CsvSource({
// version, sourceClassName, expectedResult
// Version >= 8 with allowed sources should return true
"8, org.apache.hudi.utilities.sources.TestSource, true",
"9, org.apache.hudi.utilities.sources.AnotherSource, true",
// Version < 8 should return false regardless of source
"7, org.apache.hudi.utilities.sources.TestSource, false",
"6, org.apache.hudi.utilities.sources.AnotherSource, false",
// Disallowed sources should return false even with version >= 8
"8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
"8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false"
})
public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(
new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
} else {
commitTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi

@Override
public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
Expand All @@ -215,7 +215,7 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnCompletionTi
IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(
sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
getLatestSourceProfile());
getLatestSourceProfile(), getHollowCommitHandleMode(props));
QueryContext queryContext = analyzer.analyze();
Option<InstantRange> instantRange = queryContext.getInstantRange();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
// V2 -> V2
if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
return lastCheckpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
Option<Checkpoint> lastCheckpoint,
MissingCheckpointStrategy missingCheckpointStrategy,
int numInstantsFromConfig,
Option<SourceProfile<Integer>> latestSourceProfile) {
Option<SourceProfile<Integer>> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
.setBasePath(srcPath)
Expand All @@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) {
// Translate checkpoint
StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime(
lastCheckpoint.get(), metaClient);
lastCheckpoint.get(), metaClient, handlingMode);
startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
rangeType = RangeType.OPEN_CLOSED;
} else if (missingCheckpointStrategy != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline
LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
return checkpoint;
Expand Down Expand Up @@ -106,7 +106,7 @@ static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> com
resumeCheckpoint = Option.empty();
} else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
|| !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) {
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
} else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
Expand All @@ -124,7 +124,7 @@ static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> com
}
} else if (streamerConfig.checkpoint != null) {
// getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
}
Expand Down
Loading