[HUDI-8726] cover USE_TRANSITION_TIME for ckp conversion #12687

Closed
@@ -325,7 +325,7 @@ public static HoodieTableMetaClient createMetaClient(SparkSession spark, String
return HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(spark.sessionState().newHadoopConf()), basePath);
}

private static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
public static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(instant).get();
@@ -25,6 +25,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -38,6 +39,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -87,6 +89,7 @@
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
import static org.apache.hudi.testutils.Assertions.assertFileSizesEqual;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -181,12 +184,19 @@ public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storage
return getHoodieMetaClient(storageConf, basePath, getPropertiesForKeyGen(true));
}

public HoodieTableMetaClient getHoodieMetaClientWithTableVersion(StorageConfiguration<?> storageConf, String basePath, String tableVersion) throws IOException {
Properties props = getPropertiesForKeyGen(true);
props.put(WRITE_TABLE_VERSION.key(), tableVersion);
return getHoodieMetaClient(storageConf, basePath, props);
}

@Override
public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
return HoodieTableMetaClient.newTableBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(COPY_ON_WRITE)
.setPayloadClass(HoodieAvroPayload.class)
.setTableVersion(ConfigUtils.getIntWithAltKeys(new TypedProperties(props), WRITE_TABLE_VERSION))
.fromProperties(props)
.initTable(storageConf.newInstance(), basePath);
}
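
As a usage illustration (hypothetical test code, not part of this diff), the new getHoodieMetaClientWithTableVersion helper lets a test initialize the table at an older write version so pre-version-8 checkpoint paths can be exercised; storageConf and basePath stand for the test's storage configuration and base path, and "6" is an assumed example value.

```java
// Hypothetical usage: create a meta client for a table written at table version 6
// instead of the current default, e.g. to exercise checkpoint V1 behavior.
HoodieTableMetaClient legacyMetaClient =
    getHoodieMetaClientWithTableVersion(storageConf, basePath, "6");
```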
@@ -24,19 +24,29 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;

public class CheckpointUtils {

public static final Set<String> DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList(
"org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
Contributor: Can we use S3EventsHoodieIncrSource.class.getName() here, just to ensure we are resilient to the classes moving to a different package?

Contributor (author): That's not ideal, because this general-purpose utility method ("shouldTargetCheckpointV2") is called everywhere, and not all call sites have access to those class definitions; it would take a non-trivial amount of refactoring to move the code to a common place that every caller can access. As compensation, I added unit-test validation against S3EventsHoodieIncrSource.class.getName() and the GCS equivalent, which also guards against the classes moving to other packages.

"org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource"
));
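
For illustration, a minimal sketch of the guard test the author describes in the reply above (hypothetical test code, not taken from this diff; it assumes the test lives in a module that can see the utilities source classes, and the class and method names below are placeholders):

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.junit.jupiter.api.Test;

class TestCheckpointV1PinnedSources {
  @Test
  void fullyQualifiedNamesStayInSync() {
    // If either class moves to another package, getName() changes and this test fails,
    // signaling that DATASOURCES_MUST_USE_CKP_V1 must be updated as well.
    assertTrue(CheckpointUtils.DATASOURCES_MUST_USE_CKP_V1.contains(S3EventsHoodieIncrSource.class.getName()));
    assertTrue(CheckpointUtils.DATASOURCES_MUST_USE_CKP_V1.contains(GcsEventsHoodieIncrSource.class.getName()));
  }
}
```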
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
@@ -49,14 +59,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

public static boolean targetCheckpointV2(int writeTableVersion) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
&& !DATASOURCES_MUST_USE_CKP_V1.contains(sourceClassName);
}
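
A short sketch of how a caller uses the new two-argument check (the inputs and expected results mirror the parameterized test added further below):

```java
// Table version 8+ normally means checkpoint V2, but the S3/GCS incremental sources
// are pinned to checkpoint V1 regardless of table version.
boolean genericSourceUsesV2 = CheckpointUtils.targetCheckpointV2(
    8, "org.apache.hudi.utilities.sources.TestSource");                // true
boolean s3SourceUsesV2 = CheckpointUtils.targetCheckpointV2(
    8, "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource");  // false
```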

// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) {
if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
}
@@ -65,7 +76,9 @@ public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
}
if (checkpoint instanceof StreamerCheckpointV1) {
// V1 -> V2 translation
// TODO(yihua): handle USE_TRANSITION_TIME in V1
if (handlingMode.equals(USE_TRANSITION_TIME)) {
return new StreamerCheckpointV2(checkpoint);
}
// TODO(yihua): handle different ordering between requested and completion time
// TODO(yihua): handle timeline history / archived timeline
String instantTime = checkpoint.getCheckpointKey();
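
To make the intent of the new branch concrete, a small sketch mirroring the testConvertCheckpointWithUseTransitionTime test added below (metaClient stands for the source table's meta client): when hollow commits are handled with USE_TRANSITION_TIME, a V1 checkpoint key already holds a completion time, so the conversion can wrap it directly instead of resolving a requested time against the timeline.

```java
// The V1 checkpoint key is a completion time because the reader was configured with
// hollowCommitHandling = USE_TRANSITION_TIME; no timeline lookup is needed.
Checkpoint v1 = new StreamerCheckpointV1("20231127020102");
StreamerCheckpointV2 v2 =
    CheckpointUtils.convertToCheckpointV2ForCommitTime(v1, metaClient, USE_TRANSITION_TIME);
// v2.getCheckpointKey() is still "20231127020102".
```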
@@ -30,9 +30,14 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.BLOCK;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -95,7 +100,7 @@ public void testConvertToCheckpointV2ForCommitTime() {
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
@@ -126,7 +131,7 @@ public void testConvertToCheckpointV2ThrowsExceptionForMissingCompletionTime() {
Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);

Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

@@ -155,7 +160,7 @@ public void testConvertToCheckpointV2ForCommitTimeEmptyTimeline() {

Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
Exception exception = assertThrows(UnsupportedOperationException.class,
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, FAIL));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}

@@ -168,7 +173,39 @@ public void testConvertCheckpointWithInitTimestamp() {
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());

checkpoint = new StreamerCheckpointV2(instantTime);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, BLOCK);
assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
}

@Test
public void testConvertCheckpointWithUseTransitionTime() {
String instantTime = "20231127010101";
String completionTime = "20231127020102";

// Mock active timeline
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime, InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
when(activeTimeline.getInstantsAsStream()).thenReturn(Stream.of(instant));

Checkpoint checkpoint = new StreamerCheckpointV1(completionTime);
StreamerCheckpointV2 translatedCheckpoint = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient, USE_TRANSITION_TIME);

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}

@ParameterizedTest
@CsvSource({
// version, sourceClassName, expectedResult
// Version >= 8 with allowed sources should return true
"8, org.apache.hudi.utilities.sources.TestSource, true",
"9, org.apache.hudi.utilities.sources.AnotherSource, true",
// Version < 8 should return false regardless of source
"7, org.apache.hudi.utilities.sources.TestSource, false",
"6, org.apache.hudi.utilities.sources.AnotherSource, false",
// Disallowed sources should return false even with version >= 8
"8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
"8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false"
})
public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName));
}
}
@@ -25,7 +25,6 @@
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -544,22 +543,6 @@ private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, Stri
return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
}

private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map<String, String> extraMetadata)
throws IOException {
HoodieCommitMetadata commit = new HoodieCommitMetadata();
for (int i = 1; i <= count; i++) {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setFileId(i + "");
stat.setPartitionPath(Paths.get(basePath, partition).toString());
stat.setPath(commitTs + "." + i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
commit.addWriteStat(partition, stat);
}
for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
commit.addMetadata(extraEntries.getKey(), extraEntries.getValue());
}
return serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commit).get();
}

private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
String newFilePartition, int newFileCount, Map<String, String> extraMetadata,
WriteOperationType operationType)
@@ -20,9 +20,11 @@

import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -55,6 +57,7 @@

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -70,6 +73,7 @@

import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;

/**
* The common hoodie test harness to provide the basic infrastructure.
@@ -377,4 +381,25 @@ private static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType da
throw new RuntimeException("Unknown data block type " + dataBlockType);
}
}

public byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map<String, String> extraMetadata)
throws IOException {
return getCommitMetadata(metaClient, basePath, partition, commitTs, count, extraMetadata);
}

public static byte[] getCommitMetadata(HoodieTableMetaClient metaClient, String basePath, String partition, String commitTs, int count, Map<String, String> extraMetadata)
throws IOException {
HoodieCommitMetadata commit = new HoodieCommitMetadata();
for (int i = 1; i <= count; i++) {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setFileId(i + "");
stat.setPartitionPath(Paths.get(basePath, partition).toString());
stat.setPath(commitTs + "." + i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
commit.addWriteStat(partition, stat);
}
for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
commit.addMetadata(extraEntries.getKey(), extraEntries.getValue());
}
return serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commit).get();
}
}
@@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(
new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
} else {
commitTime
}
@@ -163,7 +163,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
}

private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion.versionCode(), getClass.getName)) {
commitTime
} else {
CheckpointUtils.convertToCheckpointV1ForCommitTime(
@@ -190,7 +190,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi

@Override
public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
@@ -215,7 +215,7 @@ private Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatchBasedOnCompletionTi
IncrementalQueryAnalyzer analyzer = IncrSourceHelper.getIncrementalQueryAnalyzer(
sparkContext, srcPath, lastCheckpoint, missingCheckpointStrategy,
getIntWithAltKeys(props, HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH),
getLatestSourceProfile());
getLatestSourceProfile(), getHollowCommitHandleMode(props));
QueryContext queryContext = analyzer.analyze();
Option<InstantRange> instantRange = queryContext.getInstantRange();

@@ -104,7 +104,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
// V2 -> V2
if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
return lastCheckpoint;
@@ -29,6 +29,7 @@
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -184,7 +185,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
Option<Checkpoint> lastCheckpoint,
MissingCheckpointStrategy missingCheckpointStrategy,
int numInstantsFromConfig,
Option<SourceProfile<Integer>> latestSourceProfile) {
Option<SourceProfile<Integer>> latestSourceProfile, TimelineUtils.HollowCommitHandling handlingMode) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()))
.setBasePath(srcPath)
@@ -197,7 +198,7 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer(
if (lastCheckpoint.isPresent() && !lastCheckpoint.get().getCheckpointKey().isEmpty()) {
// Translate checkpoint
StreamerCheckpointV2 lastStreamerCheckpointV2 = CheckpointUtils.convertToCheckpointV2ForCommitTime(
lastCheckpoint.get(), metaClient);
lastCheckpoint.get(), metaClient, handlingMode);
startCompletionTime = lastStreamerCheckpointV2.getCheckpointKey();
rangeType = RangeType.OPEN_CLOSED;
} else if (missingCheckpointStrategy != null) {