cover USE_TRANSITION_TIME for ckp conversion
Davis-Zhang-Onehouse committed Jan 23, 2025
1 parent 3992cbd commit af123cc
Showing 7 changed files with 67 additions and 17 deletions.
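Based on the diff below, the change does two things: checkpoint-version selection now also takes the source class name, so the S3 and GCS events incremental sources stay on checkpoint V1 even on table version 8+, and the V1 -> V2 checkpoint translation now receives the hollow-commit handling mode so that USE_TRANSITION_TIME checkpoints are passed through unchanged. A minimal caller-side sketch of the new targetCheckpointV2 signature, mirroring the checkpoint-resume call sites further down in the diff; the wrapper class is illustrative only and the package paths are assumed from the imports shown below:

    import org.apache.hudi.common.table.checkpoint.Checkpoint;
    import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
    import org.apache.hudi.common.util.Option;

    public class CheckpointVersionSketch {
      // Picks the checkpoint representation for a configured checkpoint string.
      // Table version >= 8 targets V2, unless the source class is one of the
      // DATASOURCES_MUST_USE_CKP_V1 entries (S3/GCS events incremental sources),
      // which are forced back to V1.
      static Option<Checkpoint> resolve(int writeTableVersion, String sourceClassName, String configuredCheckpoint) {
        return Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, sourceClassName)
            ? new StreamerCheckpointV2(configuredCheckpoint)
            : new StreamerCheckpointV1(configuredCheckpoint));
      }
    }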
@@ -24,19 +24,28 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;

public class CheckpointUtils {

public static final Set<String> DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList(
"org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource"
));
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
@@ -49,14 +58,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

- public static boolean targetCheckpointV2(int writeTableVersion) {
- return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+ public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
+ return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
+ && !DATASOURCES_MUST_USE_CKP_V1.contains(sourceClassName);
}

// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
- Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+ Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) {
if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
}
@@ -65,7 +75,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
}
if (checkpoint instanceof StreamerCheckpointV1) {
// V1 -> V2 translation
- // TODO(yihua): handle USE_TRANSITION_TIME in V1
+ if (handlingMode.equals(USE_TRANSITION_TIME)) {
+ return new StreamerCheckpointV2(checkpoint);
+ }
// TODO(yihua): handle different ordering between requested and completion time
// TODO(yihua): handle timeline history / archived timeline
String instantTime = checkpoint.getCheckpointKey();
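For the USE_TRANSITION_TIME branch above, the V1 checkpoint key already holds a completion time, so the new code wraps it in a StreamerCheckpointV2 directly instead of resolving the completion time from the active timeline (which the FAIL and BLOCK paths still do). A hedged sketch of the call, matching the test added below; the wrapper class is illustrative and the package paths are assumptions based on the imports in this diff:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
    import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;

    public class TransitionTimeTranslationSketch {
      // In USE_TRANSITION_TIME mode the V1 key is already a completion time,
      // so the conversion is a plain re-wrap with no timeline lookup.
      static StreamerCheckpointV2 translate(HoodieTableMetaClient metaClient, String completionTimeCheckpoint) {
        return CheckpointUtils.convertToCheckpointV2ForCommitTime(
            new StreamerCheckpointV1(completionTimeCheckpoint),
            metaClient,
            HollowCommitHandling.USE_TRANSITION_TIME);
      }
    }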
@@ -30,9 +30,14 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -95,7 +100,7 @@ public void testConvertToCheckpointV2ForCommitTime() {
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
- StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+ StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
@@ -126,7 +131,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);

Exception exception = assertThrows(UnsupportedOperationException.class,
- () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
+ () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

@@ -155,7 +160,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() {

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
- () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
+ () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

@@ -168,7 +173,39 @@ public void testConvertCheckpointWithInitTimestamp() {
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());

checkpoint = new StreamerCheckpointV2(instantTime);
- translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+ translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK);
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
}

@Test
public void testConvertCheckpointWithUseTransitionTime() {
String instantTime = "20231127010101";
String completionTime = "20231127020102";

// Mock active timeline
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(completionTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}

@ParameterizedTest
@CsvSource({
// version, sourceClassName, expectedResult
// Version >= 8 with allowed sources should return true
"8, org.apache.hudi.utilities.sources.TestSource, true",
"9, org.apache.hudi.utilities.sources.AnotherSource, true",
// Version < 8 should return false regardless of source
"7, org.apache.hudi.utilities.sources.TestSource, false",
"6, org.apache.hudi.utilities.sources.AnotherSource, false",
// Disallowed sources should return false even with version >= 8
"8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
"8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false"
})
public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName));
}
}
@@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(
- new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
+ new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
} else {
commitTime
}
@@ -190,7 +190,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi

@Override
public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
- if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
+ if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
@@ -215,7 +215,7 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnCompletionTi
IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(
sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
- getLatestSourceProfile());
+ getLatestSourceProfile(), getHollowCommitHandleMode(props));
QueryContext queryContext = analyzer.analyze();
Option<InstantRange> instantRange = queryContext.getInstantRange();

@@ -104,7 +104,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
- if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
+ if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
// V2 -> V2
if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
return lastCheckpoint;
@@ -29,6 +29,7 @@
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
Option<Checkpoint> lastCheckpoint,
MissingCheckpointStrategy missingCheckpointStrategy,
int numInstantsFromConfig,
- Option<SourceProfile<Integer>> latestSourceProfile) {
+ Option<SourceProfile<Integer>> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
.setBasePath(srcPath)
@@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) {
// Translate checkpoint
StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime(
- lastCheckpoint.get(), metaClient);
+ lastCheckpoint.get(), metaClient, handlingMode);
startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
rangeType = RangeType.OPEN_CLOSED;
} else if (missingCheckpointStrategy != null) {
@@ -64,7 +64,7 @@ public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline
LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
- checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
return checkpoint;
@@ -106,7 +106,7 @@ static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> com
resumeCheckpoint = Option.empty();
} else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
|| !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) {
- resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
} else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
@@ -124,7 +124,7 @@
}
} else if (streamerConfig.checkpoint != null) {
// getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
- resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
+ resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
}
