diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 8e5c2db3aff9..3eb36620286b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; @@ -34,6 +35,7 @@ import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; public class CheckpointUtils { @@ -56,7 +58,7 @@ public static boolean targetCheckpointV2(int writeTableVersion) { // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the // instant or completion time public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( - Checkpoint checkpoint, HoodieTableMetaClient metaClient) { + Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) { if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) { return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS); } @@ -65,7 +67,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( } if (checkpoint instanceof StreamerCheckpointV1) { // V1 -> V2 translation - // TODO(yihua): handle USE_TRANSITION_TIME in V1 + if (handlingMode.equals(USE_TRANSITION_TIME)) { + return new StreamerCheckpointV2(checkpoint); + } // TODO(yihua): handle different ordering between requested and completion time // TODO(yihua): handle timeline history / archived timeline String instantTime = checkpoint.getCheckpointKey(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java index 715d89c5078e..6894f8329ae6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java @@ -33,6 +33,9 @@ import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,7 +98,7 @@ public void testConvertToCheckpointV2ForCommitTime() { when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); - StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL); assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); } @@ -126,7 +129,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -155,7 +158,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -168,7 +171,22 @@ public void testConvertCheckpointWithInitTimestamp() { assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); checkpoint = new StreamerCheckpointV2(instantTime); - translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK); assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); } + + @Test + public void testConvertCheckpointWithUseTransitionTime() { + String instantTime = "20231127010101"; + String completionTime = "20231127020102"; + + // Mock active timeline + HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); + + Checkpoint checkpoint = new StreamerCheckpointV1(completionTime); + StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME); + + assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala index 82a7e8046562..e683521909c1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala @@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext, private def translateCheckpoint(commitTime: String): String = { if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { CheckpointUtils.convertToCheckpointV2ForCommitTime( - new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey + new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey } else { commitTime } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 244bcbb63e31..6a98c9442cc3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -215,7 +215,7 @@ private Pair>, Checkpoint> fetchNextBatchBasedOnCompletionTi IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer( sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy, getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH), - getLatestSourceProfile()); + getLatestSourceProfile(), getHollowCommitHandleMode(props)); QueryContext queryContext = analyzer.analyze(); Option instantRange = queryContext.getInstantRange(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index d9fa890886bb..9b75549c8be2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( Option lastCheckpoint, MissingCheckpointStrategy missingCheckpointStrategy, int numInstantsFromConfig, - Option> latestSourceProfile) { + Option> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration())) .setBasePath(srcPath) @@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) { // Translate checkpoint StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime( - lastCheckpoint.get(), metaClient); + lastCheckpoint.get(), metaClient, handlingMode); startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey(); rangeType = RangeType.OPEN_CLOSED; } else if (missingCheckpointStrategy != null) {