Skip to content

Commit

Permalink
cover USE_TRANSITION_TIME for ckp conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Jan 21, 2025
1 parent 3992cbd commit d904332
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
Expand All @@ -34,6 +35,7 @@
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;

public class CheckpointUtils {

Expand All @@ -56,7 +58,7 @@ public static boolean targetCheckpointV2(int writeTableVersion) {
// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) {
if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
}
Expand All @@ -65,7 +67,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
}
if (checkpoint instanceof StreamerCheckpointV1) {
// V1 -> V2 translation
// TODO(yihua): handle USE_TRANSITION_TIME in V1
if (handlingMode.equals(USE_TRANSITION_TIME)) {
return new StreamerCheckpointV2(checkpoint);
}
// TODO(yihua): handle different ordering between requested and completion time
// TODO(yihua): handle timeline history / archived timeline
String instantTime = checkpoint.getCheckpointKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -95,7 +98,7 @@ public void testConvertToCheckpointV2ForCommitTime() {
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
Expand Down Expand Up @@ -126,7 +129,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);

Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

Expand Down Expand Up @@ -155,7 +158,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() {

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

Expand All @@ -168,7 +171,22 @@ public void testConvertCheckpointWithInitTimestamp() {
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());

checkpoint = new StreamerCheckpointV2(instantTime);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK);
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
}

@Test
public void testConvertCheckpointWithUseTransitionTime() {
String instantTime = "20231127010101";
String completionTime = "20231127020102";

// Mock active timeline
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(completionTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(
new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
} else {
commitTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnCompletionTi
IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(
sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
getLatestSourceProfile());
getLatestSourceProfile(), getHollowCommitHandleMode(props));
QueryContext queryContext = analyzer.analyze();
Option<InstantRange> instantRange = queryContext.getInstantRange();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
Option<Checkpoint> lastCheckpoint,
MissingCheckpointStrategy missingCheckpointStrategy,
int numInstantsFromConfig,
Option<SourceProfile<Integer>> latestSourceProfile) {
Option<SourceProfile<Integer>> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
.setBasePath(srcPath)
Expand All @@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) {
// Translate checkpoint
StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime(
lastCheckpoint.get(), metaClient);
lastCheckpoint.get(), metaClient, handlingMode);
startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
rangeType = RangeType.OPEN_CLOSED;
} else if (missingCheckpointStrategy != null) {
Expand Down

0 comments on commit d904332

Please sign in to comment.