
Commit

[ENG-11454][INTERNAL] Fix Duplicate handling behavior when Precombine value is not set (#821)

Co-authored-by: sivabalan <n.siva.b@gmail.com>
2 people authored and linliu-code committed Feb 6, 2025
1 parent 14c292c commit 6cd1587
Showing 2 changed files with 91 additions and 2 deletions.
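
For context, the new tests below exercise writes where no precombine (ordering) field is configured. The following is a minimal, self-contained sketch of that setup through the Spark datasource, not part of this commit: the table name, base path, and column names are hypothetical, while the option keys are the standard Hudi datasource keys.

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only (not from this commit). Table name, path, and columns are hypothetical.
object EmptyPrecombineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("empty-precombine-sketch")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/hudi_empty_precombine_demo"
    val hudiOpts = Map(
      "hoodie.table.name" -> "dupes_demo",
      "hoodie.datasource.write.recordkey.field" -> "key",
      "hoodie.datasource.write.partitionpath.field" -> "region",
      "hoodie.datasource.write.precombine.field" -> "",   // precombine deliberately left unset
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ")

    // Initial insert of two keys.
    Seq(("k1", "us", 1), ("k2", "us", 1)).toDF("key", "region", "value")
      .write.format("hudi").options(hudiOpts)
      .option("hoodie.datasource.write.operation", "insert")
      .mode(SaveMode.Overwrite).save(basePath)

    // Write the same keys again. With no ordering field, which copy survives is the
    // duplicate-handling behavior the tests in this commit pin down; a snapshot read
    // should still return one row per key.
    Seq(("k1", "us", 2), ("k2", "us", 2)).toDF("key", "region", "value")
      .write.format("hudi").options(hudiOpts)
      .option("hoodie.datasource.write.operation", "upsert")
      .mode(SaveMode.Append).save(basePath)

    assert(spark.read.format("hudi").load(basePath).count() == 2)
    spark.stop()
  }
}

Whether the second write reliably replaces the first when no ordering value is available is exactly the duplicate-handling behavior the tests in this commit assert.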
@@ -1485,6 +1485,55 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
assertEquals(inputRows, readRows)
}

@Test
def testEmptyPrecombine() : Unit = {
val readType = HoodieRecordType.AVRO
val writeType = HoodieRecordType.AVRO
val (_, readOpts) = getWriterReaderOpts(readType)
var (writeOpts, _) = getWriterReaderOpts(writeType)
// Use the latest-wins payload with the precombine field left empty on a MOR table.
writeOpts = writeOpts ++ Map(
  HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
  HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "",
  DataSourceWriteOptions.TABLE_TYPE.key() -> "MERGE_ON_READ")

val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(writeOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath)
assertEquals(100, hudiSnapshotDF1.count()) // 100 records from the initial insert

// Second Operation:
// SNAPSHOT view should read the log files only with the latest commit time.
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).asScala
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(writeOpts)
.mode(SaveMode.Append)
.save(basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath)
assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated
val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1)
assertTrue(commit2Time > commit1Time)
assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
val hudiWithoutMeta = hudiSnapshotDF2.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
assertEquals(inputDF2.except(hudiWithoutMeta).count(), 0)
}

def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO,
opt: Map[String, String] = commonOpts,
enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
@@ -19,6 +19,7 @@

package org.apache.hudi.utilities.deltastreamer;

import joptsimple.internal.Strings;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.DefaultSparkRecordMerger;
@@ -1836,6 +1837,43 @@ private static Stream<Arguments> getArgumentsForFilterDupesWithPrecombineTest()
Arguments.of(HoodieRecordType.SPARK, "COPY_ON_WRITE", "timestamp"));
}

private static Stream<Arguments> getArgumentsForFilterDupesWithPrecombineTest() {
return Stream.of(
Arguments.of(HoodieRecordType.AVRO, "MERGE_ON_READ", Strings.EMPTY),
Arguments.of(HoodieRecordType.AVRO, "MERGE_ON_READ", "timestamp"),
Arguments.of(HoodieRecordType.AVRO, "COPY_ON_WRITE", Strings.EMPTY),
Arguments.of(HoodieRecordType.AVRO, "COPY_ON_WRITE", "timestamp"),
Arguments.of(HoodieRecordType.SPARK, "MERGE_ON_READ", Strings.EMPTY),
Arguments.of(HoodieRecordType.SPARK, "MERGE_ON_READ", "timestamp"),
Arguments.of(HoodieRecordType.SPARK, "COPY_ON_WRITE", Strings.EMPTY),
Arguments.of(HoodieRecordType.SPARK, "COPY_ON_WRITE", "timestamp"));
}

@ParameterizedTest
@MethodSource("getArgumentsForFilterDupesWithPrecombineTest")
public void testFilterDupesWithPrecombine(
HoodieRecordType recordType, String tableType, String sourceOrderingField) throws Exception {
String tableBasePath = basePath + "/test_dupes_tables_with_precombine";
HoodieDeltaStreamer.Config cfg =
TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
cfg.tableType = tableType;
cfg.filterDupes = true;
cfg.sourceOrderingField = sourceOrderingField; // exercise both an empty and a real ordering field
addRecordMerger(recordType, cfg.configs);
new HoodieDeltaStreamer(cfg, jsc).sync();

assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

// Generate the same 1000 records plus 1000 new ones.
// TestDataSource assists with generating input data: every subsequent batch is 50% inserts and 50% updates.
runDeltaSync(cfg, true, 2000, WriteOperationType.INSERT);
assertRecordCount(2000, tableBasePath, sqlContext); // without filter-dupes we would expect 3000 records here.
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);

UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
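
The counts asserted above follow from the TestDataSource behavior noted in the comment: the second batch of 2000 records repeats the 1000 keys already in the table and adds 1000 new ones, so with --filter-dupes enabled only the 1000 new records land and the table ends at 2000 rows. For reference, a hedged sketch of the datasource-level analogue, assuming hoodie.datasource.write.insert.drop.duplicates is the switch that backs the same dedup path; batchDf is a hypothetical incoming batch, and hudiOpts and basePath reuse the names from the sketch near the top of this page.

// Sketch only: drop incoming records whose keys already exist in the table
// before an insert, mirroring the deltastreamer's --filter-dupes flag.
// batchDf, hudiOpts, and basePath are hypothetical names, and the
// drop-duplicates key is an assumption, not taken from this diff.
batchDf.write.format("hudi")
  .options(hudiOpts)
  .option("hoodie.datasource.write.operation", "insert")
  .option("hoodie.datasource.write.insert.drop.duplicates", "true")
  .mode(SaveMode.Append)
  .save(basePath)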

@ParameterizedTest
@MethodSource("getArgumentsForFilterDupesWithPrecombineTest")
public void testFilterDupesWithPrecombine(
@@ -1865,7 +1903,7 @@ public void testFilterDupesWithPrecombine(
public void testFilterDupes() throws Exception {
String tableBasePath = basePath + "/test_dupes_table";

// Initial bulk insert
// Initial bulk insert of 1000 records
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
assertRecordCount(1000, tableBasePath, sqlContext);
@@ -1895,9 +1933,9 @@ public void testFilterDupes() throws Exception {
// Ensure it is empty
HoodieCommitMetadata commitMetadata = new DefaultCommitMetadataSerDe()
.deserialize(newLastFinished, mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
System.out.println("New Commit Metadata=" + commitMetadata);
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());


// Try UPSERT with filterDupes true. Expect exception
cfg2.filterDupes = true;
cfg2.operation = WriteOperationType.UPSERT;
@@ -1906,6 +1944,8 @@ public void testFilterDupes() throws Exception {
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
}

runDeltaSync(cfg2, true, 2000, WriteOperationType.INSERT);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
