From af123ccb10376b1473c7aad20a790f4b822bc450 Mon Sep 17 00:00:00 2001 From: Davis Zhang Date: Tue, 21 Jan 2025 14:07:28 -0800 Subject: [PATCH] cover USE_TRANSITION_TIME for ckp conversion --- .../table/checkpoint/CheckpointUtils.java | 20 +++++++-- .../table/checkpoint/TestCheckpointUtils.java | 45 +++++++++++++++++-- .../hudi/streaming/HoodieStreamSourceV1.scala | 2 +- .../utilities/sources/HoodieIncrSource.java | 4 +- .../apache/hudi/utilities/sources/Source.java | 2 +- .../sources/helpers/IncrSourceHelper.java | 5 ++- .../streamer/StreamerCheckpointUtils.java | 6 +-- 7 files changed, 67 insertions(+), 17 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 8e5c2db3aff97..9b553bc0d5f1a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -24,19 +24,28 @@ import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2; +import static 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; public class CheckpointUtils { + public static final Set<String> DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList( + "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource", + "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource" + )); public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) { @@ -49,14 +58,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata()); } - public static boolean targetCheckpointV2(int writeTableVersion) { - return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode(); + public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) { + return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode() + && !DATASOURCES_MUST_USE_CKP_V1.contains(sourceClassName); } // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the // instant or completion time public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( - Checkpoint checkpoint, HoodieTableMetaClient metaClient) { + Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) { if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) { return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS); } @@ -65,7 +75,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( } if (checkpoint instanceof StreamerCheckpointV1) { // V1 -> V2 translation - // TODO(yihua): handle USE_TRANSITION_TIME in V1 + if (handlingMode.equals(USE_TRANSITION_TIME)) { + return new 
StreamerCheckpointV2(checkpoint); + } // TODO(yihua): handle different ordering between requested and completion time // TODO(yihua): handle timeline history / archived timeline String instantTime = checkpoint.getCheckpointKey(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java index 715d89c5078e2..300785f2c5f09 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java @@ -30,9 +30,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,7 +100,7 @@ public void testConvertToCheckpointV2ForCommitTime() { when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); - StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL); assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); } @@ -126,7 +131,7 @@ public void 
testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -155,7 +160,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -168,7 +173,39 @@ public void testConvertCheckpointWithInitTimestamp() { assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); checkpoint = new StreamerCheckpointV2(instantTime); - translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK); assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); } + + @Test + public void testConvertCheckpointWithUseTransitionTime() { + String instantTime = "20231127010101"; + String completionTime = "20231127020102"; + + // Mock active timeline + HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); + + Checkpoint checkpoint = new StreamerCheckpointV1(completionTime); + StreamerCheckpointV2 translatedCheckpoint = 
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME); + + assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); + } + + @ParameterizedTest + @CsvSource({ + // version, sourceClassName, expectedResult + // Version >= 8 with allowed sources should return true + "8, org.apache.hudi.utilities.sources.TestSource, true", + "9, org.apache.hudi.utilities.sources.AnotherSource, true", + // Version < 8 should return false regardless of source + "7, org.apache.hudi.utilities.sources.TestSource, false", + "6, org.apache.hudi.utilities.sources.AnotherSource, false", + // Disallowed sources should return false even with version >= 8 + "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false", + "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false" + }) + public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) { + assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName)); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala index 82a7e80465620..e683521909c1a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala @@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext, private def translateCheckpoint(commitTime: String): String = { if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { CheckpointUtils.convertToCheckpointV2ForCommitTime( - new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey + new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey } else { 
commitTime } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 244bcbb63e313..ed7e938372432 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -190,7 +190,7 @@ protected Option translateCheckpoint(Option lastCheckpoi @Override public Pair>, Checkpoint> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit); } else { return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit); @@ -215,7 +215,7 @@ private Pair>, Checkpoint> fetchNextBatchBasedOnCompletionTi IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer( sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy, getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH), - getLatestSourceProfile()); + getLatestSourceProfile(), getHollowCommitHandleMode(props)); QueryContext queryContext = analyzer.analyze(); Option instantRange = queryContext.getInstantRange(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 754c9e9fe607f..39e03596867d4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -104,7 +104,7 @@ protected Option translateCheckpoint(Option lastCheckpoi if (lastCheckpoint.isEmpty()) { return Option.empty(); } - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if 
(CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { // V2 -> V2 if (lastCheckpoint.get() instanceof StreamerCheckpointV2) { return lastCheckpoint; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index d9fa890886bb8..9b75549c8be21 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( Option lastCheckpoint, MissingCheckpointStrategy missingCheckpointStrategy, int numInstantsFromConfig, - Option> latestSourceProfile) { + Option> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration())) .setBasePath(srcPath) @@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) { // Translate checkpoint StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime( - lastCheckpoint.get(), metaClient); + lastCheckpoint.get(), metaClient, handlingMode); startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey(); rangeType = 
RangeType.OPEN_CLOSED; } else if (missingCheckpointStrategy != null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java index c4af4385a335c..b1961e532e622 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java @@ -64,7 +64,7 @@ public static Option getCheckpointToResumeFrom(Option getCheckpointToResumeString(Option com resumeCheckpoint = Option.empty(); } else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey()) || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) { - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) { //if previous checkpoint is an empty string, skip resume use Option.empty() @@ -124,7 +124,7 @@ static Option getCheckpointToResumeString(Option com } } else if (streamerConfig.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set. - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } }