From af123ccb10376b1473c7aad20a790f4b822bc450 Mon Sep 17 00:00:00 2001 From: Davis Zhang Date: Tue, 21 Jan 2025 14:07:28 -0800 Subject: [PATCH 1/3] cover USE_TRANSITION_TIME for ckp conversion --- .../table/checkpoint/CheckpointUtils.java | 20 +++++++-- .../table/checkpoint/TestCheckpointUtils.java | 45 +++++++++++++++++-- .../hudi/streaming/HoodieStreamSourceV1.scala | 2 +- .../utilities/sources/HoodieIncrSource.java | 4 +- .../apache/hudi/utilities/sources/Source.java | 2 +- .../sources/helpers/IncrSourceHelper.java | 5 ++- .../streamer/StreamerCheckpointUtils.java | 6 +-- 7 files changed, 67 insertions(+), 17 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 8e5c2db3aff97..9b553bc0d5f1a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -24,19 +24,28 @@ import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; public class CheckpointUtils { + public static final Set DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList( + "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource", + "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource" + )); public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) { @@ -49,14 +58,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata()); } - public static boolean targetCheckpointV2(int writeTableVersion) { - return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode(); + public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) { + return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode() + && !DATASOURCES_MUST_USE_CKP_V1.contains(sourceClassName); } // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the // instant or completion time public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( - Checkpoint checkpoint, HoodieTableMetaClient metaClient) { + Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) { if 
(checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) { return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS); } @@ -65,7 +75,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime( } if (checkpoint instanceof StreamerCheckpointV1) { // V1 -> V2 translation - // TODO(yihua): handle USE_TRANSITION_TIME in V1 + if (handlingMode.equals(USE_TRANSITION_TIME)) { + return new StreamerCheckpointV2(checkpoint); + } // TODO(yihua): handle different ordering between requested and completion time // TODO(yihua): handle timeline history / archived timeline String instantTime = checkpoint.getCheckpointKey(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java index 715d89c5078e2..300785f2c5f09 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java @@ -30,9 +30,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL; +import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,7 +100,7 @@ public void testConvertToCheckpointV2ForCommitTime() { when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); - StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL); assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); } @@ -126,7 +131,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -155,7 +160,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() { Checkpoint checkpoint = new StreamerCheckpointV1(instantTime); Exception exception = assertThrows(UnsupportedOperationException.class, - () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient)); + () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL)); assertTrue(exception.getMessage().contains("Unable to find completion time")); } @@ -168,7 +173,39 @@ public void testConvertCheckpointWithInitTimestamp() { assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); checkpoint = new StreamerCheckpointV2(instantTime); - translated = 
CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient); + translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK); assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey()); } + + @Test + public void testConvertCheckpointWithUseTransitionTime() { + String instantTime = "20231127010101"; + String completionTime = "20231127020102"; + + // Mock active timeline + HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant)); + + Checkpoint checkpoint = new StreamerCheckpointV1(completionTime); + StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME); + + assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); + } + + @ParameterizedTest + @CsvSource({ + // version, sourceClassName, expectedResult + // Version >= 8 with allowed sources should return true + "8, org.apache.hudi.utilities.sources.TestSource, true", + "9, org.apache.hudi.utilities.sources.AnotherSource, true", + // Version < 8 should return false regardless of source + "7, org.apache.hudi.utilities.sources.TestSource, false", + "6, org.apache.hudi.utilities.sources.AnotherSource, false", + // Disallowed sources should return false even with version >= 8 + "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false", + "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false" + }) + public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) { + assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName)); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala index 82a7e80465620..e683521909c1a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala @@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext, private def translateCheckpoint(commitTime: String): String = { if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { CheckpointUtils.convertToCheckpointV2ForCommitTime( - new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey + new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey } else { commitTime } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 244bcbb63e313..ed7e938372432 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -190,7 +190,7 @@ protected Option translateCheckpoint(Option lastCheckpoi @Override public Pair>, Checkpoint> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { return 
fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit); } else { return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit); @@ -215,7 +215,7 @@ private Pair>, Checkpoint> fetchNextBatchBasedOnCompletionTi IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer( sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy, getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH), - getLatestSourceProfile()); + getLatestSourceProfile(), getHollowCommitHandleMode(props)); QueryContext queryContext = analyzer.analyze(); Option instantRange = queryContext.getInstantRange(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 754c9e9fe607f..39e03596867d4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -104,7 +104,7 @@ protected Option translateCheckpoint(Option lastCheckpoi if (lastCheckpoint.isEmpty()) { return Option.empty(); } - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { // V2 -> V2 if (lastCheckpoint.get() instanceof StreamerCheckpointV2) { return lastCheckpoint; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index d9fa890886bb8..9b75549c8be21 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( Option lastCheckpoint, MissingCheckpointStrategy missingCheckpointStrategy, int numInstantsFromConfig, - Option> latestSourceProfile) { + Option> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration())) .setBasePath(srcPath) @@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) { // Translate checkpoint StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime( - lastCheckpoint.get(), metaClient); + lastCheckpoint.get(), metaClient, handlingMode); startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey(); rangeType = RangeType.OPEN_CLOSED; } else if (missingCheckpointStrategy != null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java index c4af4385a335c..b1961e532e622 100644 --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java @@ -64,7 +64,7 @@ public static Option getCheckpointToResumeFrom(Option getCheckpointToResumeString(Option com resumeCheckpoint = Option.empty(); } else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey()) || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) { - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) { //if previous checkpoint is an empty string, skip resume use Option.empty() @@ -124,7 +124,7 @@ static Option getCheckpointToResumeString(Option com } } else if (streamerConfig.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set. - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } } From ad6a55da97264070e90eaba55e895a3143a2bdd9 Mon Sep 17 00:00:00 2001 From: Davis Zhang Date: Wed, 22 Jan 2025 19:05:53 -0800 Subject: [PATCH 2/3] Test + S3/GCS incremental source ckp version validation --- .../hudi/testutils/HoodieClientTestUtils.java | 2 +- .../SparkClientFunctionalTestHarness.java | 10 + .../table/checkpoint/CheckpointUtils.java | 3 +- .../hudi/common/table/TestTimelineUtils.java | 17 - .../testutils/HoodieCommonTestHarness.java | 25 ++ .../hudi/streaming/HoodieStreamSourceV2.scala | 2 +- .../hudi/utilities/streamer/StreamSync.java | 31 +- .../streamer/StreamerCheckpointUtils.java | 24 +- .../HoodieDeltaStreamerTestBase.java | 4 +- .../TestHoodieDeltaStreamer.java | 21 + .../sources/MockS3EventsHoodieIncrSource.java | 187 +++++++++ .../S3EventsHoodieIncrSourceHarness.java | 308 +++++++++++++++ .../TestGcsEventsHoodieIncrSource.java | 2 +- .../sources/TestS3EventsHoodieIncrSource.java | 262 +------------ ...csEventsHoodieIncrSourceE2ECkpVersion.java | 223 +++++++++++ .../utilities/streamer/TestStreamSync.java | 124 ++++-- .../streamer/TestStreamerCheckpointUtils.java | 366 ++++++++++++++++++ 17 files changed, 1283 insertions(+), 328 deletions(-) create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index a23bca1701b3a..c305ccd42f093 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -325,7 +325,7 @@ public static HoodieTableMetaClient createMetaClient(SparkSession spark, String return HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(spark.sessionState().newHadoopConf()), basePath); } - private static Option getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { + public static Option getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { try { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); byte[] data = timeline.getInstantDetails(instant).get(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index e5f7406652a2b..a24710ce20193 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -25,6 +25,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieBaseFile; @@ -38,6 +39,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -87,6 +89,7 @@ import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -181,12 +184,19 @@ public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration storage return getHoodieMetaClient(storageConf, basePath, getPropertiesForKeyGen(true)); } + public HoodieTableMetaClient getHoodieMetaClientWithTableVersion(StorageConfiguration storageConf, String basePath, String tableVersion) throws IOException { + Properties props = getPropertiesForKeyGen(true); + props.put(WRITE_TABLE_VERSION.key(), tableVersion); + return getHoodieMetaClient(storageConf, basePath, props); + } + @Override public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration storageConf, String basePath, Properties props) throws IOException { return HoodieTableMetaClient.newTableBuilder() .setTableName(RAW_TRIPS_TEST_NAME) .setTableType(COPY_ON_WRITE) .setPayloadClass(HoodieAvroPayload.class) + .setTableVersion(ConfigUtils.getIntWithAltKeys(new TypedProperties(props), 
WRITE_TABLE_VERSION)) .fromProperties(props) .initTable(storageConf.newInstance(), basePath); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 9b553bc0d5f1a..9c1796fc80b46 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -44,7 +44,8 @@ public class CheckpointUtils { public static final Set DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList( "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource", - "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource" + "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource", + "org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource" )); public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2)) diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index c58ba40199406..96f68e5f3ecf5 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; @@ -544,22 +543,6 @@ private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, Stri return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); } - private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map extraMetadata) - throws IOException { - HoodieCommitMetadata commit = new HoodieCommitMetadata(); - for (int i = 1; i <= count; i++) { - HoodieWriteStat stat = new HoodieWriteStat(); - stat.setFileId(i + ""); - stat.setPartitionPath(Paths.get(basePath, partition).toString()); - stat.setPath(commitTs + "." 
+ i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension()); - commit.addWriteStat(partition, stat); - } - for (Map.Entry extraEntries : extraMetadata.entrySet()) { - commit.addMetadata(extraEntries.getKey(), extraEntries.getValue()); - } - return serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commit).get(); - } - private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount, String newFilePartition, int newFileCount, Map extraMetadata, WriteOperationType operationType) diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 01fade5f48876..eb7a04c83e224 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -20,9 +20,11 @@ import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -55,6 +57,7 @@ import java.io.IOException; import java.net.URI; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -70,6 +73,7 @@ import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; +import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata; /** * The common hoodie test harness to provide the basic infrastructure. @@ -377,4 +381,25 @@ private static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType da throw new RuntimeException("Unknown data block type " + dataBlockType); } } + + public byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map extraMetadata) + throws IOException { + return getCommitMetadata(metaClient, basePath, partition, commitTs, count, extraMetadata); + } + + public static byte[] getCommitMetadata(HoodieTableMetaClient metaClient, String basePath, String partition, String commitTs, int count, Map extraMetadata) + throws IOException { + HoodieCommitMetadata commit = new HoodieCommitMetadata(); + for (int i = 1; i <= count; i++) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setFileId(i + ""); + stat.setPartitionPath(Paths.get(basePath, partition).toString()); + stat.setPath(commitTs + "." 
+ i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension()); + commit.addWriteStat(partition, stat); + } + for (Map.Entry extraEntries : extraMetadata.entrySet()) { + commit.addMetadata(extraEntries.getKey(), extraEntries.getValue()); + } + return serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commit).get(); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala index f825aa7e186f7..2f3436a1c1e2e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala @@ -163,7 +163,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext, } private def translateCheckpoint(commitTime: String): String = { - if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion.versionCode(), getClass.getName)) { commitTime } else { CheckpointUtils.convertToCheckpointV1ForCommitTime( diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index bb408753374f9..cd378448595db 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -47,6 +47,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -139,6 +140,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE; import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH; import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING; +import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.targetCheckpointV2; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING; @@ -808,14 +810,7 @@ private Pair, JavaRDD> writeToSinkAndDoMetaSync(Stri } boolean hasErrors = totalErrorRecords > 0; if (!hasErrors || cfg.commitOnErrors) { - Map checkpointCommitMetadata = - !getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP) - ? inputBatch.getCheckpointForNextBatch() != null - ? inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata( - cfg.checkpoint, cfg.ignoreCheckpoint) - : new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata( - cfg.checkpoint, cfg.ignoreCheckpoint) - : Collections.emptyMap(); + Map checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg); if (hasErrors) { LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. 
Errors/Total="
@@ -879,6 +874,26 @@ private Pair, JavaRDD> writeToSinkAndDoMetaSync(Stri
     return Pair.of(scheduledCompactionInstant, writeStatusRDD);
   }
 
+  Map extractCheckpointMetadata(InputBatch inputBatch, TypedProperties props, int versionCode, HoodieStreamer.Config cfg) {
+    // If checkpoint force skip is enabled, return empty map
+    if (getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)) {
+      return Collections.emptyMap();
+    }
+
+    // If we have a next checkpoint batch, use its metadata
+    if (inputBatch.getCheckpointForNextBatch() != null) {
+      return inputBatch.getCheckpointForNextBatch()
+          .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+    }
+
+    // Otherwise create new checkpoint based on version
+    Checkpoint checkpoint = targetCheckpointV2(versionCode, cfg.sourceClassName)
+        ? new StreamerCheckpointV2((String) null)
+        : new StreamerCheckpointV1((String) null);
+
+    return checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+  }
+
   /**
    * Try to start a new commit.
    *
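For reference, the snippet below is an illustrative sketch of the gate that the hunk above applies on the write path; it is not code from this patch. The class CheckpointGateSketch and the helper chooseCheckpoint are made up for illustration, and "TestSource" is the placeholder source name also used in TestCheckpointUtils, while CheckpointUtils.targetCheckpointV2, StreamerCheckpointV1/V2, HoodieTableVersion, and DATASOURCES_MUST_USE_CKP_V1 are the APIs introduced or changed in these patches.

    import org.apache.hudi.common.table.HoodieTableVersion;
    import org.apache.hudi.common.table.checkpoint.Checkpoint;
    import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
    import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;

    public class CheckpointGateSketch {
      // Same decision extractCheckpointMetadata makes: emit a V2 checkpoint only when the write
      // table version is >= 8 AND the source class is not pinned to V1 through
      // CheckpointUtils.DATASOURCES_MUST_USE_CKP_V1 (the S3/GCS events incremental sources).
      static Checkpoint chooseCheckpoint(int writeTableVersion, String sourceClassName, String key) {
        return CheckpointUtils.targetCheckpointV2(writeTableVersion, sourceClassName)
            ? new StreamerCheckpointV2(key)
            : new StreamerCheckpointV1(key);
      }

      public static void main(String[] args) {
        int v8 = HoodieTableVersion.EIGHT.versionCode();
        // Ordinary source on a version-8 table -> StreamerCheckpointV2.
        System.out.println(chooseCheckpoint(v8, "org.apache.hudi.utilities.sources.TestSource", "0001").getClass().getSimpleName());
        // S3EventsHoodieIncrSource stays on StreamerCheckpointV1 even on a version-8 table.
        System.out.println(chooseCheckpoint(v8, "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource", "0001").getClass().getSimpleName());
        // Any table version below 8 stays on StreamerCheckpointV1 regardless of the source.
        System.out.println(chooseCheckpoint(6, "org.apache.hudi.utilities.sources.TestSource", "0001").getClass().getSimpleName());
      }
    }

The same expectation is exercised from the unit-test side by testTargetCheckpointV2 in TestCheckpointUtils (patch 1) and end to end by assertUseV2Checkpoint in TestHoodieDeltaStreamer (patch 2).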

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java index b1961e532e622..65df22271769a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java @@ -58,7 +58,7 @@ public static Option getCheckpointToResumeFrom(Option checkpoint = Option.empty(); if (commitsTimelineOpt.isPresent()) { - checkpoint = getCheckpointToResumeString(commitsTimelineOpt, streamerConfig, props); + checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), streamerConfig, props); } LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint); @@ -73,28 +73,30 @@ public static Option getCheckpointToResumeFrom(Option getCheckpointToResumeString(Option commitsTimelineOpt, + static Option getCheckpointToResumeString(HoodieTimeline commitsTimeline, HoodieStreamer.Config streamerConfig, TypedProperties props) throws IOException { Option resumeCheckpoint = Option.empty(); - // try get checkpoint from commits(including commit and deltacommit) - // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss - HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); // has deltacommit and this is a MOR table, then we should get checkpoint from .deltacommit // if changing from mor to cow, before changing we must do a full compaction, so we can only consider .commit in such case - if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name()) && !deltaCommitTimeline.empty()) { - commitsTimelineOpt = Option.of(deltaCommitTimeline); + if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + // try get checkpoint from commits(including commit and deltacommit) + // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss + HoodieTimeline deltaCommitTimeline = commitsTimeline.filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + if (!deltaCommitTimeline.empty()) { + commitsTimeline = deltaCommitTimeline; + } } - Option lastCommit = commitsTimelineOpt.get().lastInstant(); + Option lastCommit = commitsTimeline.lastInstant(); if (lastCommit.isPresent()) { // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one. - Option commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get()); + Option commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline); int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION); if (commitMetadataOption.isPresent()) { HoodieCommitMetadata commitMetadata = commitMetadataOption.get(); @@ -116,7 +118,7 @@ static Option getCheckpointToResumeString(Option com throw new HoodieStreamerException( "Unable to find previous checkpoint. Please double check if this table " + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" - + commitsTimelineOpt.get().getInstants()); + + commitsTimeline.getInstants()); } // KAFKA_CHECKPOINT_TYPE will be honored only for first batch. 
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 7a9372c5ac74e..31102b379ce4d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -563,7 +563,7 @@ void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String parti .count()); } - static class TestHelpers { + public static class TestHelpers { static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { return makeConfig(basePath, op, Collections.singletonList(TestHoodieDeltaStreamer.DropAllTransformer.class.getName())); @@ -590,7 +590,7 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null); } - static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, + public static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 55c03716d53f8..beb90fd4aad09 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -83,6 +83,7 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.sync.common.HoodieSyncConfig; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; import org.apache.hudi.utilities.HoodieIndexer; @@ -167,6 +168,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1; +import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR; import static org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient; @@ -422,6 +425,17 @@ public void testInferKeyGenerator(String propsFilename, expectedKeyGeneratorClassName, metaClient.getTableConfig().getKeyGeneratorClassName()); Dataset res = sqlContext.read().format("hudi").load(tableBasePath); assertEquals(1000, res.count()); + assertUseV2Checkpoint(metaClient); + } + + private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) { + metaClient.reloadActiveTimeline(); + Option metadata = HoodieClientTestUtils.getCommitMetadataForInstant( + metaClient, 
metaClient.getActiveTimeline().lastInstant().get()); + assertFalse(metadata.isEmpty()); + Map extraMetadata = metadata.get().getExtraMetadata(); + assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2)); + assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1)); } @Test @@ -492,6 +506,7 @@ public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType) cfg.configs.add(String.format("%s=false", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key())); cfg.targetBasePath = newDatasetBasePath; new HoodieDeltaStreamer(cfg, jsc).sync(); + assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, newDatasetBasePath)); Dataset res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath); LOG.info("Schema :"); res.printSchema(); @@ -568,6 +583,8 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); new HoodieDeltaStreamer(cfg, jsc).sync(); + assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); + assertRecordCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); @@ -579,6 +596,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc"); cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true"); new HoodieDeltaStreamer(cfg, jsc).sync(); + assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. assertRecordCount(1450, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); @@ -647,6 +665,7 @@ public void testUpsertsCOW_ContinuousModeDisabled() throws Exception { assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); + assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); } @Timeout(600) @@ -676,6 +695,7 @@ public void testUpsertsMOR_ContinuousModeDisabled() throws Exception { assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); + assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); } private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecordType recordType) throws Exception { @@ -1384,6 +1404,7 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List entry, metaClient, WriteOperationType.BULK_INSERT)); } } + assertUseV2Checkpoint(createMetaClient(jsc, tableBasePath)); } finally { deltaStreamer.shutdownGracefully(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java new file mode 100644 index 0000000000000..8604792c8511f --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/MockS3EventsHoodieIncrSource.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.QueryRunner; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.StreamContext; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.function.BiFunction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MockS3EventsHoodieIncrSource extends S3EventsHoodieIncrSource { + + public static final String OP_FETCH_NEXT_BATCH = "mockTestFetchNextBatchOp"; + public static final String VAL_INPUT_CKP = "valInputCkp"; + + public static final String OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY = "OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY"; + public static final String CUSTOM_CHECKPOINT1 = "custom-checkpoint1"; + + private static final BiFunction, Long, Pair>, Checkpoint>> EMPTY_ROW_SET_NONE_NULL_CKP_1 = + (checkpoint, limit) -> { + Option> empty = Option.empty(); + return Pair.of( + empty, + new StreamerCheckpointV1(CUSTOM_CHECKPOINT1)); + }; + + public static final String OP_EMPTY_ROW_SET_NONE_NULL_CKP2_KEY = "OP_EMPTY_ROW_SET_NONE_NULL_CKP2_KEY"; + public static final String CUSTOM_CHECKPOINT2 = "custom-checkpoint2"; + + private static final BiFunction, Long, Pair>, Checkpoint>> EMPTY_ROW_SET_NONE_NULL_CKP_2 = + (checkpoint, limit) -> { + Option> empty = Option.empty(); + return Pair.of( + empty, + new StreamerCheckpointV1(CUSTOM_CHECKPOINT2) + ); + }; + + public static final String OP_EMPTY_ROW_SET_NULL_CKP_KEY = "OP_EMPTY_ROW_SET_NULL_CKP"; + private static final BiFunction, Long, Pair>, Checkpoint>> EMPTY_ROW_SET_NULL_CKP = + (checkpoint, limit) -> { + Option> empty = Option.empty(); + return Pair.of( + empty, + null + ); + }; + + public static final String VAL_CKP_KEY_EQ_VAL = "VAL_CKP_KEY"; + public static final String VAL_CKP_RESET_KEY_EQUALS = "VAL_CKP_RESET_KEY"; + public static final String VAL_CKP_RESET_KEY_IS_NULL = "VAL_CKP_RESET_KEY_IS_NULL"; + public static final String 
VAL_CKP_IGNORE_KEY_EQUALS = "VAL_CKP_IGNORE_KEY"; + public static final String VAL_CKP_IGNORE_KEY_IS_NULL = "VAL_CKP_IGNORE_KEY_IS_NULL"; + + public static final String VAL_NON_EMPTY_CKP_ALL_MEMBERS = "VAL_NON_EMPTY_CKP_ALL_MEMBERS"; + private static final BiFunction, TypedProperties, Void> VAL_NON_EMPTY_CKP_WITH_FIXED_VALUE = (ckpOpt, props) -> { + assertFalse(ckpOpt.isEmpty()); + if (props.containsKey(VAL_CKP_KEY_EQ_VAL)) { + assertEquals(props.getString(VAL_CKP_KEY_EQ_VAL), ckpOpt.get().getCheckpointKey()); + } + + if (props.containsKey(VAL_CKP_RESET_KEY_EQUALS)) { + assertEquals(ckpOpt.get().getCheckpointResetKey(), props.getString(VAL_CKP_RESET_KEY_EQUALS)); + } + if (props.containsKey(VAL_CKP_RESET_KEY_IS_NULL)) { + assertNull(ckpOpt.get().getCheckpointResetKey()); + } + + if (props.containsKey(VAL_CKP_IGNORE_KEY_EQUALS)) { + assertEquals(ckpOpt.get().getCheckpointIgnoreKey(), props.getString(VAL_CKP_IGNORE_KEY_EQUALS)); + } + if (props.containsKey(VAL_CKP_IGNORE_KEY_IS_NULL)) { + assertNull(ckpOpt.get().getCheckpointIgnoreKey()); + } + return null; + }; + + public static final String VAL_EMPTY_CKP_KEY = "VAL_EMPTY_CKP_KEY"; + private static final BiFunction, TypedProperties, Void> VAL_EMPTY_CKP = (ckpOpt, props) -> { + assertTrue(ckpOpt.isEmpty()); + return null; + }; + + public static final String VAL_NO_OP = "VAL_NO_OP"; + + public MockS3EventsHoodieIncrSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider, + HoodieIngestionMetrics metrics) { + this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props), + new CloudDataFetcher(props, sparkContext, sparkSession, metrics), new DefaultStreamContext(schemaProvider, Option.empty())); + } + + public MockS3EventsHoodieIncrSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + HoodieIngestionMetrics metrics, + StreamContext streamContext) { + this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props), + new CloudDataFetcher(props, sparkContext, sparkSession, metrics), streamContext); + } + + MockS3EventsHoodieIncrSource( + TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + QueryRunner queryRunner, + CloudDataFetcher cloudDataFetcher, + StreamContext streamContext) { + super(props, sparkContext, sparkSession, queryRunner, cloudDataFetcher, streamContext); + } + + @Override + public Pair>, Checkpoint> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { + String valType = (String) props.getOrDefault(VAL_INPUT_CKP, VAL_NO_OP); + validateCheckpointOption(lastCheckpoint, valType); + + String opType = (String) props.getOrDefault(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY); + return applyDummyOpOfFetchNextBatch(lastCheckpoint, sourceLimit, opType); + } + + private Pair>, Checkpoint> applyDummyOpOfFetchNextBatch(Option lastCheckpoint, long sourceLimit, String opType) { + if (opType.equals(OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY)) { + return EMPTY_ROW_SET_NONE_NULL_CKP_1.apply(lastCheckpoint, sourceLimit); + } + if (opType.equals(OP_EMPTY_ROW_SET_NONE_NULL_CKP2_KEY)) { + return EMPTY_ROW_SET_NONE_NULL_CKP_2.apply(lastCheckpoint, sourceLimit); + } + if (opType.equals(OP_EMPTY_ROW_SET_NULL_CKP_KEY)) { + return EMPTY_ROW_SET_NULL_CKP.apply(lastCheckpoint, sourceLimit); + } + throw new IllegalArgumentException("Unsupported operation type: " + opType); + } + + private void validateCheckpointOption(Option lastCheckpoint, String valType) { + if 
(!lastCheckpoint.isEmpty()) { + assertInstanceOf(StreamerCheckpointV1.class, lastCheckpoint.get()); + } + if (valType.equals(VAL_NO_OP)) { + return; + } + if (valType.equals(VAL_NON_EMPTY_CKP_ALL_MEMBERS)) { + VAL_NON_EMPTY_CKP_WITH_FIXED_VALUE.apply(lastCheckpoint, props); + } + if (valType.equals(VAL_EMPTY_CKP_KEY)) { + VAL_EMPTY_CKP.apply(lastCheckpoint, props); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java new file mode 100644 index 0000000000000..03df6201ba346 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.collection.Triple; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import org.apache.hudi.utilities.sources.helpers.QueryInfo; +import org.apache.hudi.utilities.sources.helpers.QueryRunner; +import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon; +import org.apache.hudi.utilities.streamer.DefaultStreamContext; +import org.apache.hudi.utilities.streamer.SourceProfile; +import 
org.apache.hudi.utilities.streamer.SourceProfileSupplier; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class S3EventsHoodieIncrSourceHarness extends SparkClientFunctionalTestHarness { + protected static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource( + S3EventsHoodieIncrSourceHarness.class, "/streamer-config/s3-metadata.avsc", true); + + protected ObjectMapper mapper = new ObjectMapper(); + + protected static final String MY_BUCKET = "some-bucket"; + protected static final String IGNORE_FILE_EXTENSION = ".ignore"; + + protected Option schemaProvider; + @Mock + QueryRunner mockQueryRunner; + @Mock + CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon; + @Mock + SourceProfileSupplier sourceProfileSupplier; + @Mock + QueryInfo queryInfo; + @Mock + HoodieIngestionMetrics metrics; + protected JavaSparkContext jsc; + protected HoodieTableMetaClient metaClient; + + @BeforeEach + public void setUp() throws IOException { + jsc = JavaSparkContext.fromSparkContext(spark().sparkContext()); + String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath(); + TypedProperties props = new TypedProperties(); + props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); + props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); + this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc)); + } + + protected List getSampleS3ObjectKeys(List> filePathSizeAndCommitTime) { + return filePathSizeAndCommitTime.stream().map(f -> { + try { + return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(), f.getRight()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + protected Dataset generateDataset(List> filePathSizeAndCommitTime) { + JavaRDD testRdd = jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2); + Dataset inputDs = spark().read().json(testRdd); + return inputDs; + } + + /** + * Generates simple Json structure like below + *

+ * s3 : { + * object : { + * size: + * key: + * } + * bucket: { + * name: + * } + */ + protected String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) + throws JsonProcessingException { + Map objectMetadata = new HashMap<>(); + objectMetadata.put("size", objectSize); + objectMetadata.put("key", objectKey); + Map bucketMetadata = new HashMap<>(); + bucketMetadata.put("name", bucketName); + Map s3Metadata = new HashMap<>(); + s3Metadata.put("object", objectMetadata); + s3Metadata.put("bucket", bucketMetadata); + Map eventMetadata = new HashMap<>(); + eventMetadata.put("s3", s3Metadata); + eventMetadata.put("_hoodie_commit_time", commitTime); + return mapper.writeValueAsString(eventMetadata); + } + + protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { + String partitionPath = bucketName; + Schema schema = S3_METADATA_SCHEMA; + GenericRecord rec = new GenericData.Record(schema); + Schema.Field s3Field = schema.getField("s3"); + Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type + // Create a generic record for the "s3" field + GenericRecord s3Record = new GenericData.Record(s3Schema); + + Schema.Field s3BucketField = s3Schema.getField("bucket"); + Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type + GenericRecord s3BucketRec = new GenericData.Record(s3Bucket); + s3BucketRec.put("name", bucketName); + + + Schema.Field s3ObjectField = s3Schema.getField("object"); + Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type + GenericRecord s3ObjectRec = new GenericData.Record(s3Object); + s3ObjectRec.put("key", objectKey); + s3ObjectRec.put("size", objectSize); + + s3Record.put("bucket", s3BucketRec); + s3Record.put("object", s3ObjectRec); + rec.put("s3", s3Record); + rec.put("_hoodie_commit_time", commitTime); + + HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec)); + return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload); + } + + protected TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { + Properties properties = new Properties(); + properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath()); + properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", + missingCheckpointStrategy.name()); + properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json"); + return new TypedProperties(properties); + } + + protected HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(S3_METADATA_SCHEMA.toString()) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .forTable(metaClient.getTableConfig().getTableName()); + } + + protected HoodieWriteConfig getWriteConfig() { + return getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .build(); + } + + protected Pair> 
writeS3MetadataRecords(String commitTime) throws IOException { + HoodieWriteConfig writeConfig = getWriteConfig(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { + + writeClient.startCommitWithTime(commitTime); + List s3MetadataRecords = Arrays.asList( + generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L) + ); + JavaRDD result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime); + + List statuses = result.collect(); + assertNoWriteErrors(statuses); + + return Pair.of(commitTime, s3MetadataRecords); + } + } + + protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, + Option checkpointToPull, long sourceLimit, String expectedCheckpoint, + TypedProperties typedProperties) { + S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(), + spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + + Pair>, Checkpoint> dataAndCheckpoint = incrSource.fetchNextBatch( + checkpointToPull.isPresent() ? Option.of(new StreamerCheckpointV1(checkpointToPull.get())) : Option.empty(), sourceLimit); + + Option> datasetOpt = dataAndCheckpoint.getLeft(); + Checkpoint nextCheckPoint = dataAndCheckpoint.getRight(); + + Assertions.assertNotNull(nextCheckPoint); + Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); + } + + protected void setMockQueryRunner(Dataset inputDs) { + setMockQueryRunner(inputDs, Option.empty()); + } + + protected void setMockQueryRunner(Dataset inputDs, Option nextCheckPointOpt) { + + when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { + QueryInfo queryInfo = invocation.getArgument(0); + QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> + queryInfo.withUpdatedEndInstant(nextCheckPoint)) + .orElse(queryInfo); + if (updatedQueryInfo.isSnapshot()) { + return Pair.of(updatedQueryInfo, + inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getStartInstant())) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + updatedQueryInfo.getEndInstant()))); + } + return Pair.of(updatedQueryInfo, inputDs); + }); + } + + protected void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, + Option checkpointToPull, long sourceLimit, String expectedCheckpoint) { + TypedProperties typedProperties = setProps(missingCheckpointStrategy); + + readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties); + } + + static class TestSourceProfile implements SourceProfile { + private final long maxSourceBytes; + private final int sourcePartitions; + private final long bytesPerPartition; + + public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) { + this.maxSourceBytes = maxSourceBytes; + this.sourcePartitions = sourcePartitions; + this.bytesPerPartition = bytesPerPartition; + } + + @Override + public long getMaxSourceBytes() { + return maxSourceBytes; + } + + @Override + public int getSourcePartitions() { + return sourcePartitions; + } + + @Override + public Long getSourceSpecificContext() { + return bytesPerPartition; + } + } + +} \ No newline at end of file diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 8b5ab0bddc9e8..e66018f9365db 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -44,12 +44,12 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; -import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource.TestSourceProfile; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; +import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness.TestSourceProfile; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceProfileSupplier; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index e07ff5022de52..4218aaca26d94 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -18,49 +18,18 @@ package org.apache.hudi.utilities.sources; -import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieAvroPayload; -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.checkpoint.Checkpoint; -import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; -import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Triple; -import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieCleanConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.config.CloudSourceConfig; -import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; -import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; -import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; -import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; -import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; -import 
org.apache.hudi.utilities.sources.helpers.QueryInfo; -import org.apache.hudi.utilities.sources.helpers.QueryRunner; -import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceProfile; -import org.apache.hudi.utilities.streamer.SourceProfileSupplier; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; @@ -71,20 +40,14 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; -import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -93,158 +56,12 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness { - private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource( - TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", true); - - private ObjectMapper mapper = new ObjectMapper(); - - private static final String MY_BUCKET = "some-bucket"; - private static final String IGNORE_FILE_EXTENSION = ".ignore"; - - private Option schemaProvider; - @Mock - QueryRunner mockQueryRunner; - @Mock - CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon; - @Mock - SourceProfileSupplier sourceProfileSupplier; - @Mock - QueryInfo queryInfo; - @Mock - HoodieIngestionMetrics metrics; - private JavaSparkContext jsc; - private HoodieTableMetaClient metaClient; +public class TestS3EventsHoodieIncrSource extends S3EventsHoodieIncrSourceHarness { @BeforeEach public void setUp() throws IOException { - jsc = JavaSparkContext.fromSparkContext(spark().sparkContext()); + super.setUp(); metaClient = getHoodieMetaClient(storageConf(), basePath()); - String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath(); - TypedProperties props = new TypedProperties(); - props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath); - props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName()); - this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc)); - } - - private List getSampleS3ObjectKeys(List> filePathSizeAndCommitTime) { - return filePathSizeAndCommitTime.stream().map(f -> { - try { - return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(), f.getRight()); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - 
}).collect(Collectors.toList()); - } - - private Dataset generateDataset(List> filePathSizeAndCommitTime) { - JavaRDD testRdd = jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2); - Dataset inputDs = spark().read().json(testRdd); - return inputDs; - } - - /** - * Generates simple Json structure like below - *

- * s3 : { - * object : { - * size: - * key: - * } - * bucket: { - * name: - * } - */ - private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) - throws JsonProcessingException { - Map objectMetadata = new HashMap<>(); - objectMetadata.put("size", objectSize); - objectMetadata.put("key", objectKey); - Map bucketMetadata = new HashMap<>(); - bucketMetadata.put("name", bucketName); - Map s3Metadata = new HashMap<>(); - s3Metadata.put("object", objectMetadata); - s3Metadata.put("bucket", bucketMetadata); - Map eventMetadata = new HashMap<>(); - eventMetadata.put("s3", s3Metadata); - eventMetadata.put("_hoodie_commit_time", commitTime); - return mapper.writeValueAsString(eventMetadata); - } - - private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { - String partitionPath = bucketName; - Schema schema = S3_METADATA_SCHEMA; - GenericRecord rec = new GenericData.Record(schema); - Schema.Field s3Field = schema.getField("s3"); - Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type - // Create a generic record for the "s3" field - GenericRecord s3Record = new GenericData.Record(s3Schema); - - Schema.Field s3BucketField = s3Schema.getField("bucket"); - Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type - GenericRecord s3BucketRec = new GenericData.Record(s3Bucket); - s3BucketRec.put("name", bucketName); - - - Schema.Field s3ObjectField = s3Schema.getField("object"); - Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type - GenericRecord s3ObjectRec = new GenericData.Record(s3Object); - s3ObjectRec.put("key", objectKey); - s3ObjectRec.put("size", objectSize); - - s3Record.put("bucket", s3BucketRec); - s3Record.put("object", s3ObjectRec); - rec.put("s3", s3Record); - rec.put("_hoodie_commit_time", commitTime); - - HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec)); - return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload); - } - - private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { - Properties properties = new Properties(); - properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath()); - properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", - missingCheckpointStrategy.name()); - properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json"); - return new TypedProperties(properties); - } - - private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) { - return HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withSchema(S3_METADATA_SCHEMA.toString()) - .withParallelism(2, 2) - .withBulkInsertParallelism(2) - .withFinalizeWriteParallelism(2).withDeleteParallelism(2) - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .forTable(metaClient.getTableConfig().getTableName()); - } - - private HoodieWriteConfig getWriteConfig() { - return getConfigBuilder(basePath(), metaClient) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .build(); - } - - private Pair> writeS3MetadataRecords(String 
commitTime) throws IOException { - HoodieWriteConfig writeConfig = getWriteConfig(); - try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { - - writeClient.startCommitWithTime(commitTime); - List s3MetadataRecords = Arrays.asList( - generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L) - ); - JavaRDD result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime); - - List statuses = result.collect(); - assertNoWriteErrors(statuses); - - return Pair.of(commitTime, s3MetadataRecords); - } } @Test @@ -553,77 +370,4 @@ public void testCreateSource() throws IOException { new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); assertEquals(Source.SourceType.ROW, s3Source.getSourceType()); } - - private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, - Option checkpointToPull, long sourceLimit, String expectedCheckpoint, - TypedProperties typedProperties) { - S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(), - spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon), - new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); - - Pair>, Checkpoint> dataAndCheckpoint = incrSource.fetchNextBatch( - checkpointToPull.isPresent() ? Option.of(new StreamerCheckpointV1(checkpointToPull.get())) : Option.empty(), sourceLimit); - - Option> datasetOpt = dataAndCheckpoint.getLeft(); - Checkpoint nextCheckPoint = dataAndCheckpoint.getRight(); - - Assertions.assertNotNull(nextCheckPoint); - Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); - } - - private void setMockQueryRunner(Dataset inputDs) { - setMockQueryRunner(inputDs, Option.empty()); - } - - private void setMockQueryRunner(Dataset inputDs, Option nextCheckPointOpt) { - - when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { - QueryInfo queryInfo = invocation.getArgument(0); - QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> - queryInfo.withUpdatedEndInstant(nextCheckPoint)) - .orElse(queryInfo); - if (updatedQueryInfo.isSnapshot()) { - return Pair.of(updatedQueryInfo, - inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - updatedQueryInfo.getStartInstant())) - .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - updatedQueryInfo.getEndInstant()))); - } - return Pair.of(updatedQueryInfo, inputDs); - }); - } - - private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, - Option checkpointToPull, long sourceLimit, String expectedCheckpoint) { - TypedProperties typedProperties = setProps(missingCheckpointStrategy); - - readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties); - } - - static class TestSourceProfile implements SourceProfile { - private final long maxSourceBytes; - private final int sourcePartitions; - private final long bytesPerPartition; - - public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) { - this.maxSourceBytes = maxSourceBytes; - this.sourcePartitions = sourcePartitions; - this.bytesPerPartition = bytesPerPartition; - } - - @Override - public long getMaxSourceBytes() { - return maxSourceBytes; - } - - @Override - public int getSourcePartitions() { - return sourcePartitions; - } - - 
@Override - public Long getSourceSpecificContext() { - return bytesPerPartition; - } - } } \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java new file mode 100644 index 0000000000000..1ff334e07979b --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.streamer; + +import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase; +import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1; +import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.CUSTOM_CHECKPOINT1; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.CUSTOM_CHECKPOINT2; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.OP_EMPTY_ROW_SET_NONE_NULL_CKP2_KEY; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.OP_EMPTY_ROW_SET_NULL_CKP_KEY; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.OP_FETCH_NEXT_BATCH; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_CKP_IGNORE_KEY_IS_NULL; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_CKP_KEY_EQ_VAL; +import static 
org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_CKP_RESET_KEY_IS_NULL; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_EMPTY_CKP_KEY; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_INPUT_CKP; +import static org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource.VAL_NON_EMPTY_CKP_ALL_MEMBERS; +import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(MockitoExtension.class) +public class TestS3GcsEventsHoodieIncrSourceE2ECkpVersion extends S3EventsHoodieIncrSourceHarness { + + private String toggleVersion(String version) { + return "8".equals(version) ? "6" : "8"; + } + + private HoodieDeltaStreamer.Config createConfig(String basePath, String sourceCheckpoint) { + HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig( + basePath, + WriteOperationType.INSERT, + "org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource", + Collections.emptyList(), + sourceCheckpoint != null ? DFSPropertiesConfiguration.DEFAULT_PATH.toString() : null, + false, + false, + 100000, + false, + null, + null, + "timestamp", + sourceCheckpoint); + cfg.propsFilePath = DFSPropertiesConfiguration.DEFAULT_PATH.toString(); + return cfg; + } + + private TypedProperties setupBaseProperties(String tableVersion) { + TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); + props.put(WRITE_TABLE_VERSION.key(), tableVersion); + return props; + } + + public void verifyLastInstantCommitMetadata(Map<String, String> expectedMetadata) { + metaClient.reloadActiveTimeline(); + Option<HoodieCommitMetadata> metadata = HoodieClientTestUtils.getCommitMetadataForInstant( + metaClient, metaClient.getActiveTimeline().lastInstant().get()); + assertFalse(metadata.isEmpty()); + assertEquals(metadata.get().getExtraMetadata(), expectedMetadata); + } + + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + public void testSyncE2ENoPrevCkpThenSyncTwice(String tableVersion) throws Exception { + // First, start with no previous checkpoint and ingest until ckp 1 with the given table version. + // Disable auto upgrade and MDT as we want to keep things as they are. + metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), tableVersion); + TypedProperties props = setupBaseProperties(tableVersion); + // Dummy behavior injection to return ckp 1. + props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY); + // Validate that the source input ckp is empty when doing the sync. + props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY); + props.put("hoodie.metadata.enable", "false"); + props.put("hoodie.write.auto.upgrade", "false"); + + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(createConfig(basePath(), null), jsc, Option.of(props)); + ds.sync(); + + Map<String, String> expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, CUSTOM_CHECKPOINT1); + verifyLastInstantCommitMetadata(expectedMetadata); + + // Then resume from ckp 1 and ingest until ckp 2 with the toggled table version. + // Disable auto upgrade and MDT as we want to keep things as they are. + props = setupBaseProperties(toggleVersion(tableVersion)); + props.put("hoodie.metadata.enable", "false"); + // Dummy behavior injection to return ckp 2. 
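+ // (The OP_FETCH_NEXT_BATCH prop selects the dummy batch/checkpoint behavior injected into MockS3EventsHoodieIncrSource, and the VAL_* props below make the mock assert on the checkpoint it is handed during the sync -- inferred from the constant names and the round-1 validation above.)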
+ props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP2_KEY); + props.put("hoodie.write.auto.upgrade", "false"); + // Validate the given checkpoint is ckp 1 when doing the sync. + props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS); + props.put(VAL_CKP_KEY_EQ_VAL, CUSTOM_CHECKPOINT1); + props.put(VAL_CKP_RESET_KEY_IS_NULL, "IGNORED"); + props.put(VAL_CKP_IGNORE_KEY_IS_NULL, "IGNORED"); + + ds = new HoodieDeltaStreamer(createConfig(basePath(), CUSTOM_CHECKPOINT1), jsc, Option.of(props)); + ds.sync(); + + // Ingesting into a version 8 table with a version 6 delta streamer is not allowed, but ingesting into a + // version 6 table with a version 8 delta streamer is. + expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, CUSTOM_CHECKPOINT2); + expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, CUSTOM_CHECKPOINT1); + verifyLastInstantCommitMetadata(expectedMetadata); + + // In the third round, target table version 8 (MDT and auto upgrade remain disabled). + props = setupBaseProperties("8"); + props.put("hoodie.metadata.enable", "false"); + // Dummy behavior injection to return ckp 1. + props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY); + props.put("hoodie.write.auto.upgrade", "false"); + // Validate the given checkpoint is ckp 2 when doing the sync. + props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS); + props.put(VAL_CKP_KEY_EQ_VAL, CUSTOM_CHECKPOINT2); + props.put(VAL_CKP_RESET_KEY_IS_NULL, "IGNORED"); + props.put(VAL_CKP_IGNORE_KEY_IS_NULL, "IGNORED"); + + ds = new HoodieDeltaStreamer(createConfig(basePath(), CUSTOM_CHECKPOINT2), jsc, Option.of(props)); + ds.sync(); + + // Even when targeting table version 8, we still use checkpoint V1 since this is an S3/GCS incremental source. + expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + expectedMetadata.put(STREAMER_CHECKPOINT_KEY_V1, CUSTOM_CHECKPOINT1); + expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, CUSTOM_CHECKPOINT2); + verifyLastInstantCommitMetadata(expectedMetadata); + } + + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + public void testSyncE2ENoNextCkpNoPrevCkp(String tableVersion) throws Exception { + TypedProperties props = setupBaseProperties(tableVersion); + props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NULL_CKP_KEY); + props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY); + + HoodieDeltaStreamer.Config cfg = createConfig(basePath(), null); + cfg.allowCommitOnNoCheckpointChange = true; + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props)); + ds.sync(); + + Map<String, String> expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + verifyLastInstantCommitMetadata(expectedMetadata); + } + + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + public void testSyncE2ENoNextCkpHasPrevCkp(String tableVersion) throws Exception { + String previousCkp = "previousCkp"; + TypedProperties props = setupBaseProperties(tableVersion); + props.put(VAL_INPUT_CKP, VAL_NON_EMPTY_CKP_ALL_MEMBERS); + props.put(VAL_CKP_KEY_EQ_VAL, previousCkp); + props.put(VAL_CKP_RESET_KEY_IS_NULL, ""); + props.put(VAL_CKP_IGNORE_KEY_IS_NULL, ""); + props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NULL_CKP_KEY); + + HoodieDeltaStreamer.Config cfg = createConfig(basePath(), previousCkp); + cfg.allowCommitOnNoCheckpointChange = true; + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, Option.of(props)); + ds.sync(); + + Map<String, String> expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + 
expectedMetadata.put(STREAMER_CHECKPOINT_RESET_KEY_V1, previousCkp); + verifyLastInstantCommitMetadata(expectedMetadata); + } + + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + public void testSyncE2EForceSkip(String tableVersion) throws Exception { + TypedProperties props = setupBaseProperties(tableVersion); + props.put(CHECKPOINT_FORCE_SKIP.key(), "true"); + props.put(OP_FETCH_NEXT_BATCH, OP_EMPTY_ROW_SET_NONE_NULL_CKP1_KEY); + props.put(VAL_INPUT_CKP, VAL_EMPTY_CKP_KEY); + + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(createConfig(basePath(), null), jsc, Option.of(props)); + ds.sync(); + + Map expectedMetadata = new HashMap<>(); + expectedMetadata.put("schema", ""); + verifyLastInstantCommitMetadata(expectedMetadata); + } +} \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java index fc66cb6d2bdc8..832c655904bbf 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java @@ -25,12 +25,14 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieErrorTableConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.transform.Transformer; @@ -51,10 +53,14 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; +import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2; import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP; import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY; import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY; @@ -72,7 +78,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class TestStreamSync { +public class TestStreamSync extends SparkClientFunctionalTestHarness { @ParameterizedTest @MethodSource("testCasesFetchNextBatchFromSource") @@ -149,32 +155,6 @@ void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer, HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()); } - @ParameterizedTest - @MethodSource("getCheckpointToResumeCases") - void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata commitMetadata, Option expectedResumeCheckpoint) throws IOException { - // TODO(yihua): rewrite this tests - /* - HoodieSparkEngineContext hoodieSparkEngineContext = mock(HoodieSparkEngineContext.class); - HoodieStorage storage = new 
HoodieHadoopStorage(mock(FileSystem.class)); - TypedProperties props = new TypedProperties(); - SparkSession sparkSesion = mock(SparkSession.class); - Configuration configuration = mock(Configuration.class); - HoodieTimeline commitsTimeline = mock(HoodieTimeline.class); - HoodieInstant hoodieInstant = mock(HoodieInstant.class); - - when(commitsTimeline.filter(any())).thenReturn(commitsTimeline); - when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant)); - - StreamSync streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkEngineContext, - storage, configuration, client -> true, null, Option.empty(), null, Option.empty(), true); - StreamSync spy = spy(streamSync); - //doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any()); - - Option resumeCheckpoint = CheckpointUtils.getCheckpointToResumeFrom(Option.of(commitsTimeline), cfg, props); - assertEquals(expectedResumeCheckpoint, resumeCheckpoint); - */ - } - @ParameterizedTest @MethodSource("getMultiTableStreamerCases") void testCloneConfigsFromMultiTableStreamer(HoodieMultiTableStreamer.Config cfg) throws IOException { @@ -319,4 +299,94 @@ public void testInitializeEmptyTable() throws IOException { // then verify(tableBuilder, times(1)).setTableVersion(HoodieTableVersion.SIX); } + + private StreamSync setupStreamSync() { + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + cfg.checkpoint = "test-checkpoint"; + cfg.ignoreCheckpoint = "test-ignore"; + cfg.sourceClassName = "test-source"; + + TypedProperties props = new TypedProperties(); + SchemaProvider schemaProvider = mock(SchemaProvider.class); + + return new StreamSync(cfg, mock(SparkSession.class), props, + mock(HoodieSparkEngineContext.class), mock(HoodieStorage.class), + mock(Configuration.class), client -> true, schemaProvider, + Option.empty(), mock(SourceFormatAdapter.class), Option.empty(), false); + } + + @Test + public void testExtractCheckpointMetadata_WhenForceSkipIsTrue() { + StreamSync streamSync = setupStreamSync(); + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + TypedProperties props = new TypedProperties(); + props.put(CHECKPOINT_FORCE_SKIP.key(), "true"); + + Map result = streamSync.extractCheckpointMetadata( + mock(InputBatch.class), props, HoodieTableVersion.ZERO.versionCode(), cfg); + + assertTrue(result.isEmpty(), "Should return empty map when CHECKPOINT_FORCE_SKIP is true"); + } + + @Test + public void testExtractCheckpointMetadata_WhenCheckpointExists() { + StreamSync streamSync = setupStreamSync(); + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + TypedProperties props = new TypedProperties(); + props.put(CHECKPOINT_FORCE_SKIP.key(), "false"); + + InputBatch inputBatch = mock(InputBatch.class); + Checkpoint checkpoint = mock(Checkpoint.class); + Map expectedMetadata = new HashMap<>(); + expectedMetadata.put("test", "value"); + + when(inputBatch.getCheckpointForNextBatch()).thenReturn(checkpoint); + when(checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)) + .thenReturn(expectedMetadata); + + Map result = streamSync.extractCheckpointMetadata( + inputBatch, props, HoodieTableVersion.ZERO.versionCode(), cfg); + + assertEquals(expectedMetadata, result, "Should return checkpoint metadata when checkpoint exists"); + } + + @Test + public void testExtractCheckpointMetadata_WhenCheckpointIsNullV2() { + StreamSync streamSync = setupStreamSync(); + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + cfg.checkpoint = "test-checkpoint"; + 
cfg.ignoreCheckpoint = "test-ignore"; + TypedProperties props = new TypedProperties(); + + InputBatch inputBatch = mock(InputBatch.class); + when(inputBatch.getCheckpointForNextBatch()).thenReturn(null); + + Map result = streamSync.extractCheckpointMetadata( + inputBatch, props, HoodieTableVersion.EIGHT.versionCode(), cfg); + + Map expected = new HashMap<>(); + expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore"); + expected.put(STREAMER_CHECKPOINT_RESET_KEY_V2, "test-checkpoint"); + assertEquals(expected, result, "Should return default metadata when checkpoint is null"); + } + + @Test + public void testExtractCheckpointMetadata_WhenCheckpointIsNullV1() { + StreamSync streamSync = setupStreamSync(); + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + cfg.checkpoint = "test-checkpoint"; + cfg.ignoreCheckpoint = "test-ignore"; + TypedProperties props = new TypedProperties(); + + InputBatch inputBatch = mock(InputBatch.class); + when(inputBatch.getCheckpointForNextBatch()).thenReturn(null); + + Map result = streamSync.extractCheckpointMetadata( + inputBatch, props, HoodieTableVersion.SIX.versionCode(), cfg); + + Map expected = new HashMap<>(); + expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore"); + expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint"); + assertEquals(expected, result, "Should return default metadata when checkpoint is null"); + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java new file mode 100644 index 0000000000000..113f8e3f8d899 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.streamer; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.exception.HoodieStreamerException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY; +import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +public class TestStreamerCheckpointUtils extends SparkClientFunctionalTestHarness { + private TypedProperties props; + private HoodieStreamer.Config streamerConfig; + protected HoodieTableMetaClient metaClient; + + @BeforeEach + public void setUp() throws IOException { + metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.COPY_ON_WRITE); + props = new TypedProperties(); + streamerConfig = new HoodieStreamer.Config(); + streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name(); + } + + @Test + public void testEmptyTimelineCase() throws IOException { + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.isEmpty()); + } + + @Test + public void testIgnoreCheckpointCaseEmptyIgnoreKey() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(CHECKPOINT_KEY, "ckp_key"); + extraMetadata.put(CHECKPOINT_IGNORE_KEY, ""); + createCommit(commitTime, extraMetadata); + + streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.isEmpty()); + } + + @Test + public void testIgnoreCheckpointCaseIgnoreKeyMismatch() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(CHECKPOINT_KEY, "ckp_key"); + extraMetadata.put(CHECKPOINT_IGNORE_KEY, "ignore_checkpoint_2"); + createCommit(commitTime, extraMetadata); + + streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = 
StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.isEmpty()); + } + + @Test + public void testThrowExceptionCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(CHECKPOINT_KEY, ""); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key"); + createCommit(commitTime, extraMetadata); + + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + HoodieStreamerException exception = assertThrows(HoodieStreamerException.class, () -> { + StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + }); + assertTrue(exception.getMessage().contains("Unable to find previous checkpoint")); + } + + @Test + public void testNewCheckpointV2WithResetKeyCase() throws IOException { + String commitTime = "0000000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(CHECKPOINT_KEY, ""); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key"); + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "earliest"; + streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("earliest", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testNewCheckpointV1WithResetKeyCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "old-reset-key"); + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "earliest"; + streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("earliest", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testReuseCheckpointCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(CHECKPOINT_KEY, "earliest-0-100"); + extraMetadata.put(CHECKPOINT_IGNORE_KEY, ""); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, ""); + createCommit(commitTime, extraMetadata); + + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertEquals("earliest-0-100", checkpoint.get().getCheckpointKey()); + } + + public void testNewCheckpointV2NoMetadataCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "earliest"; + streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + 
assertTrue(checkpoint.get() instanceof StreamerCheckpointV2); + assertEquals("earliest", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testNewCheckpointV1NoMetadataCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "earliest"; + streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("earliest", checkpoint.get().getCheckpointKey()); + } + + private void createCommit(String commitTime, Map extraMetadata) throws IOException { + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT, + HoodieTimeline.COMMIT_ACTION, commitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + timeline.createNewInstant(instant); + timeline.saveAsComplete(instant, + Option.of(HoodieCommonTestHarness.getCommitMetadata(metaClient, basePath(), "partition1", commitTime, 2, extraMetadata))); + metaClient.reloadActiveTimeline(); + } + + @Test + public void testIgnoreCheckpointNullKeyCase() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + // Set empty ignore key + extraMetadata.put(CHECKPOINT_KEY, "some-checkpoint"); + extraMetadata.put(CHECKPOINT_IGNORE_KEY, ""); + createCommit(commitTime, extraMetadata); + + streamerConfig.ignoreCheckpoint = "ignore_checkpoint_1"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.isEmpty()); + } + + @Test + public void testNewCheckpointWithEmptyResetKey() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "old-checkpoint"); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, ""); // Empty reset key + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "new-checkpoint"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("new-checkpoint", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testNewCheckpointWithDifferentResetKey() throws IOException { + String commitTime = "20240120000000"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "old-checkpoint"); + extraMetadata.put(HoodieStreamer.CHECKPOINT_RESET_KEY, "different-reset-key"); + createCommit(commitTime, extraMetadata); + + streamerConfig.checkpoint = "new-checkpoint"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("new-checkpoint", checkpoint.get().getCheckpointKey()); + } + + @Test + public void 
testMergeOnReadWithDeltaCommits() throws IOException { + // Setup MOR table + metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.MERGE_ON_READ); + streamerConfig.tableType = HoodieTableType.MERGE_ON_READ.name(); + + // Create a commit and deltacommit + String commitTime = "20240120000000"; + String deltaCommitTime = "20240120000001"; + + // Create commit + Map commitMetadata = new HashMap<>(); + commitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp"); + createCommit(commitTime, commitMetadata); + + // Create deltacommit + Map deltaCommitMetadata = new HashMap<>(); + deltaCommitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "deltacommit-cp"); + createDeltaCommit(deltaCommitTime, deltaCommitMetadata); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + + // Should use deltacommit checkpoint + assertEquals("deltacommit-cp", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testMergeOnReadWithoutDeltaCommits() throws IOException { + // Setup MOR table + metaClient = HoodieTestUtils.init(basePath(), HoodieTableType.MERGE_ON_READ); + streamerConfig.tableType = HoodieTableType.MERGE_ON_READ.name(); + + // Create only commit + String commitTime = "20240120000000"; + Map commitMetadata = new HashMap<>(); + commitMetadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp"); + createCommit(commitTime, commitMetadata); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeString( + metaClient.getActiveTimeline(), streamerConfig, props); + + // Should use commit checkpoint + assertEquals("commit-cp", checkpoint.get().getCheckpointKey()); + } + + private void createDeltaCommit(String deltaCommitTime, Map extraMetadata) throws IOException { + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT, + HoodieTimeline.DELTA_COMMIT_ACTION, deltaCommitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + timeline.createNewInstant(instant); + timeline.saveAsComplete(instant, + Option.of(HoodieCommonTestHarness.getCommitMetadata(metaClient, basePath(), "partition1", deltaCommitTime, 2, extraMetadata))); + metaClient.reloadActiveTimeline(); + } + + @Test + public void testCreateNewCheckpointV2WithNullTimeline() throws IOException { + streamerConfig.checkpoint = "test-cp"; + streamerConfig.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "2"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeFrom( + Option.empty(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("test-cp", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testCreateNewCheckpointV1WithNullTimeline() throws IOException { + streamerConfig.checkpoint = "test-cp"; + props.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "1"); + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeFrom( + Option.empty(), streamerConfig, props); + assertTrue(checkpoint.get() instanceof StreamerCheckpointV1); + assertEquals("test-cp", checkpoint.get().getCheckpointKey()); + } + + @Test + public void testEmptyTimelineAndNullCheckpoint() throws IOException { + streamerConfig.checkpoint = null; + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeFrom( + Option.empty(), streamerConfig, props); + assertTrue(checkpoint.isEmpty()); + } + + @Test + public void 
testTimelineWithCheckpointOverridesConfigCheckpoint() throws IOException { + String commitTime = "20240120000000"; + Map metadata = new HashMap<>(); + metadata.put(HoodieStreamer.CHECKPOINT_KEY, "commit-cp"); + createCommit(commitTime, metadata); + + streamerConfig.checkpoint = "config-cp"; + + Option checkpoint = StreamerCheckpointUtils.getCheckpointToResumeFrom( + Option.of(metaClient.getActiveTimeline()), streamerConfig, props); + assertEquals("config-cp", checkpoint.get().getCheckpointKey()); + } +} From bff46c9f28906240ad5c9148d9c3fc4b41f8dae8 Mon Sep 17 00:00:00 2001 From: Davis Zhang Date: Fri, 24 Jan 2025 21:08:24 -0800 Subject: [PATCH 3/3] address PR comments --- .../deltastreamer/TestHoodieDeltaStreamer.java | 5 ++--- ...estS3GcsEventsHoodieIncrSourceE2ECkpVersion.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index beb90fd4aad09..d9c48ec6c7bef 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -506,7 +506,6 @@ public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType) cfg.configs.add(String.format("%s=false", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key())); cfg.targetBasePath = newDatasetBasePath; new HoodieDeltaStreamer(cfg, jsc).sync(); - assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, newDatasetBasePath)); Dataset res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath); LOG.info("Schema :"); res.printSchema(); @@ -664,8 +663,8 @@ public void testUpsertsCOW_ContinuousModeDisabled() throws Exception { ds.sync(); assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); - UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); + UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @Timeout(600) @@ -694,8 +693,8 @@ public void testUpsertsMOR_ContinuousModeDisabled() throws Exception { ds.sync(); assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext); assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown"); - UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath)); + UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecordType recordType) throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java index 1ff334e07979b..165166a07f049 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestS3GcsEventsHoodieIncrSourceE2ECkpVersion.java @@ -22,12 +22,16 @@ import org.apache.hudi.common.config.TypedProperties; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.checkpoint.CheckpointUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase; +import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource; +import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource; import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -220,4 +224,13 @@ public void testSyncE2EForceSkip(String tableVersion) throws Exception { expectedMetadata.put("schema", ""); verifyLastInstantCommitMetadata(expectedMetadata); } + + @Test + public void testTargetCheckpointV2ForS3Gcs() { + // To ensure we properly track sources that must use checkpoint V1. + assertFalse(CheckpointUtils.targetCheckpointV2(8, S3EventsHoodieIncrSource.class.getName())); + assertFalse(CheckpointUtils.targetCheckpointV2(6, S3EventsHoodieIncrSource.class.getName())); + assertFalse(CheckpointUtils.targetCheckpointV2(8, GcsEventsHoodieIncrSource.class.getName())); + assertFalse(CheckpointUtils.targetCheckpointV2(6, GcsEventsHoodieIncrSource.class.getName())); + } } \ No newline at end of file