From 3cb874fd53390b4465a8a8b9acd615b1b7f362cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Wed, 18 Dec 2024 05:17:14 +0100 Subject: [PATCH] [HUDI-8616] DataHub meta sync improvements (#12456) --- hudi-sync/hudi-datahub-sync/pom.xml | 20 +- ...Logger.java => DataHubResponseLogger.java} | 6 +- .../hudi/sync/datahub/DataHubSyncClient.java | 320 ++++++++++++------ .../hudi/sync/datahub/DataHubSyncTool.java | 72 +++- .../sync/datahub/DataHubTableProperties.java | 134 ++++++++ .../datahub/config/DataHubSyncConfig.java | 76 ++++- .../HoodieDataHubDatasetIdentifier.java | 43 ++- .../sync/datahub/util/SchemaFieldsUtil.java | 56 +++ .../sync/datahub/TestDataHubSyncClient.java | 142 +++++++- .../datahub/config/TestDataHubSyncConfig.java | 2 +- .../TestHoodieDataHubDatasetIdentifier.java | 147 ++++++++ packaging/hudi-datahub-sync-bundle/pom.xml | 2 +- 12 files changed, 891 insertions(+), 129 deletions(-) rename hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/{DatahubResponseLogger.java => DataHubResponseLogger.java} (91%) create mode 100644 hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java create mode 100644 hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java create mode 100644 hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java diff --git a/hudi-sync/hudi-datahub-sync/pom.xml b/hudi-sync/hudi-datahub-sync/pom.xml index a03c12af80ca7..733d1090d985e 100644 --- a/hudi-sync/hudi-datahub-sync/pom.xml +++ b/hudi-sync/hudi-datahub-sync/pom.xml @@ -34,14 +34,14 @@ jar - 0.8.45 + 0.15.0rc20 4.1.5 io.acryl - datahub-client + datahub-client-java8 ${datahub.version} @@ -85,6 +85,14 @@ ${project.version} + + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + compile + + org.apache.hudi @@ -113,6 +121,14 @@ kryo-shaded test + + + org.apache.hudi + hudi-hive-sync + ${project.version} + test + + diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java similarity index 91% rename from hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java rename to hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java index d7d6945f515b1..277cffd22101f 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java @@ -27,12 +27,12 @@ /** * Handle responses to requests to Datahub Metastore. Just logs them. */ -public class DatahubResponseLogger implements Callback { - private static final Logger LOG = LoggerFactory.getLogger(DatahubResponseLogger.class); +public class DataHubResponseLogger implements Callback { + private static final Logger LOG = LoggerFactory.getLogger(DataHubResponseLogger.class); @Override public void onCompletion(MetadataWriteResponse response) { - LOG.info("Completed Datahub RestEmitter request. " + LOG.info("Completed DataHub RestEmitter request. " + "Status: " + (response.isSuccess() ? " succeeded" : " failed")); if (!response.isSuccess()) { LOG.error("Request failed. 
" + response); diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java index ac3f474d67695..44ce04f59e68f 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java @@ -21,54 +21,72 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.sync.common.HoodieSyncClient; import org.apache.hudi.sync.common.HoodieSyncException; import org.apache.hudi.sync.datahub.config.DataHubSyncConfig; +import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier; +import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil; +import com.linkedin.common.BrowsePathEntry; +import com.linkedin.common.BrowsePathEntryArray; +import com.linkedin.common.BrowsePathsV2; import com.linkedin.common.Status; +import com.linkedin.common.SubTypes; +import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DatasetUrn; -import com.linkedin.data.template.SetMode; -import com.linkedin.data.template.StringMap; -import com.linkedin.dataset.DatasetProperties; -import com.linkedin.schema.ArrayType; -import com.linkedin.schema.BooleanType; -import com.linkedin.schema.BytesType; -import com.linkedin.schema.EnumType; -import com.linkedin.schema.FixedType; -import com.linkedin.schema.MapType; -import com.linkedin.schema.NullType; -import com.linkedin.schema.NumberType; -import com.linkedin.schema.OtherSchema; -import com.linkedin.schema.RecordType; -import com.linkedin.schema.SchemaField; -import com.linkedin.schema.SchemaFieldArray; -import com.linkedin.schema.SchemaFieldDataType; -import com.linkedin.schema.SchemaMetadata; -import com.linkedin.schema.StringType; -import com.linkedin.schema.UnionType; +import com.linkedin.common.urn.Urn; +import com.linkedin.container.Container; +import com.linkedin.container.ContainerProperties; +import com.linkedin.data.template.StringArray; +import com.linkedin.domain.Domains; +import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.MetadataWriteResponse; import datahub.client.rest.RestEmitter; import datahub.event.MetadataChangeProposalWrapper; -import org.apache.avro.AvroTypeException; +import io.datahubproject.schematron.converters.avro.AvroSchemaConverter; import org.apache.avro.Schema; import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.util.stream.Stream; public class DataHubSyncClient extends HoodieSyncClient { + private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class); + protected final DataHubSyncConfig config; private final DatasetUrn datasetUrn; + private final Urn 
databaseUrn; + private final String tableName; + private final String databaseName; private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false); public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) { super(config, metaClient); this.config = config; - this.datasetUrn = config.datasetIdentifier.getDatasetUrn(); + HoodieDataHubDatasetIdentifier datasetIdentifier = + config.getDatasetIdentifier(); + this.datasetUrn = datasetIdentifier.getDatasetUrn(); + this.databaseUrn = datasetIdentifier.getDatabaseUrn(); + this.tableName = datasetIdentifier.getTableName(); + this.databaseName = datasetIdentifier.getDatabaseName(); } @Override @@ -76,45 +94,171 @@ public Option getLastCommitTimeSynced(String tableName) { throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`"); } + protected Option getLastCommitTime() { + return getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime); + } + + protected Option getLastCommitCompletionTime() { + int countInstants = getActiveTimeline().countInstants(); + return getActiveTimeline() + .getInstantsOrderedByCompletionTime() + .skip(countInstants - 1) + .findFirst() + .map(HoodieInstant::getCompletionTime) + .map(Option::of).orElseGet(Option::empty); + } + @Override public void updateLastCommitTimeSynced(String tableName) { - updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().requestedTime())); + Option lastCommitTime = getLastCommitTime(); + if (lastCommitTime.isPresent()) { + updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTime.get())); + } else { + LOG.error("Failed to get last commit time"); + } + + Option lastCommitCompletionTime = getLastCommitCompletionTime(); + if (lastCommitCompletionTime.isPresent()) { + updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionTime.get())); + } else { + LOG.error("Failed to get last commit completion time"); + } + } + + private MetadataChangeProposal createDatasetPropertiesAspect(String tableName, Map tableProperties) { + DatasetPropertiesPatchBuilder datasetPropertiesPatchBuilder = new DatasetPropertiesPatchBuilder().urn(datasetUrn); + if (tableProperties != null) { + tableProperties.forEach(datasetPropertiesPatchBuilder::addCustomProperty); + } + if (tableName != null) { + datasetPropertiesPatchBuilder.setName(tableName); + } + return datasetPropertiesPatchBuilder.build(); } @Override public boolean updateTableProperties(String tableName, Map tableProperties) { - MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder() - .entityType("dataset") - .entityUrn(datasetUrn) - .upsert() - .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties))) - .build(); - - DatahubResponseLogger responseLogger = new DatahubResponseLogger(); + // Use PATCH API to avoid overwriting existing properties + MetadataChangeProposal proposal = createDatasetPropertiesAspect(tableName, tableProperties); + DataHubResponseLogger responseLogger = new DataHubResponseLogger(); try (RestEmitter emitter = config.getRestEmitter()) { - emitter.emit(propertiesChangeProposal, responseLogger).get(); + Future future = emitter.emit(proposal, responseLogger); + future.get(); return true; } catch (Exception e) { - throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " - + 
tableProperties, e); + if (!config.suppressExceptions()) { + throw new HoodieDataHubSyncException( + "Failed to sync properties for Dataset " + datasetUrn + ": " + tableProperties, e); + } else { + LOG.error("Failed to sync properties for Dataset {}: {}", datasetUrn, tableProperties, e); + return false; + } } } @Override public void updateTableSchema(String tableName, MessageType schema, SchemaDifference schemaDifference) { try (RestEmitter emitter = config.getRestEmitter()) { - DatahubResponseLogger responseLogger = new DatahubResponseLogger(); - MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName); - emitter.emit(schemaChange, responseLogger).get(); - - // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get - // reflected in the UI. - MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete(); - emitter.emit(softDeleteUndoProposal, responseLogger).get(); + DataHubResponseLogger responseLogger = new DataHubResponseLogger(); + + Stream proposals = + Stream.of(createContainerEntity(), createDatasetEntity()).flatMap(stream -> stream); + + // Execute all proposals in parallel and collect futures + List> futures = proposals.map( + p -> { + try { + return emitter.emit(p, responseLogger); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ).collect(Collectors.toList()); + + List successfulResults = new ArrayList<>(); + List failures = new ArrayList<>(); + + for (Future future : futures) { + try { + successfulResults.add(future.get(30, TimeUnit.SECONDS)); + } catch (TimeoutException e) { + failures.add(new HoodieDataHubSyncException("Operation timed out", e)); + } catch (InterruptedException | ExecutionException e) { + failures.add(e); + } + } + + if (!failures.isEmpty()) { + if (!config.suppressExceptions()) { + throw new HoodieDataHubSyncException("Failed to sync " + failures.size() + " operations", failures.get(0)); + } else { + for (Throwable failure : failures) { + LOG.error("Failed to sync operation", failure); + } + } + } } catch (Exception e) { - throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e); + if (!config.suppressExceptions()) { + throw new HoodieDataHubSyncException(String.format("Failed to sync metadata for dataset %s", tableName), e); + } else { + LOG.error("Failed to sync metadata for dataset {}", tableName, e); + } + } + } + + private MetadataChangeProposalWrapper createContainerAspect(Urn entityUrn, Urn containerUrn) { + MetadataChangeProposalWrapper containerProposal = MetadataChangeProposalWrapper.builder() + .entityType(entityUrn.getEntityType()) + .entityUrn(entityUrn) + .upsert() + .aspect(new Container().setContainer(containerUrn)) + .build(); + return containerProposal; + } + + private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List path) { + BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path); + MetadataChangeProposalWrapper browsePathsProposal = MetadataChangeProposalWrapper.builder() + .entityType(entityUrn.getEntityType()) + .entityUrn(entityUrn) + .upsert() + .aspect(new BrowsePathsV2().setPath(browsePathEntryArray)) + .build(); + return browsePathsProposal; + } + + private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) { + try { + Urn domainUrn = Urn.createFromString(config.getDomainIdentifier()); + MetadataChangeProposalWrapper attachDomainProposal = MetadataChangeProposalWrapper.builder() + .entityType(entityUrn.getEntityType()) + 
.entityUrn(entityUrn) + .upsert() + .aspect(new Domains().setDomains(new UrnArray(domainUrn))) + .build(); + return attachDomainProposal; + } catch (URISyntaxException e) { + LOG.warn("Failed to create domain URN from string: {}", config.getDomainIdentifier()); } + return null; + } + + private Stream createContainerEntity() { + MetadataChangeProposalWrapper containerEntityProposal = MetadataChangeProposalWrapper.builder() + .entityType("container") + .entityUrn(databaseUrn) + .upsert() + .aspect(new ContainerProperties().setName(databaseName)) + .build(); + + Stream resultStream = Stream.of( + containerEntityProposal, + createSubTypeAspect(databaseUrn, "Database"), + createBrowsePathsAspect(databaseUrn, Collections.emptyList()), createStatusAspect(databaseUrn), + config.attachDomain() ? createDomainAspect(databaseUrn) : null + ).filter(Objects::nonNull); + return resultStream; } @Override @@ -127,42 +271,60 @@ public void close() { // no op; } - private MetadataChangeProposalWrapper createUndoSoftDelete() { - MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder() - .entityType("dataset") - .entityUrn(datasetUrn) + private MetadataChangeProposalWrapper createStatusAspect(Urn urn) { + MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder() + .entityType(urn.getEntityType()) + .entityUrn(urn) .upsert() .aspect(SOFT_DELETE_FALSE) - .aspectName("status") .build(); return softDeleteUndoProposal; } - private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) { + private MetadataChangeProposalWrapper createSubTypeAspect(Urn urn, String subType) { + MetadataChangeProposalWrapper subTypeProposal = MetadataChangeProposalWrapper.builder() + .entityType(urn.getEntityType()) + .entityUrn(urn) + .upsert() + .aspect(new SubTypes().setTypeNames(new StringArray(subType))) + .build(); + return subTypeProposal; + } + + private MetadataChangeProposalWrapper createSchemaMetadataAspect(String tableName) { Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient); - List fields = avroSchema.getFields().stream().map(f -> new SchemaField() - .setFieldPath(f.name()) - .setType(toSchemaFieldDataType(f.schema().getType())) - .setDescription(f.doc(), SetMode.IGNORE_NULL) - .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList()); + AvroSchemaConverter avroSchemaConverter = AvroSchemaConverter.builder().build(); + com.linkedin.schema.SchemaMetadata schemaMetadata = avroSchemaConverter.toDataHubSchema( + avroSchema, + false, + false, + datasetUrn.getPlatformEntity(), + null + ); - final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema(); - platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString())); + // Reorder fields to relocate _hoodie_ metadata fields to the end + schemaMetadata.setFields(SchemaFieldsUtil.reorderPrefixedFields(schemaMetadata.getFields(), "_hoodie_")); return MetadataChangeProposalWrapper.builder() .entityType("dataset") .entityUrn(datasetUrn) .upsert() - .aspect(new SchemaMetadata() - .setSchemaName(tableName) - .setVersion(0) - .setHash("") - .setPlatform(datasetUrn.getPlatformEntity()) - .setPlatformSchema(platformSchema) - .setFields(new SchemaFieldArray(fields))) + .aspect(schemaMetadata) .build(); } + private Stream createDatasetEntity() { + Stream result = Stream.of( + createStatusAspect(datasetUrn), + createSubTypeAspect(datasetUrn, "Table"), + createBrowsePathsAspect(datasetUrn, 
Collections.singletonList(new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))), + createContainerAspect(datasetUrn, databaseUrn), + createSchemaMetadataAspect(tableName), + config.attachDomain() ? createDomainAspect(datasetUrn) : null + ).filter(Objects::nonNull); + return result; + } + Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) { try { return new TableSchemaResolver(metaClient).getTableAvroSchema(true); @@ -170,36 +332,4 @@ Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) { throw new HoodieSyncException("Failed to read avro schema", e); } } - - static SchemaFieldDataType toSchemaFieldDataType(Schema.Type type) { - switch (type) { - case BOOLEAN: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BooleanType())); - case INT: - case LONG: - case FLOAT: - case DOUBLE: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType())); - case MAP: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new MapType())); - case ENUM: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new EnumType())); - case NULL: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NullType())); - case ARRAY: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new ArrayType())); - case BYTES: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BytesType())); - case FIXED: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new FixedType())); - case UNION: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new UnionType())); - case RECORD: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType())); - case STRING: - return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType())); - default: - throw new AvroTypeException("Unexpected type: " + type.getName()); - } - } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java index 182506fadece6..148fc61dd26e0 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java @@ -27,10 +27,18 @@ import com.beust.jcommander.JCommander; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.Properties; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.apache.hudi.sync.datahub.DataHubTableProperties.HoodieTableMetadata; +import static org.apache.hudi.sync.datahub.DataHubTableProperties.getTableProperties; /** * To sync with DataHub via REST APIs. 
@@ -39,9 +47,12 @@ * @see https://datahubproject.io/ */ public class DataHubSyncTool extends HoodieSyncTool { + private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncTool.class); protected final DataHubSyncConfig config; - private final HoodieTableMetaClient metaClient; + protected final HoodieTableMetaClient metaClient; + protected DataHubSyncClient syncClient; + private final String tableName; public DataHubSyncTool(Properties props) { this(props, HadoopConfigUtils.createHadoopConf(props), Option.empty()); @@ -50,20 +61,59 @@ public DataHubSyncTool(Properties props) { public DataHubSyncTool(Properties props, Configuration hadoopConf, Option metaClientOption) { super(props, hadoopConf); this.config = new DataHubSyncConfig(props); + this.tableName = config.getString(META_SYNC_TABLE_NAME); this.metaClient = metaClientOption.orElseGet(() -> buildMetaClient(config)); + this.syncClient = new DataHubSyncClient(config, metaClient); } - /** - * Sync to a DataHub Dataset. - * - * @implNote DataHub sync is an experimental feature, which overwrites the DataHub Dataset's schema - * and last commit time sync'ed upon every invocation. - */ @Override public void syncHoodieTable() { - try (DataHubSyncClient syncClient = new DataHubSyncClient(config, metaClient)) { - syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), null, null); - syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME)); + try { + LOG.info("Syncing target Hoodie table with DataHub dataset({}). DataHub URL: {}, basePath: {}", + tableName, config.getDataHubServerEndpoint(), config.getString(META_SYNC_BASE_PATH)); + + syncSchema(); + syncTableProperties(); + updateLastCommitTimeIfNeeded(); + + LOG.info("Sync completed for table {}", tableName); + } catch (Exception e) { + throw new RuntimeException("Failed to sync table " + tableName + " to DataHub", e); + } finally { + close(); + } + } + + private void syncSchema() throws Exception { + syncClient.updateTableSchema(tableName, null, null); + LOG.info("Schema synced for table {}", tableName); + } + + private void syncTableProperties() throws Exception { + MessageType storageSchema = syncClient.getStorageSchema(); + HoodieTableMetadata tableMetadata = new HoodieTableMetadata(metaClient, storageSchema); + Map tableProperties = getTableProperties(config, tableMetadata); + syncClient.updateTableProperties(tableName, tableProperties); + LOG.info("Properties synced for table {}", tableName); + } + + private void updateLastCommitTimeIfNeeded() throws Exception { + boolean shouldUpdateLastCommitTime = !config.getBoolean(META_SYNC_CONDITIONAL_SYNC); + if (shouldUpdateLastCommitTime) { + syncClient.updateLastCommitTimeSynced(tableName); + LOG.info("Updated last sync time for table {}", tableName); + } + } + + @Override + public void close() { + if (syncClient != null) { + try { + syncClient.close(); + syncClient = null; + } catch (Exception e) { + LOG.error("Error closing DataHub sync client", e); + } } } @@ -77,4 +127,4 @@ public static void main(String[] args) { } new DataHubSyncTool(params.toProps()).syncHoodieTable(); } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java new file mode 100644 index 0000000000000..f951d67a4ff34 --- /dev/null +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java @@ -0,0 
+1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.sync.datahub; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils; +import org.apache.hudi.sync.datahub.config.DataHubSyncConfig; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_TABLE_SERDE_PROPERTIES; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_FILE_FORMAT; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_PATH; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_PARTITION_FIELDS; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_SPARK_VERSION; + +public class DataHubTableProperties { + + private static final Logger LOG = LoggerFactory.getLogger(DataHubTableProperties.class); + + public static final String HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES = "hoodie.meta.sync.datahub.table.properties"; + public static final String HUDI_TABLE_TYPE = "hudi.table.type"; + public static final String HUDI_TABLE_VERSION = "hudi.table.version"; + public static final String HUDI_BASE_PATH = "hudi.base.path"; + public static final String HUDI_PARTITION_FIELDS = "hudi.partition.fields"; + + public static final ConfigProperty DATAHUB_TABLE_PROPERTIES = + ConfigProperty.key(HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES) + .defaultValue("") + .withDocumentation("Additional properties to be attached to the DataHub dataset, specified as key1=val1,key2=val2"); + + public static Map getTableProperties(DataHubSyncConfig config, HoodieTableMetadata tableMetadata) { + Map properties = new HashMap<>(); + addBasicHudiTableProperties(properties, config, tableMetadata); + addPartitioningInformation(properties, config); + addUserDefinedProperties(properties, config); + addSparkRelatedProperties(properties, config, tableMetadata); + return properties; + } + + private static void addBasicHudiTableProperties(Map properties, DataHubSyncConfig config, HoodieTableMetadata 
tableMetadata) { + properties.put(HUDI_TABLE_TYPE, tableMetadata.getTableType()); + properties.put(HUDI_TABLE_VERSION, tableMetadata.getTableVersion()); + properties.put(HUDI_BASE_PATH, config.getString(META_SYNC_BASE_PATH)); + } + + private static void addPartitioningInformation(Map properties, DataHubSyncConfig config) { + if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { + properties.put(HUDI_PARTITION_FIELDS, String.join(",", config.getSplitStrings(META_SYNC_PARTITION_FIELDS))); + } + } + + private static void addUserDefinedProperties(Map properties, DataHubSyncConfig config) { + Map userDefinedProps = ConfigUtils.toMap(config.getString(DATAHUB_TABLE_PROPERTIES)); + properties.putAll(userDefinedProps); + } + + private static void addSparkRelatedProperties(Map properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) { + Map sparkProperties = SparkDataSourceTableUtils.getSparkTableProperties( + config.getSplitStrings(META_SYNC_PARTITION_FIELDS), + config.getString(META_SYNC_SPARK_VERSION), + config.getInt(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), + tableMetadata.getSchema() + ); + properties.putAll(sparkProperties); + properties.putAll(getSerdeProperties(config, false)); + } + + private static Map getSerdeProperties(DataHubSyncConfig config, boolean readAsOptimized) { + HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); + String inputFormatClassName = getInputFormatClassName(baseFileFormat, false, false); + String outputFormatClassName = getOutputFormatClassName(baseFileFormat); + String serDeFormatClassName = getSerDeClassName(baseFileFormat); + + Map serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES)); + serdeProperties.put("inputFormat", inputFormatClassName); + serdeProperties.put("outputFormat", outputFormatClassName); + serdeProperties.put("serdeClass", serDeFormatClassName); + Map sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); + sparkSerdeProperties.forEach((k, v) -> serdeProperties.putIfAbsent(k.startsWith("spark.") ? k : "spark." 
+ k, v)); + LOG.info("Serde Properties : {}", serdeProperties); + return serdeProperties; + } + + public static class HoodieTableMetadata { + private final HoodieTableMetaClient metaClient; + private final MessageType schema; + + public HoodieTableMetadata(HoodieTableMetaClient metaClient, MessageType schema) { + this.metaClient = metaClient; + this.schema = schema; + } + + public String getTableType() { + return metaClient.getTableType().name(); + } + + public String getTableVersion() { + return metaClient.getTableConfig().getTableVersion().toString(); + } + + public MessageType getSchema() { + return schema; + } + } +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java index 2a96171b3c125..a54c7c85e48ff 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java @@ -29,6 +29,8 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.ParametersDelegate; import datahub.client.rest.RestEmitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.concurrent.Immutable; @@ -43,6 +45,8 @@ description = "Configurations used by the Hudi to sync metadata to DataHub.") public class DataHubSyncConfig extends HoodieSyncConfig { + private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncConfig.class); + public static final ConfigProperty META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty .key("hoodie.meta.sync.datahub.dataset.identifier.class") .defaultValue(HoodieDataHubDatasetIdentifier.class.getName()) @@ -80,12 +84,47 @@ public class DataHubSyncConfig extends HoodieSyncConfig { .markAdvanced() .withDocumentation("Environment to use when pushing entities to Datahub"); - public final HoodieDataHubDatasetIdentifier datasetIdentifier; + public static final ConfigProperty META_SYNC_DATAHUB_DOMAIN_IDENTIFIER = ConfigProperty + .key("hoodie.meta.sync.datahub.domain.identifier") + .noDefaultValue() + .markAdvanced() + .withDocumentation("Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. Must be in urn form (e.g., urn:li:domain:_domain_id)."); + + public static final ConfigProperty HIVE_TABLE_SERDE_PROPERTIES = ConfigProperty + .key("hoodie.datasource.hive_sync.serde_properties") + .noDefaultValue() + .markAdvanced() + .withDocumentation("Serde properties to hive table."); + + + public static final ConfigProperty HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty + .key("hoodie.datasource.hive_sync.schema_string_length_thresh") + .defaultValue(4000) + .markAdvanced() + .withDocumentation(""); + + public static final ConfigProperty META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS = ConfigProperty + .key("hoodie.meta.sync.datahub.sync.suppress.exceptions") + .defaultValue(true) + .markAdvanced() + .withDocumentation("Suppress exceptions during DataHub sync. 
This is true by default to ensure that when running inline with other jobs, the sync does not fail the job."); public DataHubSyncConfig(Properties props) { super(props); + // Log warning if the domain identifier is provided but is not in urn form + if (contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER) && !getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER).startsWith("urn:li:domain:")) { + LOG.warn( + "Domain identifier must be in urn form (e.g., urn:li:domain:_domain_id). Provided {}. Will remove this from configuration.", + getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER)); + this.props.remove(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key()); + } + } + + public HoodieDataHubDatasetIdentifier getDatasetIdentifier() { String identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS); - datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class[] {Properties.class}, props); + // Use reflection to instantiate the class + HoodieDataHubDatasetIdentifier datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class[] {Properties.class}, props); + return datasetIdentifier; } public RestEmitter getRestEmitter() { @@ -98,6 +137,22 @@ public RestEmitter getRestEmitter() { } } + public Boolean suppressExceptions() { + return getBoolean(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS); + } + + public String getDataHubServerEndpoint() { + return getString(META_SYNC_DATAHUB_EMITTER_SERVER); + } + + public boolean attachDomain() { + return contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER); + } + + public String getDomainIdentifier() { + return getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER); + } + public static class DataHubSyncConfigParams { @ParametersDelegate() @@ -122,6 +177,14 @@ public static class DataHubSyncConfigParams { @Parameter(names = {"--dataset-env"}, description = "Which Datahub Environment to use when pushing entities") public String datasetEnv; + @Parameter(names = { + "--domain"}, description = "Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. 
Must be in urn form (e.g., urn:li:domain:_domain_id).") + public String domainIdentifier; + + @Parameter(names = { + "--suppress-exceptions"}, description = "Suppress exceptions during DataHub sync.") + public String suppressExceptions; + public boolean isHelp() { return hoodieSyncConfigParams.isHelp(); } @@ -134,7 +197,14 @@ public Properties toProps() { props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass); props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName); props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv); + props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), domainIdentifier); + // We want the default behavior of DataHubSync Tool when run as command line to NOT suppress exceptions + if (suppressExceptions == null) { + props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false"); + } else { + props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), String.valueOf(suppressExceptions)); + } return props; } } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java index 6c8ea076ffc0d..664fab31e8e1e 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java @@ -22,6 +22,8 @@ import com.linkedin.common.FabricType; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import io.datahubproject.models.util.DatabaseKey; import java.util.Properties; @@ -41,19 +43,50 @@ public class HoodieDataHubDatasetIdentifier { public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV; protected final Properties props; + private final DatasetUrn datasetUrn; + private final Urn databaseUrn; + private final String tableName; + private final String databaseName; public HoodieDataHubDatasetIdentifier(Properties props) { this.props = props; - } - - public DatasetUrn getDatasetUrn() { + if (props == null || props.isEmpty()) { + throw new IllegalArgumentException("Properties cannot be null or empty"); + } DataHubSyncConfig config = new DataHubSyncConfig(props); - return new DatasetUrn( + this.datasetUrn = new DatasetUrn( createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)), createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)), FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)) ); + + this.tableName = config.getString(META_SYNC_TABLE_NAME); + this.databaseName = config.getString(META_SYNC_DATABASE_NAME); + + DatabaseKey databaseKey = DatabaseKey.builder() + .platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)) + .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)) + .database(this.databaseName) + .build(); + + this.databaseUrn = databaseKey.asUrn(); + } + + public DatasetUrn getDatasetUrn() { + return this.datasetUrn; + } + + public Urn getDatabaseUrn() { + return this.databaseUrn; + } + + public String getTableName() { + return this.tableName; + } + + public String getDatabaseName() { + return this.databaseName; } private static DataPlatformUrn 
createDataPlatformUrn(String platformUrn) { @@ -63,4 +96,4 @@ private static DataPlatformUrn createDataPlatformUrn(String platformUrn) { private static String createDatasetName(String databaseName, String tableName) { return String.format("%s.%s", databaseName, tableName); } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java new file mode 100644 index 0000000000000..0b3229ac38c73 --- /dev/null +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.sync.datahub.util; + +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import io.datahubproject.models.util.FieldPath; + +import java.util.ArrayList; +import java.util.List; + +public class SchemaFieldsUtil { + public static SchemaFieldArray reorderPrefixedFields(SchemaFieldArray fields, String prefix) { + if (fields == null || fields.isEmpty()) { + return fields; + } + + // Split the list into underscore and non-underscore fields while preserving order + List prefixedFields = new ArrayList<>(); + List normalFields = new ArrayList<>(); + + for (SchemaField field : fields) { + FieldPath fieldPath = new FieldPath(field.getFieldPath()); + + if (fieldPath.isTopLevel() && fieldPath.leafFieldName().startsWith(prefix)) { + prefixedFields.add(field); + } else { + normalFields.add(field); + } + } + + // Combine the lists with underscore fields at the end + List result = new ArrayList<>(normalFields.size() + prefixedFields.size()); + result.addAll(normalFields); + result.addAll(prefixedFields); + + return new SchemaFieldArray(result); + } +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java index a19e035ad1d0a..837536fb7950b 100644 --- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java @@ -21,9 +21,11 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.sync.datahub.config.DataHubSyncConfig; +import com.linkedin.mxe.MetadataChangeProposal; import datahub.client.MetadataWriteResponse; import datahub.client.rest.RestEmitter; 
import datahub.event.MetadataChangeProposalWrapper; @@ -40,15 +42,22 @@ import java.io.IOException; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestDataHubSyncClient { @@ -65,7 +74,7 @@ public class TestDataHubSyncClient { @BeforeAll public static void beforeAll() throws IOException { TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " - + "{\"name\": \"ts\",\"type\": \"long\"}]}"; + + "{\"name\": \"ts\",\"type\": \"long\"}]}"; avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); @@ -73,9 +82,9 @@ public static void beforeAll() throws IOException { props.put("hoodie.table.name", "some_table"); tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString(); HoodieTableMetaClient.newTableBuilder() - .fromProperties(props) - .setTableType(HoodieTableType.MERGE_ON_READ.name()) - .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath); + .fromProperties(props) + .setTableType(HoodieTableType.MERGE_ON_READ.name()) + .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath); } @BeforeEach @@ -94,19 +103,125 @@ public void testUpdateTableSchemaInvokesRestEmitter() throws IOException { props.put(META_SYNC_BASE_PATH.key(), tableBasePath); Mockito.when( - restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any()) + restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any()) ).thenReturn( - CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()) + CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()) ); DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); dhClient.updateTableSchema("some_table", null, null); - verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposalWrapper.class), + verify(restEmitterMock, times(9)).emit(any(MetadataChangeProposalWrapper.class), Mockito.any()); } + @Test + public void testUpdateTableProperties() throws Exception { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + + when(restEmitterMock.emit(any(MetadataChangeProposal.class), any())) + .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + Map properties = new HashMap<>(); + properties.put("key1", "value1"); + properties.put("key2", "value2"); + + boolean result = dhClient.updateTableProperties("some_table", properties); + 
assertTrue(result); + verify(restEmitterMock, times(1)).emit(any(MetadataChangeProposal.class), any()); + } + + @Test + public void testUpdateTablePropertiesFailure() throws Exception { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false"); + + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new IOException("Emission failed")); + when(restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), any())) + .thenReturn(failedFuture); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + Map properties = new HashMap<>(); + properties.put("key1", "value1"); + + assertThrows(HoodieDataHubSyncException.class, () -> + dhClient.updateTableProperties("some_table", properties)); + } + + @Test + public void testGetLastCommitTimeSynced() { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + assertThrows(UnsupportedOperationException.class, () -> + dhClient.getLastCommitTimeSynced("some_table")); + } + + @Test + public void testGetMetastoreSchema() { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + assertThrows(UnsupportedOperationException.class, () -> + dhClient.getMetastoreSchema("some_table")); + } + + @Test + public void testUpdateTableSchemaWithEmitterFailure() throws Exception { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false"); + + // Create a failed future that will throw when accessed + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new ExecutionException("Emission failed", new IOException())); + + // Configure mock to return the failed future for ALL calls + when(restEmitterMock.emit((MetadataChangeProposalWrapper) any(), any())).thenReturn(future); + + DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + assertThrows(HoodieDataHubSyncException.class, () -> + dhClient.updateTableSchema("some_table", null, null)); + } + + @Test + public void testUpdateLastCommitTimeSynced() throws Exception { + Properties props = new Properties(); + props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName()); + props.put(META_SYNC_BASE_PATH.key(), tableBasePath); + + when(restEmitterMock.emit(any(MetadataChangeProposal.class), any())) + .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())); + + 
DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock); + DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub); + + dhClient.updateLastCommitTimeSynced("some_table"); + verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposal.class), any()); + } + public class DataHubSyncClientStub extends DataHubSyncClient { public DataHubSyncClientStub(DataHubSyncConfig config) { @@ -118,6 +233,16 @@ Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) { return avroSchema; } + @Override + protected Option getLastCommitTime() { + return Option.of("1000"); + } + + @Override + protected Option getLastCommitCompletionTime() { + return Option.of("1000"); + } + } public class DatahubSyncConfigStub extends DataHubSyncConfig { @@ -133,6 +258,7 @@ public DatahubSyncConfigStub(Properties props, RestEmitter emitterMock) { public RestEmitter getRestEmitter() { return emitterMock; } + } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java index 9d92970c3b2f2..f693389fa4d87 100644 --- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java @@ -47,7 +47,7 @@ void testInstantiationWithProps() { Properties props = new Properties(); props.setProperty(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS.key(), DummyIdentifier.class.getName()); DataHubSyncConfig syncConfig = new DataHubSyncConfig(props); - DatasetUrn datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn(); + DatasetUrn datasetUrn = syncConfig.getDatasetIdentifier().getDatasetUrn(); assertEquals("foo", datasetUrn.getPlatformEntity().getPlatformNameEntity()); assertEquals("project.database.table", datasetUrn.getDatasetNameEntity()); assertEquals(FabricType.PROD, datasetUrn.getOriginEntity()); diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java new file mode 100644 index 0000000000000..52af11f0a8547 --- /dev/null +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.sync.datahub.config; + +import com.linkedin.common.FabricType; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME; +import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestHoodieDataHubDatasetIdentifier { + + private Properties props; + + @BeforeEach + void setUp() { + props = new Properties(); + } + + @Test + @DisplayName("Test constructor with default values") + void testConstructorWithDefaultValues() { + // Given + props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db"); + props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table"); + + // When + HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + DatasetUrn datasetUrn = identifier.getDatasetUrn(); + assertNotNull(datasetUrn); + assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME, + datasetUrn.getPlatformEntity().getId()); + assertEquals("test_db.test_table", datasetUrn.getDatasetNameEntity()); + assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_DATAHUB_ENV, datasetUrn.getOriginEntity()); + } + + @Test + @DisplayName("Test constructor with custom values") + void testConstructorWithCustomValues() { + // Given + props.setProperty(META_SYNC_DATABASE_NAME.key(), "custom_db"); + props.setProperty(META_SYNC_TABLE_NAME.key(), "custom_table"); + props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), "custom_platform"); + props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD"); + + // When + HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + DatasetUrn datasetUrn = identifier.getDatasetUrn(); + assertNotNull(datasetUrn); + assertEquals("custom_platform", datasetUrn.getPlatformEntity().getId()); + assertEquals("custom_db.custom_table", datasetUrn.getDatasetNameEntity()); + assertEquals(FabricType.PROD, datasetUrn.getOriginEntity()); + } + + @Test + @DisplayName("Test getDatabaseUrn") + void testGetDatabaseUrn() { + // Given + props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db"); + props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table"); + props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD"); + + // When + HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + Urn databaseUrn = identifier.getDatabaseUrn(); + assertNotNull(databaseUrn); + assertFalse(databaseUrn.toString().contains("test_db")); + assertFalse(databaseUrn.toString().contains("PROD")); + assertTrue(databaseUrn.toString().startsWith("urn:li:container:")); + } + + @Test + @DisplayName("Test getTableName") + void testGetTableName() { + // Given + String tableName = "test_table"; + props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db"); 
+ props.setProperty(META_SYNC_TABLE_NAME.key(), tableName); + + // When + HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props); + + // Then + assertEquals(tableName, identifier.getTableName()); + } + + @Test + @DisplayName("Test constructor with missing required properties") + void testConstructorWithMissingProperties() { + // Given empty properties + + // Then + assertThrows(IllegalArgumentException.class, () -> { + new HoodieDataHubDatasetIdentifier(props); + }); + } + + @Test + @DisplayName("Test constructor with invalid environment") + void testConstructorWithInvalidEnvironment() { + // Given + props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db"); + props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table"); + props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "INVALID_ENV"); + + // Then + assertThrows(IllegalArgumentException.class, () -> { + new HoodieDataHubDatasetIdentifier(props); + }); + } +} \ No newline at end of file diff --git a/packaging/hudi-datahub-sync-bundle/pom.xml b/packaging/hudi-datahub-sync-bundle/pom.xml index fc3f0afa525c0..a918164ced94c 100644 --- a/packaging/hudi-datahub-sync-bundle/pom.xml +++ b/packaging/hudi-datahub-sync-bundle/pom.xml @@ -73,7 +73,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-datahub-sync - io.acryl:datahub-client + io.acryl:datahub-client-java8 com.beust:jcommander commons-io:commons-io org.apache.httpcomponents:httpasyncclient
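
For reference, the new configuration surface introduced by this patch can be exercised programmatically through DataHubSyncTool. The sketch below is illustrative only and not part of the patch: it assumes a Hudi table already exists at the given base path, and the database/table names, base path, emitter URL, domain urn, and extra properties are placeholder values. The config constants are the ones defined in HoodieSyncConfig, DataHubSyncConfig, and DataHubTableProperties as referenced above.

    import org.apache.hudi.sync.common.HoodieSyncConfig;
    import org.apache.hudi.sync.datahub.DataHubSyncTool;
    import org.apache.hudi.sync.datahub.DataHubTableProperties;
    import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;

    import java.util.Properties;

    public class DataHubSyncExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Standard meta-sync settings (placeholder values).
        props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "analytics");
        props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "trips");
        props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3://bucket/tables/trips");
        props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER.key(), "http://datahub-gms:8080");
        // New in this patch: attach the container and dataset entities to a DataHub domain (must be a urn).
        props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), "urn:li:domain:analytics");
        // New in this patch: propagate sync failures instead of only logging them.
        props.setProperty(DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
        // Optional extra dataset properties, as comma-separated key=value pairs.
        props.setProperty(DataHubTableProperties.DATAHUB_TABLE_PROPERTIES.key(), "team=data-eng,tier=gold");

        // Creates the container (database) and dataset entities, syncs the schema and
        // table properties and, unless conditional sync is enabled, the last commit times.
        new DataHubSyncTool(props).syncHoodieTable();
      }
    }

When invoked from the command line, the same options map to the new --domain and --suppress-exceptions flags; note that the CLI deliberately defaults suppression to false so failures surface, while the config property defaults to true for inline meta sync.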