diff --git a/hudi-sync/hudi-datahub-sync/pom.xml b/hudi-sync/hudi-datahub-sync/pom.xml
index a03c12af80ca..733d1090d985 100644
--- a/hudi-sync/hudi-datahub-sync/pom.xml
+++ b/hudi-sync/hudi-datahub-sync/pom.xml
@@ -34,14 +34,14 @@
jar
- <datahub.version>0.8.45</datahub.version>
+ <datahub.version>0.15.0rc20</datahub.version>
4.1.5
<groupId>io.acryl</groupId>
- <artifactId>datahub-client</artifactId>
+ <artifactId>datahub-client-java8</artifactId>
<version>${datahub.version}</version>
@@ -85,6 +85,14 @@
<version>${project.version}</version>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hadoop-mr</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<groupId>org.apache.hudi</groupId>
@@ -113,6 +121,14 @@
<artifactId>kryo-shaded</artifactId>
<scope>test</scope>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hive-sync</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
similarity index 91%
rename from hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
rename to hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
index d7d6945f515b..277cffd22101 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
@@ -27,12 +27,12 @@
/**
* Handle responses to requests to Datahub Metastore. Just logs them.
*/
-public class DatahubResponseLogger implements Callback {
- private static final Logger LOG = LoggerFactory.getLogger(DatahubResponseLogger.class);
+public class DataHubResponseLogger implements Callback {
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubResponseLogger.class);
@Override
public void onCompletion(MetadataWriteResponse response) {
- LOG.info("Completed Datahub RestEmitter request. "
+ LOG.info("Completed DataHub RestEmitter request. "
+ "Status: " + (response.isSuccess() ? " succeeded" : " failed"));
if (!response.isSuccess()) {
LOG.error("Request failed. " + response);
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index ac3f474d6769..44ce04f59e68 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -21,54 +21,72 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
-import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
-import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
import org.apache.avro.Schema;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DataHubSyncClient extends HoodieSyncClient {
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
protected final DataHubSyncConfig config;
private final DatasetUrn datasetUrn;
+ private final Urn databaseUrn;
+ private final String tableName;
+ private final String databaseName;
private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
super(config, metaClient);
this.config = config;
- this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+ HoodieDataHubDatasetIdentifier datasetIdentifier =
+ config.getDatasetIdentifier();
+ this.datasetUrn = datasetIdentifier.getDatasetUrn();
+ this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+ this.tableName = datasetIdentifier.getTableName();
+ this.databaseName = datasetIdentifier.getDatabaseName();
}
@Override
@@ -76,45 +94,171 @@ public Option<String> getLastCommitTimeSynced(String tableName) {
throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
}
+ protected Option<String> getLastCommitTime() {
+ return getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+ }
+
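+ // Completion time of the most recently completed instant on the active timeline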
+ protected Option<String> getLastCommitCompletionTime() {
+ int countInstants = getActiveTimeline().countInstants();
+ return getActiveTimeline()
+ .getInstantsOrderedByCompletionTime()
+ .skip(countInstants - 1)
+ .findFirst()
+ .map(HoodieInstant::getCompletionTime)
+ .map(Option::of).orElseGet(Option::empty);
+ }
+
@Override
public void updateLastCommitTimeSynced(String tableName) {
- updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().requestedTime()));
+ Option<String> lastCommitTime = getLastCommitTime();
+ if (lastCommitTime.isPresent()) {
+ updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTime.get()));
+ } else {
+ LOG.error("Failed to get last commit time");
+ }
+
+ Option<String> lastCommitCompletionTime = getLastCommitCompletionTime();
+ if (lastCommitCompletionTime.isPresent()) {
+ updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionTime.get()));
+ } else {
+ LOG.error("Failed to get last commit completion time");
+ }
+ }
+
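+ // Build a patch-style proposal so existing custom properties on the dataset are merged rather than replaced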
+ private MetadataChangeProposal createDatasetPropertiesAspect(String tableName, Map<String, String> tableProperties) {
+ DatasetPropertiesPatchBuilder datasetPropertiesPatchBuilder = new DatasetPropertiesPatchBuilder().urn(datasetUrn);
+ if (tableProperties != null) {
+ tableProperties.forEach(datasetPropertiesPatchBuilder::addCustomProperty);
+ }
+ if (tableName != null) {
+ datasetPropertiesPatchBuilder.setName(tableName);
+ }
+ return datasetPropertiesPatchBuilder.build();
}
@Override
public boolean updateTableProperties(String tableName, Map<String, String> tableProperties) {
- MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
- .entityType("dataset")
- .entityUrn(datasetUrn)
- .upsert()
- .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
- .build();
-
- DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+ // Use PATCH API to avoid overwriting existing properties
+ MetadataChangeProposal proposal = createDatasetPropertiesAspect(tableName, tableProperties);
+ DataHubResponseLogger responseLogger = new DataHubResponseLogger();
try (RestEmitter emitter = config.getRestEmitter()) {
- emitter.emit(propertiesChangeProposal, responseLogger).get();
+ Future<MetadataWriteResponse> future = emitter.emit(proposal, responseLogger);
+ future.get();
return true;
} catch (Exception e) {
- throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
- + tableProperties, e);
+ if (!config.suppressExceptions()) {
+ throw new HoodieDataHubSyncException(
+ "Failed to sync properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+ } else {
+ LOG.error("Failed to sync properties for Dataset {}: {}", datasetUrn, tableProperties, e);
+ return false;
+ }
}
}
@Override
public void updateTableSchema(String tableName, MessageType schema, SchemaDifference schemaDifference) {
try (RestEmitter emitter = config.getRestEmitter()) {
- DatahubResponseLogger responseLogger = new DatahubResponseLogger();
- MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName);
- emitter.emit(schemaChange, responseLogger).get();
-
- // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get
- // reflected in the UI.
- MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete();
- emitter.emit(softDeleteUndoProposal, responseLogger).get();
+ DataHubResponseLogger responseLogger = new DataHubResponseLogger();
+
+ Stream<MetadataChangeProposalWrapper> proposals =
+ Stream.of(createContainerEntity(), createDatasetEntity()).flatMap(stream -> stream);
+
+ // Execute all proposals in parallel and collect futures
+ List<Future<MetadataWriteResponse>> futures = proposals.map(
+ p -> {
+ try {
+ return emitter.emit(p, responseLogger);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ ).collect(Collectors.toList());
+
+ List<MetadataWriteResponse> successfulResults = new ArrayList<>();
+ List<Throwable> failures = new ArrayList<>();
+
+ for (Future<MetadataWriteResponse> future : futures) {
+ try {
+ successfulResults.add(future.get(30, TimeUnit.SECONDS));
+ } catch (TimeoutException e) {
+ failures.add(new HoodieDataHubSyncException("Operation timed out", e));
+ } catch (InterruptedException | ExecutionException e) {
+ failures.add(e);
+ }
+ }
+
+ if (!failures.isEmpty()) {
+ if (!config.suppressExceptions()) {
+ throw new HoodieDataHubSyncException("Failed to sync " + failures.size() + " operations", failures.get(0));
+ } else {
+ for (Throwable failure : failures) {
+ LOG.error("Failed to sync operation", failure);
+ }
+ }
+ }
} catch (Exception e) {
- throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
+ if (!config.suppressExceptions()) {
+ throw new HoodieDataHubSyncException(String.format("Failed to sync metadata for dataset %s", tableName), e);
+ } else {
+ LOG.error("Failed to sync metadata for dataset {}", tableName, e);
+ }
+ }
+ }
+
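+ // Link an entity to its parent container (the database)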
+ private MetadataChangeProposalWrapper createContainerAspect(Urn entityUrn, Urn containerUrn) {
+ MetadataChangeProposalWrapper containerProposal = MetadataChangeProposalWrapper.builder()
+ .entityType(entityUrn.getEntityType())
+ .entityUrn(entityUrn)
+ .upsert()
+ .aspect(new Container().setContainer(containerUrn))
+ .build();
+ return containerProposal;
+ }
+
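+ // Set the BrowsePathsV2 aspect that controls where the entity appears when browsing in DataHub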
+ private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> path) {
+ BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path);
+ MetadataChangeProposalWrapper browsePathsProposal = MetadataChangeProposalWrapper.builder()
+ .entityType(entityUrn.getEntityType())
+ .entityUrn(entityUrn)
+ .upsert()
+ .aspect(new BrowsePathsV2().setPath(browsePathEntryArray))
+ .build();
+ return browsePathsProposal;
+ }
+
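+ // Attach the configured domain to the entity; returns null (skipped) if the configured value is not a valid URN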
+ private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) {
+ try {
+ Urn domainUrn = Urn.createFromString(config.getDomainIdentifier());
+ MetadataChangeProposalWrapper attachDomainProposal = MetadataChangeProposalWrapper.builder()
+ .entityType(entityUrn.getEntityType())
+ .entityUrn(entityUrn)
+ .upsert()
+ .aspect(new Domains().setDomains(new UrnArray(domainUrn)))
+ .build();
+ return attachDomainProposal;
+ } catch (URISyntaxException e) {
+ LOG.warn("Failed to create domain URN from string: {}", config.getDomainIdentifier());
}
+ return null;
+ }
+
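+ // Register the database as a DataHub container entity, with sub-type, browse path, status, and optional domain aspects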
+ private Stream<MetadataChangeProposalWrapper> createContainerEntity() {
+ MetadataChangeProposalWrapper containerEntityProposal = MetadataChangeProposalWrapper.builder()
+ .entityType("container")
+ .entityUrn(databaseUrn)
+ .upsert()
+ .aspect(new ContainerProperties().setName(databaseName))
+ .build();
+
+ Stream<MetadataChangeProposalWrapper> resultStream = Stream.of(
+ containerEntityProposal,
+ createSubTypeAspect(databaseUrn, "Database"),
+ createBrowsePathsAspect(databaseUrn, Collections.emptyList()), createStatusAspect(databaseUrn),
+ config.attachDomain() ? createDomainAspect(databaseUrn) : null
+ ).filter(Objects::nonNull);
+ return resultStream;
}
@Override
@@ -127,42 +271,60 @@ public void close() {
// no op;
}
- private MetadataChangeProposalWrapper createUndoSoftDelete() {
- MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
- .entityType("dataset")
- .entityUrn(datasetUrn)
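+ // Set the soft-delete status to false so the entity and its updates remain visible in the UI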
+ private MetadataChangeProposalWrapper createStatusAspect(Urn urn) {
+ MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
+ .entityType(urn.getEntityType())
+ .entityUrn(urn)
.upsert()
.aspect(SOFT_DELETE_FALSE)
- .aspectName("status")
.build();
return softDeleteUndoProposal;
}
- private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) {
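+ // Tag the entity with a DataHub sub-type (e.g. "Table" or "Database")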
+ private MetadataChangeProposalWrapper createSubTypeAspect(Urn urn, String subType) {
+ MetadataChangeProposalWrapper subTypeProposal = MetadataChangeProposalWrapper.builder()
+ .entityType(urn.getEntityType())
+ .entityUrn(urn)
+ .upsert()
+ .aspect(new SubTypes().setTypeNames(new StringArray(subType)))
+ .build();
+ return subTypeProposal;
+ }
+
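+ // Convert the table's Avro schema into a DataHub SchemaMetadata aspect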
+ private MetadataChangeProposalWrapper createSchemaMetadataAspect(String tableName) {
Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
- List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
- .setFieldPath(f.name())
- .setType(toSchemaFieldDataType(f.schema().getType()))
- .setDescription(f.doc(), SetMode.IGNORE_NULL)
- .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
+ AvroSchemaConverter avroSchemaConverter = AvroSchemaConverter.builder().build();
+ com.linkedin.schema.SchemaMetadata schemaMetadata = avroSchemaConverter.toDataHubSchema(
+ avroSchema,
+ false,
+ false,
+ datasetUrn.getPlatformEntity(),
+ null
+ );
- final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
- platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
+ // Reorder fields to relocate _hoodie_ metadata fields to the end
+ schemaMetadata.setFields(SchemaFieldsUtil.reorderPrefixedFields(schemaMetadata.getFields(), "_hoodie_"));
return MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.entityUrn(datasetUrn)
.upsert()
- .aspect(new SchemaMetadata()
- .setSchemaName(tableName)
- .setVersion(0)
- .setHash("")
- .setPlatform(datasetUrn.getPlatformEntity())
- .setPlatformSchema(platformSchema)
- .setFields(new SchemaFieldArray(fields)))
+ .aspect(schemaMetadata)
.build();
}
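+ // Aspects that register the dataset entity: status, sub-type, browse path, container link, schema, and optional domain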
+ private Stream<MetadataChangeProposalWrapper> createDatasetEntity() {
+ Stream<MetadataChangeProposalWrapper> result = Stream.of(
+ createStatusAspect(datasetUrn),
+ createSubTypeAspect(datasetUrn, "Table"),
+ createBrowsePathsAspect(datasetUrn, Collections.singletonList(new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))),
+ createContainerAspect(datasetUrn, databaseUrn),
+ createSchemaMetadataAspect(tableName),
+ config.attachDomain() ? createDomainAspect(datasetUrn) : null
+ ).filter(Objects::nonNull);
+ return result;
+ }
+
Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
try {
return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
@@ -170,36 +332,4 @@ Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
throw new HoodieSyncException("Failed to read avro schema", e);
}
}
-
- static SchemaFieldDataType toSchemaFieldDataType(Schema.Type type) {
- switch (type) {
- case BOOLEAN:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BooleanType()));
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
- case MAP:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new MapType()));
- case ENUM:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new EnumType()));
- case NULL:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NullType()));
- case ARRAY:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new ArrayType()));
- case BYTES:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BytesType()));
- case FIXED:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new FixedType()));
- case UNION:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new UnionType()));
- case RECORD:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType()));
- case STRING:
- return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
- default:
- throw new AvroTypeException("Unexpected type: " + type.getName());
- }
- }
-}
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 182506fadece..148fc61dd26e 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -27,10 +27,18 @@
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Properties;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.sync.datahub.DataHubTableProperties.HoodieTableMetadata;
+import static org.apache.hudi.sync.datahub.DataHubTableProperties.getTableProperties;
/**
* To sync with DataHub via REST APIs.
@@ -39,9 +47,12 @@
* @see https://datahubproject.io/
*/
public class DataHubSyncTool extends HoodieSyncTool {
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncTool.class);
protected final DataHubSyncConfig config;
- private final HoodieTableMetaClient metaClient;
+ protected final HoodieTableMetaClient metaClient;
+ protected DataHubSyncClient syncClient;
+ private final String tableName;
public DataHubSyncTool(Properties props) {
this(props, HadoopConfigUtils.createHadoopConf(props), Option.empty());
@@ -50,20 +61,59 @@ public DataHubSyncTool(Properties props) {
public DataHubSyncTool(Properties props, Configuration hadoopConf, Option<HoodieTableMetaClient> metaClientOption) {
super(props, hadoopConf);
this.config = new DataHubSyncConfig(props);
+ this.tableName = config.getString(META_SYNC_TABLE_NAME);
this.metaClient = metaClientOption.orElseGet(() -> buildMetaClient(config));
+ this.syncClient = new DataHubSyncClient(config, metaClient);
}
- /**
- * Sync to a DataHub Dataset.
- *
- * @implNote DataHub sync is an experimental feature, which overwrites the DataHub Dataset's schema
- * and last commit time sync'ed upon every invocation.
- */
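+ /**
+ * Sync the Hudi table to DataHub: schema, table properties, and, unless conditional sync
+ * is enabled, the last commit time synced.
+ */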
@Override
public void syncHoodieTable() {
- try (DataHubSyncClient syncClient = new DataHubSyncClient(config, metaClient)) {
- syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), null, null);
- syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME));
+ try {
+ LOG.info("Syncing target Hoodie table with DataHub dataset({}). DataHub URL: {}, basePath: {}",
+ tableName, config.getDataHubServerEndpoint(), config.getString(META_SYNC_BASE_PATH));
+
+ syncSchema();
+ syncTableProperties();
+ updateLastCommitTimeIfNeeded();
+
+ LOG.info("Sync completed for table {}", tableName);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to sync table " + tableName + " to DataHub", e);
+ } finally {
+ close();
+ }
+ }
+
+ private void syncSchema() throws Exception {
+ syncClient.updateTableSchema(tableName, null, null);
+ LOG.info("Schema synced for table {}", tableName);
+ }
+
+ private void syncTableProperties() throws Exception {
+ MessageType storageSchema = syncClient.getStorageSchema();
+ HoodieTableMetadata tableMetadata = new HoodieTableMetadata(metaClient, storageSchema);
+ Map<String, String> tableProperties = getTableProperties(config, tableMetadata);
+ syncClient.updateTableProperties(tableName, tableProperties);
+ LOG.info("Properties synced for table {}", tableName);
+ }
+
+ private void updateLastCommitTimeIfNeeded() throws Exception {
+ boolean shouldUpdateLastCommitTime = !config.getBoolean(META_SYNC_CONDITIONAL_SYNC);
+ if (shouldUpdateLastCommitTime) {
+ syncClient.updateLastCommitTimeSynced(tableName);
+ LOG.info("Updated last sync time for table {}", tableName);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (syncClient != null) {
+ try {
+ syncClient.close();
+ syncClient = null;
+ } catch (Exception e) {
+ LOG.error("Error closing DataHub sync client", e);
+ }
}
}
@@ -77,4 +127,4 @@ public static void main(String[] args) {
}
new DataHubSyncTool(params.toProps()).syncHoodieTable();
}
-}
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
new file mode 100644
index 000000000000..f951d67a4ff3
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
+import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_TABLE_SERDE_PROPERTIES;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_FILE_FORMAT;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_SPARK_VERSION;
+
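+/**
+ * Assembles the custom properties attached to the DataHub dataset: basic Hudi table metadata,
+ * partition fields, user-defined properties, and Spark/serde properties.
+ */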
+public class DataHubTableProperties {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubTableProperties.class);
+
+ public static final String HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES = "hoodie.meta.sync.datahub.table.properties";
+ public static final String HUDI_TABLE_TYPE = "hudi.table.type";
+ public static final String HUDI_TABLE_VERSION = "hudi.table.version";
+ public static final String HUDI_BASE_PATH = "hudi.base.path";
+ public static final String HUDI_PARTITION_FIELDS = "hudi.partition.fields";
+
+ public static final ConfigProperty<String> DATAHUB_TABLE_PROPERTIES =
+ ConfigProperty.key(HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES)
+ .defaultValue("")
+ .withDocumentation("Additional properties to be attached to the DataHub dataset, specified as key1=val1,key2=val2");
+
+ public static Map<String, String> getTableProperties(DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+ Map<String, String> properties = new HashMap<>();
+ addBasicHudiTableProperties(properties, config, tableMetadata);
+ addPartitioningInformation(properties, config);
+ addUserDefinedProperties(properties, config);
+ addSparkRelatedProperties(properties, config, tableMetadata);
+ return properties;
+ }
+
+ private static void addBasicHudiTableProperties(Map<String, String> properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+ properties.put(HUDI_TABLE_TYPE, tableMetadata.getTableType());
+ properties.put(HUDI_TABLE_VERSION, tableMetadata.getTableVersion());
+ properties.put(HUDI_BASE_PATH, config.getString(META_SYNC_BASE_PATH));
+ }
+
+ private static void addPartitioningInformation(Map<String, String> properties, DataHubSyncConfig config) {
+ if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
+ properties.put(HUDI_PARTITION_FIELDS, String.join(",", config.getSplitStrings(META_SYNC_PARTITION_FIELDS)));
+ }
+ }
+
+ private static void addUserDefinedProperties(Map<String, String> properties, DataHubSyncConfig config) {
+ Map<String, String> userDefinedProps = ConfigUtils.toMap(config.getString(DATAHUB_TABLE_PROPERTIES));
+ properties.putAll(userDefinedProps);
+ }
+
+ private static void addSparkRelatedProperties(Map<String, String> properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+ Map<String, String> sparkProperties = SparkDataSourceTableUtils.getSparkTableProperties(
+ config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
+ config.getString(META_SYNC_SPARK_VERSION),
+ config.getInt(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD),
+ tableMetadata.getSchema()
+ );
+ properties.putAll(sparkProperties);
+ properties.putAll(getSerdeProperties(config, false));
+ }
+
+ private static Map<String, String> getSerdeProperties(DataHubSyncConfig config, boolean readAsOptimized) {
+ HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+ String inputFormatClassName = getInputFormatClassName(baseFileFormat, false, false);
+ String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+ String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+
+ Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES));
+ serdeProperties.put("inputFormat", inputFormatClassName);
+ serdeProperties.put("outputFormat", outputFormatClassName);
+ serdeProperties.put("serdeClass", serDeFormatClassName);
+ Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH));
+ sparkSerdeProperties.forEach((k, v) -> serdeProperties.putIfAbsent(k.startsWith("spark.") ? k : "spark." + k, v));
+ LOG.info("Serde Properties : {}", serdeProperties);
+ return serdeProperties;
+ }
+
+ public static class HoodieTableMetadata {
+ private final HoodieTableMetaClient metaClient;
+ private final MessageType schema;
+
+ public HoodieTableMetadata(HoodieTableMetaClient metaClient, MessageType schema) {
+ this.metaClient = metaClient;
+ this.schema = schema;
+ }
+
+ public String getTableType() {
+ return metaClient.getTableType().name();
+ }
+
+ public String getTableVersion() {
+ return metaClient.getTableConfig().getTableVersion().toString();
+ }
+
+ public MessageType getSchema() {
+ return schema;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index 2a96171b3c12..a54c7c85e48f 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -29,6 +29,8 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import datahub.client.rest.RestEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.Immutable;
@@ -43,6 +45,8 @@
description = "Configurations used by the Hudi to sync metadata to DataHub.")
public class DataHubSyncConfig extends HoodieSyncConfig {
+ private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncConfig.class);
+
public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty
.key("hoodie.meta.sync.datahub.dataset.identifier.class")
.defaultValue(HoodieDataHubDatasetIdentifier.class.getName())
@@ -80,12 +84,47 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
.markAdvanced()
.withDocumentation("Environment to use when pushing entities to Datahub");
- public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+ public static final ConfigProperty<String> META_SYNC_DATAHUB_DOMAIN_IDENTIFIER = ConfigProperty
+ .key("hoodie.meta.sync.datahub.domain.identifier")
+ .noDefaultValue()
+ .markAdvanced()
+ .withDocumentation("Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. Must be in urn form (e.g., urn:li:domain:_domain_id).");
+
+ public static final ConfigProperty<String> HIVE_TABLE_SERDE_PROPERTIES = ConfigProperty
+ .key("hoodie.datasource.hive_sync.serde_properties")
+ .noDefaultValue()
+ .markAdvanced()
+ .withDocumentation("Serde properties to hive table.");
+
+
+ public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty
+ .key("hoodie.datasource.hive_sync.schema_string_length_thresh")
+ .defaultValue(4000)
+ .markAdvanced()
+ .withDocumentation("");
+
+ public static final ConfigProperty<Boolean> META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS = ConfigProperty
+ .key("hoodie.meta.sync.datahub.sync.suppress.exceptions")
+ .defaultValue(true)
+ .markAdvanced()
+ .withDocumentation("Suppress exceptions during DataHub sync. This is true by default to ensure that when running inline with other jobs, the sync does not fail the job.");
public DataHubSyncConfig(Properties props) {
super(props);
+ // Log warning if the domain identifier is provided but is not in urn form
+ if (contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER) && !getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER).startsWith("urn:li:domain:")) {
+ LOG.warn(
+ "Domain identifier must be in urn form (e.g., urn:li:domain:_domain_id). Provided {}. Will remove this from configuration.",
+ getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER));
+ this.props.remove(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key());
+ }
+ }
+
+ public HoodieDataHubDatasetIdentifier getDatasetIdentifier() {
String identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
- datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, props);
+ // Use reflection to instantiate the class
+ HoodieDataHubDatasetIdentifier datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, props);
+ return datasetIdentifier;
}
public RestEmitter getRestEmitter() {
@@ -98,6 +137,22 @@ public RestEmitter getRestEmitter() {
}
}
+ public Boolean suppressExceptions() {
+ return getBoolean(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS);
+ }
+
+ public String getDataHubServerEndpoint() {
+ return getString(META_SYNC_DATAHUB_EMITTER_SERVER);
+ }
+
+ public boolean attachDomain() {
+ return contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER);
+ }
+
+ public String getDomainIdentifier() {
+ return getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER);
+ }
+
public static class DataHubSyncConfigParams {
@ParametersDelegate()
@@ -122,6 +177,14 @@ public static class DataHubSyncConfigParams {
@Parameter(names = {"--dataset-env"}, description = "Which Datahub Environment to use when pushing entities")
public String datasetEnv;
+ @Parameter(names = {
+ "--domain"}, description = "Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. Must be in urn form (e.g., urn:li:domain:_domain_id).")
+ public String domainIdentifier;
+
+ @Parameter(names = {
+ "--suppress-exceptions"}, description = "Suppress exceptions during DataHub sync.")
+ public String suppressExceptions;
+
public boolean isHelp() {
return hoodieSyncConfigParams.isHelp();
}
@@ -134,7 +197,14 @@ public Properties toProps() {
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv);
+ props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), domainIdentifier);
+ // When run as a command-line tool, the default behavior of DataHubSyncTool is to NOT suppress exceptions
+ if (suppressExceptions == null) {
+ props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+ } else {
+ props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), String.valueOf(suppressExceptions));
+ }
return props;
}
}
-}
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index 6c8ea076ffc0..664fab31e8e1 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -22,6 +22,8 @@
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
+import com.linkedin.common.urn.Urn;
+import io.datahubproject.models.util.DatabaseKey;
import java.util.Properties;
@@ -41,19 +43,50 @@ public class HoodieDataHubDatasetIdentifier {
public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
protected final Properties props;
+ private final DatasetUrn datasetUrn;
+ private final Urn databaseUrn;
+ private final String tableName;
+ private final String databaseName;
public HoodieDataHubDatasetIdentifier(Properties props) {
this.props = props;
- }
-
- public DatasetUrn getDatasetUrn() {
+ if (props == null || props.isEmpty()) {
+ throw new IllegalArgumentException("Properties cannot be null or empty");
+ }
DataHubSyncConfig config = new DataHubSyncConfig(props);
- return new DatasetUrn(
+ this.datasetUrn = new DatasetUrn(
createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)),
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
);
+
+ this.tableName = config.getString(META_SYNC_TABLE_NAME);
+ this.databaseName = config.getString(META_SYNC_DATABASE_NAME);
+
+ DatabaseKey databaseKey = DatabaseKey.builder()
+ .platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
+ .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
+ .database(this.databaseName)
+ .build();
+
+ this.databaseUrn = databaseKey.asUrn();
+ }
+
+ public DatasetUrn getDatasetUrn() {
+ return this.datasetUrn;
+ }
+
+ public Urn getDatabaseUrn() {
+ return this.databaseUrn;
+ }
+
+ public String getTableName() {
+ return this.tableName;
+ }
+
+ public String getDatabaseName() {
+ return this.databaseName;
}
private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
@@ -63,4 +96,4 @@ private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
private static String createDatasetName(String databaseName, String tableName) {
return String.format("%s.%s", databaseName, tableName);
}
-}
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java
new file mode 100644
index 000000000000..0b3229ac38c7
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.util;
+
+import com.linkedin.schema.SchemaField;
+import com.linkedin.schema.SchemaFieldArray;
+import io.datahubproject.models.util.FieldPath;
+
+import java.util.ArrayList;
+import java.util.List;
+
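+/**
+ * Utility for reordering schema fields so that fields with a given prefix (e.g. the
+ * "_hoodie_" metadata fields) are moved to the end of the field list.
+ */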
+public class SchemaFieldsUtil {
+ public static SchemaFieldArray reorderPrefixedFields(SchemaFieldArray fields, String prefix) {
+ if (fields == null || fields.isEmpty()) {
+ return fields;
+ }
+
+ // Split the list into prefixed and non-prefixed fields while preserving order
+ List<SchemaField> prefixedFields = new ArrayList<>();
+ List<SchemaField> normalFields = new ArrayList<>();
+
+ for (SchemaField field : fields) {
+ FieldPath fieldPath = new FieldPath(field.getFieldPath());
+
+ if (fieldPath.isTopLevel() && fieldPath.leafFieldName().startsWith(prefix)) {
+ prefixedFields.add(field);
+ } else {
+ normalFields.add(field);
+ }
+ }
+
+ // Combine the lists with prefixed fields at the end
+ List<SchemaField> result = new ArrayList<>(normalFields.size() + prefixedFields.size());
+ result.addAll(normalFields);
+ result.addAll(prefixedFields);
+
+ return new SchemaFieldArray(result);
+ }
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index a19e035ad1d0..837536fb7950 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -21,9 +21,11 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
@@ -40,15 +42,22 @@
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestDataHubSyncClient {
@@ -65,7 +74,7 @@ public class TestDataHubSyncClient {
@BeforeAll
public static void beforeAll() throws IOException {
TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
- + "{\"name\": \"ts\",\"type\": \"long\"}]}";
+ + "{\"name\": \"ts\",\"type\": \"long\"}]}";
avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
@@ -73,9 +82,9 @@ public static void beforeAll() throws IOException {
props.put("hoodie.table.name", "some_table");
tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString();
HoodieTableMetaClient.newTableBuilder()
- .fromProperties(props)
- .setTableType(HoodieTableType.MERGE_ON_READ.name())
- .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath);
+ .fromProperties(props)
+ .setTableType(HoodieTableType.MERGE_ON_READ.name())
+ .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath);
}
@BeforeEach
@@ -94,19 +103,125 @@ public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
Mockito.when(
- restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
+ restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
).thenReturn(
- CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+ CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
);
DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
dhClient.updateTableSchema("some_table", null, null);
- verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposalWrapper.class),
+ verify(restEmitterMock, times(9)).emit(any(MetadataChangeProposalWrapper.class),
Mockito.any());
}
+ @Test
+ public void testUpdateTableProperties() throws Exception {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+ when(restEmitterMock.emit(any(MetadataChangeProposal.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ Map properties = new HashMap<>();
+ properties.put("key1", "value1");
+ properties.put("key2", "value2");
+
+ boolean result = dhClient.updateTableProperties("some_table", properties);
+ assertTrue(result);
+ verify(restEmitterMock, times(1)).emit(any(MetadataChangeProposal.class), any());
+ }
+
+ @Test
+ public void testUpdateTablePropertiesFailure() throws Exception {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+ props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+
+ CompletableFuture<MetadataWriteResponse> failedFuture = new CompletableFuture<>();
+ failedFuture.completeExceptionally(new IOException("Emission failed"));
+ when(restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), any()))
+ .thenReturn(failedFuture);
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ Map properties = new HashMap<>();
+ properties.put("key1", "value1");
+
+ assertThrows(HoodieDataHubSyncException.class, () ->
+ dhClient.updateTableProperties("some_table", properties));
+ }
+
+ @Test
+ public void testGetLastCommitTimeSynced() {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ assertThrows(UnsupportedOperationException.class, () ->
+ dhClient.getLastCommitTimeSynced("some_table"));
+ }
+
+ @Test
+ public void testGetMetastoreSchema() {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ assertThrows(UnsupportedOperationException.class, () ->
+ dhClient.getMetastoreSchema("some_table"));
+ }
+
+ @Test
+ public void testUpdateTableSchemaWithEmitterFailure() throws Exception {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+ props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+
+ // Create a failed future that will throw when accessed
+ CompletableFuture<MetadataWriteResponse> future = new CompletableFuture<>();
+ future.completeExceptionally(new ExecutionException("Emission failed", new IOException()));
+
+ // Configure mock to return the failed future for ALL calls
+ when(restEmitterMock.emit((MetadataChangeProposalWrapper) any(), any())).thenReturn(future);
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ assertThrows(HoodieDataHubSyncException.class, () ->
+ dhClient.updateTableSchema("some_table", null, null));
+ }
+
+ @Test
+ public void testUpdateLastCommitTimeSynced() throws Exception {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+ when(restEmitterMock.emit(any(MetadataChangeProposal.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ dhClient.updateLastCommitTimeSynced("some_table");
+ verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposal.class), any());
+ }
+
public class DataHubSyncClientStub extends DataHubSyncClient {
public DataHubSyncClientStub(DataHubSyncConfig config) {
@@ -118,6 +233,16 @@ Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
return avroSchema;
}
+ @Override
+ protected Option<String> getLastCommitTime() {
+ return Option.of("1000");
+ }
+
+ @Override
+ protected Option<String> getLastCommitCompletionTime() {
+ return Option.of("1000");
+ }
+
}
public class DatahubSyncConfigStub extends DataHubSyncConfig {
@@ -133,6 +258,7 @@ public DatahubSyncConfigStub(Properties props, RestEmitter emitterMock) {
public RestEmitter getRestEmitter() {
return emitterMock;
}
+
}
-}
+}
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
index 9d92970c3b2f..f693389fa4d8 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
@@ -47,7 +47,7 @@ void testInstantiationWithProps() {
Properties props = new Properties();
props.setProperty(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS.key(), DummyIdentifier.class.getName());
DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
- DatasetUrn datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
+ DatasetUrn datasetUrn = syncConfig.getDatasetIdentifier().getDatasetUrn();
assertEquals("foo", datasetUrn.getPlatformEntity().getPlatformNameEntity());
assertEquals("project.database.table", datasetUrn.getDatasetNameEntity());
assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
new file mode 100644
index 000000000000..52af11f0a854
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import com.linkedin.common.FabricType;
+import com.linkedin.common.urn.DatasetUrn;
+import com.linkedin.common.urn.Urn;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieDataHubDatasetIdentifier {
+
+ private Properties props;
+
+ @BeforeEach
+ void setUp() {
+ props = new Properties();
+ }
+
+ @Test
+ @DisplayName("Test constructor with default values")
+ void testConstructorWithDefaultValues() {
+ // Given
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+
+ // When
+ HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ DatasetUrn datasetUrn = identifier.getDatasetUrn();
+ assertNotNull(datasetUrn);
+ assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME,
+ datasetUrn.getPlatformEntity().getId());
+ assertEquals("test_db.test_table", datasetUrn.getDatasetNameEntity());
+ assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_DATAHUB_ENV, datasetUrn.getOriginEntity());
+ }
+
+ @Test
+ @DisplayName("Test constructor with custom values")
+ void testConstructorWithCustomValues() {
+ // Given
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "custom_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), "custom_table");
+ props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), "custom_platform");
+ props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+ // When
+ HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ DatasetUrn datasetUrn = identifier.getDatasetUrn();
+ assertNotNull(datasetUrn);
+ assertEquals("custom_platform", datasetUrn.getPlatformEntity().getId());
+ assertEquals("custom_db.custom_table", datasetUrn.getDatasetNameEntity());
+ assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
+ }
+
+ @Test
+ @DisplayName("Test getDatabaseUrn")
+ void testGetDatabaseUrn() {
+ // Given
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+ props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+ // When
+ HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ Urn databaseUrn = identifier.getDatabaseUrn();
+ assertNotNull(databaseUrn);
+ assertFalse(databaseUrn.toString().contains("test_db"));
+ assertFalse(databaseUrn.toString().contains("PROD"));
+ assertTrue(databaseUrn.toString().startsWith("urn:li:container:"));
+ }
+
+ @Test
+ @DisplayName("Test getTableName")
+ void testGetTableName() {
+ // Given
+ String tableName = "test_table";
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), tableName);
+
+ // When
+ HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ assertEquals(tableName, identifier.getTableName());
+ }
+
+ @Test
+ @DisplayName("Test constructor with missing required properties")
+ void testConstructorWithMissingProperties() {
+ // Given empty properties
+
+ // Then
+ assertThrows(IllegalArgumentException.class, () -> {
+ new HoodieDataHubDatasetIdentifier(props);
+ });
+ }
+
+ @Test
+ @DisplayName("Test constructor with invalid environment")
+ void testConstructorWithInvalidEnvironment() {
+ // Given
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+ props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "INVALID_ENV");
+
+ // Then
+ assertThrows(IllegalArgumentException.class, () -> {
+ new HoodieDataHubDatasetIdentifier(props);
+ });
+ }
+}
\ No newline at end of file
diff --git a/packaging/hudi-datahub-sync-bundle/pom.xml b/packaging/hudi-datahub-sync-bundle/pom.xml
index fc3f0afa525c..a918164ced94 100644
--- a/packaging/hudi-datahub-sync-bundle/pom.xml
+++ b/packaging/hudi-datahub-sync-bundle/pom.xml
@@ -73,7 +73,7 @@
<include>org.apache.hudi:hudi-sync-common</include>
<include>org.apache.hudi:hudi-datahub-sync</include>
- <include>io.acryl:datahub-client</include>
+ <include>io.acryl:datahub-client-java8</include>
<include>com.beust:jcommander</include>
<include>commons-io:commons-io</include>
<include>org.apache.httpcomponents:httpasyncclient</include>