
Fix a few more issues
yihua committed Jan 29, 2025
1 parent 4786ac5 commit 7419a76
Showing 3 changed files with 45 additions and 26 deletions.
HoodieFileGroupReader.java
@@ -30,13 +30,15 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
@@ -102,9 +104,17 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
this.start = start;
this.length = length;
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
+if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
+recordMergeMode, tableConfig.getPayloadClass(),
+mergeStrategyId, null);
+recordMergeMode = triple.getLeft();
+mergeStrategyId = triple.getRight();
+}
readerContext.setRecordMerger(readerContext.getRecordMerger(
-tableConfig.getRecordMergeMode(),
-tableConfig.getRecordMergeStrategyId(),
+recordMergeMode, mergeStrategyId,
props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
readerContext.setTablePath(tablePath);
@@ -122,7 +132,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
+recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
isSkipMerge, shouldUseRecordPosition, readStats);
}
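Context for the hunk above: on table versions before EIGHT the record merge mode is not persisted in the table config (the test expectations below treat RECORD_MERGE_MODE as absent for version-6 tables), so the reader now infers the effective mode and strategy id from the payload class instead of reading them straight off the config. A minimal Scala sketch of that resolution order, assuming the Hudi classes imported in this file; the helper name and metaClient parameter are illustrative only, and the Triple's middle element (presumably a payload class name) is unused here:

// Sketch only: resolve the effective merging behavior across table versions.
def resolveMergeBehavior(metaClient: HoodieTableMetaClient): (RecordMergeMode, String) = {
  val tableConfig = metaClient.getTableConfig
  val mode = tableConfig.getRecordMergeMode
  val strategyId = tableConfig.getRecordMergeStrategyId
  if (tableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
    (mode, strategyId) // v8+: the persisted table config is authoritative
  } else {
    // Pre-v8: fall back to inference from the legacy payload class.
    val inferred = HoodieTableConfig.inferCorrectMergingBehavior(
      mode, tableConfig.getPayloadClass, strategyId, null)
    (inferred.getLeft, inferred.getRight)
  }
}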

MergeIntoHoodieTableCommand.scala
@@ -28,14 +28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop
+
import org.apache.avro.Schema
import org.apache.spark.sql._
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
@@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments}
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

import java.util.Base64
+
import scala.collection.JavaConverters._

/**
@@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
// Check has no effect if we don't have such fields in target table or we don't have insert actions
// Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT.
-if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], HoodieTableConfig.RECORD_MERGE_MODE)
-.equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) {
+if (RecordMergeMode.EVENT_TIME_ORDERING.name()
+.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
+HoodieTableConfig.RECORD_MERGE_MODE))) {
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
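The reordered comparison above reads like the standard null-safe equals idiom: if getStringWithAltKeys returns null when RECORD_MERGE_MODE is unset (which the test changes below expect for version-6 tables), the old receiver-first form would throw a NullPointerException, while the constant-first form simply evaluates to false. A hedged Scala illustration, assuming the RecordMergeMode import already present in this file:

// Illustration only; `configured` stands in for the getStringWithAltKeys result.
val configured: String = null // RECORD_MERGE_MODE absent from the merged properties
// Old order would throw: configured.equals(RecordMergeMode.EVENT_TIME_ORDERING.name())
// New order is null-safe and returns false:
assert(!RecordMergeMode.EVENT_TIME_ORDERING.name().equals(configured))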
TestMergeModeCommitTimeOrdering.scala
@@ -35,8 +35,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
"mor,true,false,6", "mor,true,true,6"
*/
//"cow,true,false,6",
Seq("cow,false,false,8", "cow,false,true,8", "cow,true,false,8",
"cow,true,false,6", "cow,true,true,6",
Seq(
"mor,false,false,8", "mor,false,true,8", "mor,true,false,8",
"mor,true,false,6", "mor,true,true,6").foreach { args =>
val argList = args.split(',')
@@ -78,7 +77,11 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
val nonExistentConfigs = if (tableVersion.toInt == 6) {
Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key)
} else {
-Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+if (setRecordMergeConfigs) {
+Seq()
+} else {
+Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+}
}

test(s"Test $tableType table with COMMIT_TIME_ORDERING (tableVersion=$tableVersion,"
@@ -130,10 +133,12 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
(if (isUpsert) {
+// With UPSERT operation, there is no duplicate
Seq(
Seq(1, "A_equal", 60.0, 100),
Seq(2, "B_equal", 70.0, 100))
} else {
+// With INSERT operation, there are duplicates
Seq(
Seq(1, "A", 10.0, 100),
Seq(1, "A_equal", 60.0, 100),
@@ -241,6 +246,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
| ts long
| ) using hudi
| tblproperties (
+| $writeTableVersionClause
| type = '$tableType',
| primaryKey = 'id'
| $mergeConfigClause
@@ -254,13 +260,13 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
spark.sql(
s"""
| insert into $tableName
-| select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
-| select 0, 'X', 20.0, 100 union all
-| select 2, 'B', 20.0, 100 union all
-| select 3, 'C', 30.0, 100 union all
-| select 4, 'D', 40.0, 100 union all
-| select 5, 'E', 50.0, 100 union all
-| select 6, 'F', 60.0, 100
+| select 1 as id, 'A' as name, 10.0 as price, 100L as ts union all
+| select 0, 'X', 20.0, 100L union all
+| select 2, 'B', 20.0, 100L union all
+| select 3, 'C', 30.0, 100L union all
+| select 4, 'D', 40.0, 100L union all
+| select 5, 'E', 50.0, 100L union all
+| select 6, 'F', 60.0, 100L
""".stripMargin)
validateTableConfig(
storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
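The only change to this INSERT (and to the MERGE statements below) is the L suffix on the ts literals. The column is declared ts long, and in Spark SQL an unsuffixed integral literal is typed INT while 100L is BIGINT, so the suffix keeps every branch of the union, and therefore the inserted schema, aligned with the table schema. A quick way to confirm the literal typing, using Spark's built-in typeof function inside the test's spark session (standard Spark behavior, nothing Hudi-specific):

// typeof(100) -> int, typeof(100L) -> bigint
spark.sql("select typeof(100) as plain, typeof(100L) as suffixed").show()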
@@ -270,9 +276,9 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName t
| using (
-| select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
-| select 2, '', 55.0, 99 as ts union all
-| select 0, '', 55.0, 100 as ts
+| select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
+| select 2, '', 55.0, 99L as ts union all
+| select 0, '', 55.0, 100L as ts
| ) s
| on t.id = s.id
| when matched then delete
@@ -283,9 +289,9 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName t
| using (
-| select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
-| select 5, 'E2', 55.0, 99 as ts union all
-| select 6, 'F2', 65.0, 100 as ts
+| select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
+| select 5, 'E2', 55.0, 99L as ts union all
+| select 6, 'F2', 65.0, 100L as ts
| ) s
| on t.id = s.id
| when matched then update set *
@@ -306,8 +312,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
s"""
| merge into $tableName t
| using (
-| select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
-| select 8, 'E2', 55.0, 100 as ts
+| select 7 as id, 'D2' as name, 45.0 as price, 100L as ts union all
+| select 8, 'E2', 55.0, 100L as ts
| ) s
| on t.id = s.id
| when not matched then insert *
