diff --git a/metadata-service/iceberg-catalog/build.gradle b/metadata-service/iceberg-catalog/build.gradle
index f9a4c88c6b7323..9ae0d74e13e9d6 100644
--- a/metadata-service/iceberg-catalog/build.gradle
+++ b/metadata-service/iceberg-catalog/build.gradle
@@ -13,6 +13,7 @@ dependencies {
   implementation project(':metadata-models')
   implementation project(':metadata-utils')
   implementation project(':metadata-operation-context')
+  implementation project(':metadata-io')
   implementation project(':metadata-integration:java:datahub-schematron:lib')
   implementation 'org.apache.iceberg:iceberg-core:1.6.1'
   implementation 'org.apache.iceberg:iceberg-aws:1.6.1'
diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouse.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouse.java
index 2615ae670a7864..12567c1fbae7c9 100644
--- a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouse.java
+++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouse.java
@@ -1,21 +1,22 @@
 package io.datahubproject.iceberg.catalog;
 
 import static com.linkedin.metadata.Constants.*;
-import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
 import static io.datahubproject.iceberg.catalog.Utils.*;
 
-import com.google.common.util.concurrent.Striped;
-import com.linkedin.common.AuditStamp;
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.common.FabricType;
+import com.linkedin.common.Status;
 import com.linkedin.common.urn.DatasetUrn;
 import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
 import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
+import com.linkedin.dataset.DatasetProperties;
 import com.linkedin.dataset.IcebergCatalogInfo;
 import com.linkedin.entity.EnvelopedAspect;
-import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.entity.EntityService;
-import com.linkedin.mxe.MetadataChangeProposal;
+import com.linkedin.metadata.entity.IngestResult;
 import com.linkedin.platformresource.PlatformResourceInfo;
 import com.linkedin.secret.DataHubSecretValue;
 import com.linkedin.util.Pair;
@@ -24,13 +25,13 @@
 import io.datahubproject.metadata.services.SecretService;
 import java.net.URISyntaxException;
 import java.util.*;
-import java.util.concurrent.locks.Lock;
 import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.iceberg.CatalogUtil;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.*;
 
+@Slf4j
 public class DataHubIcebergWarehouse {
 
   public static final String DATASET_ICEBERG_METADATA_ASPECT_NAME = "icebergCatalogInfo";
@@ -47,11 +48,8 @@ public class DataHubIcebergWarehouse {
 
   @Getter private final String platformInstance;
 
-  // TODO: Need to handle locks for deployments with multiple GMS replicas.
-  private static final Striped<Lock> resourceLocks =
-      Striped.lazyWeakLock(Runtime.getRuntime().availableProcessors() * 2);
-
-  private DataHubIcebergWarehouse(
+  @VisibleForTesting
+  DataHubIcebergWarehouse(
       String platformInstance,
       IcebergWarehouseInfo icebergWarehouse,
       EntityService entityService,
@@ -121,39 +119,96 @@ public String getDataRoot() {
     return icebergWarehouse.getDataRoot();
   }
 
+  @SneakyThrows
   public Optional<DatasetUrn> getDatasetUrn(TableIdentifier tableIdentifier) {
     Urn resourceUrn = resourceUrn(tableIdentifier);
-    PlatformResourceInfo platformResourceInfo =
-        (PlatformResourceInfo)
-            entityService.getLatestAspect(
-                operationContext, resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
-    if (platformResourceInfo == null) {
+    Optional<PlatformResourceInfo> platformResourceInfo =
+        getLatestAspectNonRemoved(resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
+
+    if (platformResourceInfo.isEmpty()) {
       return Optional.empty();
     }
-    try {
-      return Optional.of(DatasetUrn.createFromString(platformResourceInfo.getPrimaryKey()));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Invalid dataset urn " + platformResourceInfo.getPrimaryKey(), e);
+
+    return Optional.of(DatasetUrn.createFromString(platformResourceInfo.get().getPrimaryKey()));
+  }
+
+  private <T extends RecordTemplate> Optional<T> getLatestAspectNonRemoved(
+      Urn urn, String aspectName) {
+    Map<Urn, List<RecordTemplate>> aspectsMap =
+        entityService.getLatestAspects(
+            operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);
+
+    if (aspectsMap == null || aspectsMap.isEmpty()) {
+      return Optional.empty();
+    }
+    List<RecordTemplate> aspects = aspectsMap.get(urn);
+    if (aspects == null || aspects.isEmpty()) {
+      return Optional.empty();
     }
+
+    T result = null;
+
+    for (RecordTemplate aspect : aspects) {
+      if (aspect instanceof Status status) {
+        if (status.isRemoved()) {
+          return Optional.empty();
+        }
+      } else {
+        result = (T) aspect;
+      }
+    }
+
+    return Optional.ofNullable(result);
   }
 
-  public IcebergCatalogInfo getIcebergMetadata(TableIdentifier tableIdentifier) {
+  private Optional<EnvelopedAspect> getLatestEnvelopedAspectNonRemoved(Urn urn, String aspectName)
+      throws URISyntaxException {
+
+    Map<Urn, List<EnvelopedAspect>> aspectsMap =
+        entityService.getLatestEnvelopedAspects(
+            operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);
+
+    if (aspectsMap == null || aspectsMap.isEmpty()) {
+      return Optional.empty();
+    }
+    List<EnvelopedAspect> aspects = aspectsMap.get(urn);
+    if (aspects == null || aspects.isEmpty()) {
+      return Optional.empty();
+    }
+
+    EnvelopedAspect result = null;
+
+    for (EnvelopedAspect aspect : aspects) {
+      if (STATUS_ASPECT_NAME.equals(aspect.getName())) {
+        Status status = new Status(aspect.getValue().data());
+        if (status.isRemoved()) {
+          return Optional.empty();
+        }
+      } else {
+        result = aspect;
+      }
+    }
+
+    return Optional.ofNullable(result);
+  }
+
+  public Optional<IcebergCatalogInfo> getIcebergMetadata(TableIdentifier tableIdentifier) {
     Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
     if (datasetUrn.isEmpty()) {
-      return null;
+      return Optional.empty();
     }
 
-    IcebergCatalogInfo icebergMeta =
-        (IcebergCatalogInfo)
-            entityService.getLatestAspect(
-                operationContext, datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
+    Optional<IcebergCatalogInfo> icebergMeta =
+        getLatestAspectNonRemoved(datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
 
-    if (icebergMeta == null) {
-      throw new IllegalStateException(
+    if (icebergMeta.isEmpty()) {
+      // possibly some deletion cleanup is pending; log error & return as if dataset doesn't exist.
+      log.error(
           String.format(
               "IcebergMetadata not found for resource %s, dataset %s",
               resourceUrn(tableIdentifier), datasetUrn.get()));
     }
+    return icebergMeta;
   }
 
@@ -165,19 +220,19 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
     }
 
     try {
-      EnvelopedAspect existingEnveloped =
-          entityService.getLatestEnvelopedAspect(
-              operationContext,
-              DATASET_ENTITY_NAME,
-              datasetUrn.get(),
-              DATASET_ICEBERG_METADATA_ASPECT_NAME);
-      if (existingEnveloped == null) {
-        throw new IllegalStateException(
+      Optional<EnvelopedAspect> existingEnveloped =
+          getLatestEnvelopedAspectNonRemoved(
+              datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
+      if (existingEnveloped.isEmpty()) {
+        // possibly some deletion cleanup is pending; log error & return as if dataset doesn't
+        // exist.
+        log.error(
             String.format(
                 "IcebergMetadata not found for resource %s, dataset %s",
                 resourceUrn(tableIdentifier), datasetUrn.get()));
+        return null;
       }
-      return Pair.of(existingEnveloped, datasetUrn.get());
+      return Pair.of(existingEnveloped.get(), datasetUrn.get());
     } catch (Exception e) {
       throw new RuntimeException(
           "Error fetching IcebergMetadata aspect for dataset " + datasetUrn.get(), e);
@@ -186,79 +241,121 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
 
   public boolean deleteDataset(TableIdentifier tableIdentifier) {
     Urn resourceUrn = resourceUrn(tableIdentifier);
+    if (!entityService.exists(operationContext, resourceUrn)) {
+      return false;
+    }
 
-    // guard against concurrent modifications that depend on the resource (rename table/view)
-    Lock lock = resourceLocks.get(resourceUrn);
-    lock.lock();
-    try {
-      if (!entityService.exists(operationContext, resourceUrn)) {
-        return false;
-      }
-      Optional<DatasetUrn> urn = getDatasetUrn(tableIdentifier);
+    Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
+    if (datasetUrn.isEmpty()) {
+      log.warn(
+          "Dataset urn not found for platform resource {}; cleaning up resource", resourceUrn);
       entityService.deleteUrn(operationContext, resourceUrn);
-      urn.ifPresent(x -> entityService.deleteUrn(operationContext, x));
-      return true;
-    } finally {
-      lock.unlock();
+      return false;
     }
+
+    IcebergBatch icebergBatch = newIcebergBatch(operationContext);
+    icebergBatch.softDeleteEntity(resourceUrn, PLATFORM_RESOURCE_ENTITY_NAME);
+    icebergBatch.softDeleteEntity(datasetUrn.get(), DATASET_ENTITY_NAME);
+
+    AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
+    List<IngestResult> ingestResults =
+        entityService.ingestProposal(operationContext, aspectsBatch, false);
+
+    boolean result = true;
+    for (IngestResult ingestResult : ingestResults) {
+      if (ingestResult.getResult().isNoOp()) {
+        result = false;
+        break;
+      }
+    }
+
+    entityService.deleteUrn(operationContext, resourceUrn);
+    entityService.deleteUrn(operationContext, datasetUrn.get());
+
+    return result;
   }
 
   public DatasetUrn createDataset(
-      TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
+      TableIdentifier tableIdentifier, boolean view, IcebergBatch icebergBatch) {
     String datasetName = platformInstance + "." + UUID.randomUUID();
     DatasetUrn datasetUrn = new DatasetUrn(platformUrn(), datasetName, fabricType());
-    createResource(datasetUrn, tableIdentifier, view, auditStamp);
+
+    createResource(datasetUrn, tableIdentifier, view, icebergBatch);
+
     return datasetUrn;
   }
 
-  public DatasetUrn renameDataset(
-      TableIdentifier fromTableId, TableIdentifier toTableId, boolean view, AuditStamp auditStamp) {
+  public void renameDataset(TableIdentifier fromTableId, TableIdentifier toTableId, boolean view) {
+
+    Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
+    if (optDatasetUrn.isEmpty()) {
+      throw noSuchEntity(view, fromTableId);
+    }
+
+    DatasetUrn datasetUrn = optDatasetUrn.get();
+
+    IcebergBatch icebergBatch = newIcebergBatch(operationContext);
+    icebergBatch.softDeleteEntity(resourceUrn(fromTableId), PLATFORM_RESOURCE_ENTITY_NAME);
+    createResource(datasetUrn, toTableId, view, icebergBatch);
+
+    DatasetProperties datasetProperties =
+        new DatasetProperties()
+            .setName(toTableId.name())
+            .setQualifiedName(fullTableName(platformInstance, toTableId));
 
-    // guard against concurrent modifications to the resource (other renames, deletion)
-    Lock lock = resourceLocks.get(resourceUrn(fromTableId));
-    lock.lock();
+    IcebergBatch.EntityBatch datasetBatch =
+        icebergBatch.updateEntity(datasetUrn, DATASET_ENTITY_NAME);
+    datasetBatch.aspect(DATASET_PROPERTIES_ASPECT_NAME, datasetProperties);
+
+    if (!fromTableId.namespace().equals(toTableId.namespace())) {
+      Container container =
+          new Container().setContainer(containerUrn(platformInstance, toTableId.namespace()));
+      datasetBatch.aspect(CONTAINER_ASPECT_NAME, container);
+    }
 
     try {
-      Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
-      if (optDatasetUrn.isEmpty()) {
-        if (view) {
-          throw new NoSuchViewException(
-              "No such view %s", fullTableName(platformInstance, fromTableId));
-        } else {
-          throw new NoSuchTableException(
-              "No such table %s", fullTableName(platformInstance, fromTableId));
-        }
+      AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
+      entityService.ingestProposal(operationContext, aspectsBatch, false);
+    } catch (ValidationException e) {
+      if (!entityService.exists(operationContext, resourceUrn(fromTableId), false)) {
+        // someone else deleted "fromTable" before we could get through
+        throw noSuchEntity(view, fromTableId);
       }
-
-      DatasetUrn datasetUrn = optDatasetUrn.get();
-      try {
-        createResource(datasetUrn, toTableId, view, auditStamp);
-      } catch (ValidationException e) {
+      if (entityService.exists(operationContext, resourceUrn(toTableId), true)) {
         throw new AlreadyExistsException(
             "%s already exists: %s",
            view ? "View" : "Table", fullTableName(platformInstance, toTableId));
       }
-      entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
-      return datasetUrn;
-    } finally {
-      lock.unlock();
+      throw new IllegalStateException(
+          String.format(
+              "Rename operation failed inexplicably, from %s to %s in warehouse %s",
+              fromTableId, toTableId, platformInstance));
     }
+
+    entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
+  }
+
+  private RuntimeException noSuchEntity(boolean view, TableIdentifier tableIdentifier) {
+    return view
+        ? new NoSuchViewException(
+            "No such view %s", fullTableName(platformInstance, tableIdentifier))
+        : new NoSuchTableException(
+            "No such table %s", fullTableName(platformInstance, tableIdentifier));
   }
 
   private void createResource(
-      DatasetUrn datasetUrn, TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
+      DatasetUrn datasetUrn,
+      TableIdentifier tableIdentifier,
+      boolean view,
+      IcebergBatch icebergBatch) {
     PlatformResourceInfo resourceInfo =
         new PlatformResourceInfo().setPrimaryKey(datasetUrn.toString());
     resourceInfo.setResourceType(view ? "icebergView" : "icebergTable");
 
-    MetadataChangeProposal mcp = new MetadataChangeProposal();
-    mcp.setEntityUrn(resourceUrn(tableIdentifier));
-    mcp.setEntityType(PLATFORM_RESOURCE_ENTITY_NAME);
-    mcp.setAspectName(PLATFORM_RESOURCE_INFO_ASPECT_NAME);
-    mcp.setChangeType(ChangeType.CREATE_ENTITY);
-    mcp.setAspect(serializeAspect(resourceInfo));
-
-    entityService.ingestProposal(operationContext, mcp, auditStamp, false);
+    icebergBatch.createEntity(
+        resourceUrn(tableIdentifier),
+        PLATFORM_RESOURCE_ENTITY_NAME,
+        PLATFORM_RESOURCE_INFO_ASPECT_NAME,
+        resourceInfo);
   }
 
   private FabricType fabricType() {
@@ -268,8 +365,15 @@ private FabricType fabricType() {
   @SneakyThrows
   private Urn resourceUrn(TableIdentifier tableIdentifier) {
     return Urn.createFromString(
-        String.format(
-            "urn:li:platformResource:%s.%s",
-            PLATFORM_NAME, CatalogUtil.fullTableName(platformInstance, tableIdentifier)));
+        String.format("urn:li:platformResource:%s.%s", PLATFORM_NAME, tableName(tableIdentifier)));
+  }
+
+  private String tableName(TableIdentifier tableIdentifier) {
+    return fullTableName(platformInstance, tableIdentifier);
+  }
+
+  @VisibleForTesting
+  IcebergBatch newIcebergBatch(OperationContext operationContext) {
+    return new IcebergBatch(operationContext);
   }
 }
diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubRestCatalog.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubRestCatalog.java
index cbe58bf70546b5..41ebbd9c365b3c 100644
--- a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubRestCatalog.java
+++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/DataHubRestCatalog.java
@@ -1,13 +1,11 @@
 package io.datahubproject.iceberg.catalog;
 
 import static com.linkedin.metadata.Constants.*;
-import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
 import static io.datahubproject.iceberg.catalog.Utils.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.linkedin.common.AuditStamp;
 import com.linkedin.common.SubTypes;
-import com.linkedin.common.urn.DatasetUrn;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.container.Container;
 import com.linkedin.container.ContainerProperties;
@@ -15,7 +13,7 @@
 import com.linkedin.data.template.StringArray;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.authorization.PoliciesConfig;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.query.filter.Condition;
@@ -27,7 +25,6 @@
 import com.linkedin.metadata.search.SearchResult;
 import com.linkedin.metadata.search.utils.QueryUtils;
 import com.linkedin.metadata.utils.CriterionUtils;
-import com.linkedin.mxe.MetadataChangeProposal;
 import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
 import io.datahubproject.iceberg.catalog.credentials.S3CredentialProvider;
 import io.datahubproject.metadata.context.OperationContext;
@@ -164,44 +161,21 @@ operationContext, containerUrn(platformInstance(), toTableId.namespace()))) {
         throw new NoSuchNamespaceException("Namespace does not exist: " + toTableId.namespace());
       }
     }
-    AuditStamp auditStamp = auditStamp();
-    DatasetUrn datasetUrn = warehouse.renameDataset(fromTableId, toTableId, false, auditStamp);
-
-    DatasetProperties datasetProperties = new DatasetProperties();
-    datasetProperties.setName(toTableId.name());
-    datasetProperties.setQualifiedName(fullTableName(platformInstance(), toTableId));
-
-    MetadataChangeProposal mcp = new MetadataChangeProposal();
-    mcp.setEntityType(DATASET_ENTITY_NAME);
-    mcp.setAspectName(DATASET_PROPERTIES_ASPECT_NAME);
-    mcp.setEntityUrn(datasetUrn);
-    mcp.setAspect(serializeAspect(datasetProperties));
-    mcp.setChangeType(ChangeType.UPSERT);
-    ingestMcp(mcp, auditStamp);
-
-    if (!fromTableId.namespace().equals(toTableId.namespace())) {
-      Container container = new Container();
-      container.setContainer(containerUrn(platformInstance(), toTableId.namespace()));
-
-      MetadataChangeProposal containerMcp = new MetadataChangeProposal();
-      containerMcp.setEntityType(DATASET_ENTITY_NAME);
-      containerMcp.setAspectName(CONTAINER_ASPECT_NAME);
-      containerMcp.setEntityUrn(datasetUrn);
-      containerMcp.setAspect(serializeAspect(container));
-      containerMcp.setChangeType(ChangeType.UPSERT);
-      StringMap headers =
-          new StringMap(
-              Collections.singletonMap(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)));
-      mcp.setHeaders(headers);
-      containerMcp.setHeaders(headers);
-      ingestMcp(containerMcp, auditStamp);
-    }
+    warehouse.renameDataset(fromTableId, toTableId, view);
   }
 
   @Override
   public void createNamespace(Namespace namespace, Map<String, String> properties) {
-    AuditStamp auditStamp = auditStamp();
     Urn containerUrn = containerUrn(platformInstance(), namespace);
 
+    IcebergBatch icebergBatch = newIcebergBatch(operationContext);
+    IcebergBatch.EntityBatch containerBatch =
+        icebergBatch.createEntity(
+            containerUrn,
+            CONTAINER_ENTITY_NAME,
+            CONTAINER_PROPERTIES_ASPECT_NAME,
+            containerProperties(namespace, properties));
+
     int nLevels = namespace.length();
 
     if (nLevels > 1) {
       String[] parentLevels = Arrays.copyOfRange(namespace.levels(), 0, nLevels - 1);
@@ -211,24 +185,17 @@ public void createNamespace(Namespace namespace, Map<String, String> properties)
             "Parent namespace %s does not exist in platformInstance-catalog %s",
             Joiner.on(".").join(parentLevels), platformInstance());
       }
-      ingestContainerAspect(
-          containerUrn,
-          CONTAINER_ASPECT_NAME,
-          new Container().setContainer(parentContainerUrn),
-          auditStamp);
+
+      containerBatch.aspect(
+          CONTAINER_ASPECT_NAME, new Container().setContainer(parentContainerUrn));
     }
 
-    ingestContainerAspect(
-        containerUrn,
-        SUB_TYPES_ASPECT_NAME,
-        new SubTypes().setTypeNames(new StringArray(CONTAINER_SUB_TYPE)),
-        auditStamp);
+    containerBatch.platformInstance(platformInstance());
 
-    ingestContainerProperties(namespace, properties, auditStamp);
+    containerBatch.aspect(
+        SUB_TYPES_ASPECT_NAME, new SubTypes().setTypeNames(new StringArray(CONTAINER_SUB_TYPE)));
 
-    MetadataChangeProposal platformInstanceMcp =
-        platformInstanceMcp(platformInstance(), containerUrn, CONTAINER_ENTITY_NAME);
-    ingestMcp(platformInstanceMcp, auditStamp);
+    ingestBatch(icebergBatch);
   }
 
   @Override
@@ -313,15 +280,11 @@ public boolean removeProperties(Namespace namespace, Set<String> set)
     throw new UnsupportedOperationException();
   }
 
-  private void ingestContainerProperties(
-      Namespace namespace, Map<String, String> properties, AuditStamp auditStamp) {
-    ingestContainerAspect(
-        containerUrn(platformInstance(), namespace),
-        CONTAINER_PROPERTIES_ASPECT_NAME,
-        new ContainerProperties()
-            .setName(namespace.levels()[namespace.length() - 1])
-            .setCustomProperties(new StringMap(properties)),
-        auditStamp);
+  private ContainerProperties containerProperties(
+      Namespace namespace, Map<String, String> properties) {
+    return new ContainerProperties()
+        .setName(namespace.levels()[namespace.length() - 1])
+        .setCustomProperties(new StringMap(properties));
   }
 
   public UpdateNamespacePropertiesResponse updateNamespaceProperties(
@@ -347,7 +310,13 @@ public UpdateNamespacePropertiesResponse updateNamespaceProperties(
     properties.putAll(request.updates());
     properties.keySet().removeAll(request.removals());
 
-    ingestContainerProperties(namespace, properties, auditStamp());
+    IcebergBatch icebergBatch = newIcebergBatch(operationContext);
+    Urn containerUrn = containerUrn(platformInstance(), namespace);
+    icebergBatch
+        .updateEntity(containerUrn, CONTAINER_ENTITY_NAME)
+        .aspect(CONTAINER_PROPERTIES_ASPECT_NAME, containerProperties(namespace, properties));
+
+    ingestBatch(icebergBatch);
 
     return responseBuilder.build();
   }
@@ -358,26 +327,14 @@ public void close() throws IOException {
     this.closeableGroup.close();
   }
 
-  private void ingestContainerAspect(
-      Urn containerUrn, String aspectName, RecordTemplate aspect, AuditStamp auditStamp) {
-    MetadataChangeProposal mcp = new MetadataChangeProposal();
-
-    mcp.setEntityUrn(containerUrn);
-    mcp.setEntityType(CONTAINER_ENTITY_NAME);
-    mcp.setAspectName(aspectName);
-    mcp.setAspect(serializeAspect(aspect));
-    mcp.setChangeType(ChangeType.UPSERT);
-
-    StringMap headers =
-        new StringMap(
-            Collections.singletonMap(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)));
-    mcp.setHeaders(headers);
-
-    ingestMcp(mcp, auditStamp);
+  @VisibleForTesting
+  IcebergBatch newIcebergBatch(OperationContext operationContext) {
+    return new IcebergBatch(operationContext);
   }
 
-  private void ingestMcp(MetadataChangeProposal mcp, AuditStamp auditStamp) {
-    entityService.ingestProposal(operationContext, mcp, auditStamp, false);
+  private void ingestBatch(IcebergBatch icebergBatch) {
+    AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
+    entityService.ingestProposal(operationContext, aspectsBatch, false);
   }
 
   private List<TableIdentifier> listTablesOrViews(Namespace namespace, String typeName) {
diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/IcebergBatch.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/IcebergBatch.java
new file mode 100644
index 00000000000000..cd62548fd39ae1
--- /dev/null
+++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/IcebergBatch.java
@@ -0,0 +1,163 @@
+package io.datahubproject.iceberg.catalog;
+
+import static com.linkedin.metadata.Constants.*;
+import static com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.HTTP_HEADER_IF_VERSION_MATCH;
+import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
+import static io.datahubproject.iceberg.catalog.Utils.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.common.AuditStamp;
+import com.linkedin.common.DataPlatformInstance;
+import com.linkedin.common.Status;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.data.template.RecordTemplate;
+import com.linkedin.data.template.StringMap;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
+import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
+import com.linkedin.mxe.MetadataChangeProposal;
+import io.datahubproject.metadata.context.OperationContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+public class IcebergBatch {
+  private List<MetadataChangeProposal> mcps = new ArrayList<>();
+
+  @Getter private final AuditStamp auditStamp;
+
+  private final OperationContext operationContext;
+
+  public IcebergBatch(OperationContext operationContext) {
+    this.operationContext = operationContext;
+    this.auditStamp =
+        new AuditStamp()
+            .setActor(operationContext.getActorContext().getActorUrn())
+            .setTime(System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  IcebergBatch(OperationContext operationContext, AuditStamp auditStamp) {
+    this.operationContext = operationContext;
+    this.auditStamp = auditStamp;
+  }
+
+  @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+  abstract class EntityBatch {
+    private final Urn urn;
+    private final String entityName;
+
+    public void aspect(String aspectName, RecordTemplate aspectData) {
+      mcps.add(newMcp(urn, entityName, aspectName, aspectData, changeType()));
+    }
+
+    public void platformInstance(String platformInstanceName) {
+      DataPlatformInstance platformInstance =
+          new DataPlatformInstance()
+              .setPlatform(platformUrn())
+              .setInstance(platformInstanceUrn(platformInstanceName));
+
+      aspect(DATA_PLATFORM_INSTANCE_ASPECT_NAME, platformInstance);
+    }
+
+    abstract ChangeType changeType();
+  }
+
+  class CreateEntityBatch extends EntityBatch {
+
+    private CreateEntityBatch(Urn urn, String entityName) {
+      super(urn, entityName);
+    }
+
+    @Override
+    ChangeType changeType() {
+      return ChangeType.CREATE;
+    }
+  }
+
+  class UpdateEntityBatch extends EntityBatch {
+
+    private UpdateEntityBatch(Urn urn, String entityName) {
+      super(urn, entityName);
+    }
+
+    @Override
+    ChangeType changeType() {
+      return ChangeType.UPDATE;
+    }
+
+    @Override
+    public void platformInstance(String platformInstanceName) {
+      // disallow updates of platform instance
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public EntityBatch createEntity(
+      Urn urn, String entityName, String creationAspectName, RecordTemplate aspectData) {
+    mcps.add(newMcp(urn, entityName, creationAspectName, aspectData, ChangeType.CREATE_ENTITY));
+    mcps.add(
+        newMcp(
+            urn,
+            entityName,
+            STATUS_ASPECT_NAME,
+            new Status().setRemoved(false),
+            ChangeType.CREATE));
+    return new CreateEntityBatch(urn, entityName);
+  }
+
+  public EntityBatch updateEntity(Urn urn, String entityName) {
+    return new UpdateEntityBatch(urn, entityName);
+  }
+
+  public EntityBatch conditionalUpdateEntity(
+      Urn urn,
+      String entityName,
+      String aspectName,
+      RecordTemplate aspectData,
+      String existingVersion) {
+    MetadataChangeProposal mcp = newMcp(urn, entityName, aspectName, aspectData, ChangeType.UPDATE);
+    mcp.getHeaders().put(HTTP_HEADER_IF_VERSION_MATCH, existingVersion);
+    mcps.add(mcp);
+    return new UpdateEntityBatch(urn, entityName);
+  }
+
+  public void softDeleteEntity(Urn urn, String entityName) {
+    // UPSERT instead of UPDATE here, for backward compatibility
+    // i.e. if there are existing datasets without Status aspect
+    mcps.add(
+        newMcp(
+            urn, entityName, STATUS_ASPECT_NAME, new Status().setRemoved(true), ChangeType.UPSERT));
+  }
+
+  private static MetadataChangeProposal newMcp(
+      Urn urn,
+      String entityName,
+      String aspectName,
+      RecordTemplate aspectData,
+      ChangeType changeType) {
+
+    MetadataChangeProposal mcp = new MetadataChangeProposal();
+    mcp.setEntityUrn(urn);
+    mcp.setEntityType(entityName);
+    mcp.setAspectName(aspectName);
+    mcp.setHeaders(new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true))));
+    mcp.setAspect(serializeAspect(aspectData));
+    mcp.setChangeType(changeType);
+    return mcp;
+  }
+
+  public AspectsBatch asAspectsBatch() {
+    return AspectsBatchImpl.builder()
+        .mcps(mcps, auditStamp, operationContext.getRetrieverContext())
+        .build();
+  }
+
+  @VisibleForTesting
+  List<MetadataChangeProposal> getMcps() {
+    return mcps;
+  }
+}
diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/TableOrViewOpsDelegate.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/TableOrViewOpsDelegate.java
index 9f3ddc01d3fc41..1747bed717a05e 100644
--- a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/TableOrViewOpsDelegate.java
+++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/TableOrViewOpsDelegate.java
@@ -1,26 +1,23 @@
 package io.datahubproject.iceberg.catalog;
 
 import static com.linkedin.metadata.Constants.*;
-import static com.linkedin.metadata.Constants.VIEW_PROPERTIES_ASPECT_NAME;
-import static com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.HTTP_HEADER_IF_VERSION_MATCH;
 import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
 import static io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME;
 import static io.datahubproject.iceberg.catalog.Utils.*;
-import static io.datahubproject.iceberg.catalog.Utils.platformInstanceMcp;
 import static org.apache.commons.lang3.StringUtils.capitalize;
 
-import com.linkedin.common.AuditStamp;
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.common.SubTypes;
 import com.linkedin.common.urn.DatasetUrn;
 import com.linkedin.container.Container;
 import com.linkedin.data.template.StringArray;
-import com.linkedin.data.template.StringMap;
 import com.linkedin.dataset.DatasetProfile;
 import com.linkedin.dataset.DatasetProperties;
 import com.linkedin.dataset.IcebergCatalogInfo;
 import com.linkedin.dataset.ViewProperties;
 import com.linkedin.entity.EnvelopedAspect;
 import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.authorization.PoliciesConfig;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.mxe.MetadataChangeProposal;
@@ -28,6 +25,7 @@
 import com.linkedin.util.Pair;
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
@@ -40,7 +38,10 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.*;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.view.*;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.iceberg.view.ViewRepresentation;
 
 @Slf4j
 abstract class TableOrViewOpsDelegate<M> {
@@ -68,13 +69,13 @@ abstract class TableOrViewOpsDelegate<M> {
   }
 
   public M refresh() {
-    IcebergCatalogInfo icebergMeta = warehouse.getIcebergMetadata(tableIdentifier);
+    Optional<IcebergCatalogInfo> icebergMeta = warehouse.getIcebergMetadata(tableIdentifier);
 
-    if (icebergMeta == null || !isExpectedType(icebergMeta.isView())) {
+    if (icebergMeta.isEmpty() || !isExpectedType(icebergMeta.get().isView())) {
       return null;
     }
 
-    String location = icebergMeta.getMetadataPointer();
+    String location = icebergMeta.get().getMetadataPointer();
     if (io == null) {
       String locationDir = location.substring(0, location.lastIndexOf("/"));
       io =
@@ -125,7 +126,6 @@ protected void doCommit(
     }
 
     DatasetUrn datasetUrn;
-    AuditStamp auditStamp = auditStamp();
 
     // attempt to commit
     io =
         fileIOFactory.createIO(
@@ -134,9 +134,11 @@
             platformInstance(), PoliciesConfig.DATA_READ_WRITE_PRIVILEGE,
             Set.of(metadata.location()));
 
     String newMetadataLocation = metadataWriter.get();
 
+    IcebergBatch icebergBatch = newIcebergBatch(operationContext);
+
     if (creation) {
       try {
-        datasetUrn = warehouse.createDataset(tableIdentifier, isView(), auditStamp);
+        datasetUrn = warehouse.createDataset(tableIdentifier, isView(), icebergBatch);
       } catch (ValidationException e) {
         throw new AlreadyExistsException("%s already exists: %s", capitalize(type()), name());
       }
@@ -144,35 +146,25 @@
       datasetUrn = existingDatasetAspect.getSecond();
     }
 
-    MetadataChangeProposal icebergMcp = newMcp(DATASET_ICEBERG_METADATA_ASPECT_NAME, datasetUrn);
-    icebergMcp.setAspect(
-        serializeAspect(
-            new IcebergCatalogInfo().setMetadataPointer(newMetadataLocation).setView(isView())));
-
+    IcebergCatalogInfo icebergCatalogInfo =
+        new IcebergCatalogInfo().setMetadataPointer(newMetadataLocation).setView(isView());
+    IcebergBatch.EntityBatch datasetBatch;
     if (creation) {
-      icebergMcp.setChangeType(ChangeType.CREATE_ENTITY);
+      datasetBatch =
+          icebergBatch.createEntity(
+              datasetUrn,
+              DATASET_ENTITY_NAME,
+              DATASET_ICEBERG_METADATA_ASPECT_NAME,
+              icebergCatalogInfo);
     } else {
       String existingVersion = existingDatasetAspect.getFirst().getSystemMetadata().getVersion();
-      StringMap headers = icebergMcp.getHeaders();
-      if (headers == null) {
-        headers = new StringMap();
-        icebergMcp.setHeaders(headers);
-      }
-      headers.put(HTTP_HEADER_IF_VERSION_MATCH, existingVersion);
-      icebergMcp.setChangeType(
-          ChangeType.UPSERT); // ideally should be UPDATE, but seems not supported yet.
-    }
-    try {
-      ingestMcp(icebergMcp, auditStamp);
-    } catch (ValidationException e) {
-      if (creation) {
-        // this is likely because table/view already exists i.e. created concurrently in a race
-        // condition
-        throw new AlreadyExistsException("%s already exists: %s", capitalize(type()), name());
-      } else {
-        throw new CommitFailedException(
-            "Cannot commit to %s %s: stale metadata", capitalize(type()), name());
-      }
+      datasetBatch =
+          icebergBatch.conditionalUpdateEntity(
+              datasetUrn,
+              DATASET_ENTITY_NAME,
+              DATASET_ICEBERG_METADATA_ASPECT_NAME,
+              icebergCatalogInfo,
+              existingVersion);
     }
 
     if (base == null || (base.currentSchemaId() != metadata.currentSchemaId())) {
@@ -181,63 +173,56 @@
       AvroSchemaConverter converter = AvroSchemaConverter.builder().build();
       SchemaMetadata schemaMetadata =
          converter.toDataHubSchema(avroSchema, false, false, platformUrn(), null);
-      MetadataChangeProposal schemaMcp = newMcp(SCHEMA_METADATA_ASPECT_NAME, datasetUrn);
-      schemaMcp.setAspect(serializeAspect(schemaMetadata));
-      schemaMcp.setChangeType(ChangeType.UPSERT);
-      ingestMcp(schemaMcp, auditStamp);
+      datasetBatch.aspect(SCHEMA_METADATA_ASPECT_NAME, schemaMetadata);
     }
 
     if (creation) {
-      DatasetProperties datasetProperties = new DatasetProperties();
-      datasetProperties.setName(tableIdentifier.name());
-      datasetProperties.setQualifiedName(name());
+      datasetBatch.platformInstance(platformInstance());
 
-      MetadataChangeProposal datasetPropertiesMcp =
-          newMcp(DATASET_PROPERTIES_ASPECT_NAME, datasetUrn, true);
-      datasetPropertiesMcp.setAspect(serializeAspect(datasetProperties));
-      datasetPropertiesMcp.setChangeType(ChangeType.UPSERT);
-
-      ingestMcp(datasetPropertiesMcp, auditStamp);
-
-      MetadataChangeProposal platformInstanceMcp =
-          platformInstanceMcp(platformInstance(), datasetUrn, DATASET_ENTITY_NAME);
-      ingestMcp(platformInstanceMcp, auditStamp);
+      DatasetProperties datasetProperties =
+          new DatasetProperties().setName(tableIdentifier.name()).setQualifiedName(name());
+      datasetBatch.aspect(DATASET_PROPERTIES_ASPECT_NAME, datasetProperties);
 
       Container container = new Container();
       container.setContainer(containerUrn(platformInstance(), tableIdentifier.namespace()));
-
-      MetadataChangeProposal containerMcp = newMcp(CONTAINER_ASPECT_NAME, datasetUrn, true);
-      containerMcp.setAspect(serializeAspect(container));
-      containerMcp.setChangeType(ChangeType.UPSERT);
-      ingestMcp(containerMcp, auditStamp);
+      datasetBatch.aspect(CONTAINER_ASPECT_NAME, container);
 
       SubTypes subTypes = new SubTypes().setTypeNames(new StringArray(capitalize(type())));
-      MetadataChangeProposal subTypesMcp = newMcp(SUB_TYPES_ASPECT_NAME, datasetUrn, true);
-      subTypesMcp.setAspect(serializeAspect(subTypes));
-      subTypesMcp.setChangeType(ChangeType.UPSERT);
-      ingestMcp(subTypesMcp, auditStamp);
+      datasetBatch.aspect(SUB_TYPES_ASPECT_NAME, subTypes);
     }
 
-    sendProfileUpdate(metadata, auditStamp, datasetUrn);
-    onCommit(metadata.metadata(), auditStamp, datasetUrn);
-  }
-
-  protected abstract DatasetProfile getDataSetProfile(M metadata);
+    additionalMcps(metadata.metadata(), datasetBatch);
 
-  private void sendProfileUpdate(
-      MetadataWrapper<M> metadata, AuditStamp auditStamp, DatasetUrn datasetUrn) {
+    try {
+      AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
+      entityService.ingestProposal(operationContext, aspectsBatch, false);
+    } catch (ValidationException e) {
+      if (creation) {
+        // this is likely because table/view already exists i.e. created concurrently in a race
+        // condition
+        throw new AlreadyExistsException("%s already exists: %s", capitalize(type()), name());
+      } else {
+        throw new CommitFailedException("Cannot commit to %s %s: stale metadata", type(), name());
+      }
+    }
 
-    DatasetProfile dataSetProfile = getDataSetProfile(metadata.metadata());
-    if (dataSetProfile != null) {
-      dataSetProfile.setTimestampMillis(auditStamp.getTime());
+    DatasetProfile datasetProfile =
+        getDataSetProfile(metadata.metadata())
+            .setTimestampMillis(icebergBatch.getAuditStamp().getTime());
 
-      MetadataChangeProposal dataSetProfileMcp = newMcp(DATASET_PROFILE_ASPECT_NAME, datasetUrn);
-      dataSetProfileMcp.setAspect(serializeAspect(dataSetProfile));
-      dataSetProfileMcp.setChangeType(ChangeType.UPSERT);
-      ingestMcp(dataSetProfileMcp, auditStamp);
-    }
+    MetadataChangeProposal datasetProfileMcp =
+        new MetadataChangeProposal()
+            .setEntityUrn(datasetUrn)
+            .setEntityType(DATASET_ENTITY_NAME)
+            .setAspectName(DATASET_PROFILE_ASPECT_NAME)
+            .setAspect(serializeAspect(datasetProfile))
+            .setChangeType(ChangeType.UPSERT);
+    entityService.ingestProposal(
+        operationContext, datasetProfileMcp, icebergBatch.getAuditStamp(), true);
   }
 
+  protected abstract DatasetProfile getDataSetProfile(M metadata);
+
   FileIO io() {
     return io;
   }
@@ -250,28 +235,10 @@ private String platformInstance() {
     return warehouse.getPlatformInstance();
   }
 
-  protected MetadataChangeProposal newMcp(String aspectName, DatasetUrn datasetUrn) {
-    return newMcp(aspectName, datasetUrn, false); // Default to async index update
-  }
-
-  protected MetadataChangeProposal newMcp(
-      String aspectName, DatasetUrn datasetUrn, boolean syncIndexUpdate) {
-    MetadataChangeProposal mcp = new MetadataChangeProposal();
-    mcp.setEntityUrn(datasetUrn);
-    mcp.setEntityType(DATASET_ENTITY_NAME);
-    mcp.setAspectName(aspectName);
-
-    if (syncIndexUpdate) {
-      StringMap headers = new StringMap();
-      headers.put(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true));
-      mcp.setHeaders(headers);
-    }
-
-    return mcp;
-  }
-
-  protected void ingestMcp(MetadataChangeProposal mcp, AuditStamp auditStamp) {
-    entityService.ingestProposal(operationContext, mcp, auditStamp, false);
+  // override-able for testing
+  @VisibleForTesting
+  IcebergBatch newIcebergBatch(OperationContext operationContext) {
+    return new IcebergBatch(operationContext);
   }
 
   abstract boolean isView();
@@ -284,7 +251,7 @@ protected void ingestMcp(MetadataChangeProposal mcp, AuditStamp auditStamp) {
 
   abstract RuntimeException noSuchEntityException();
 
-  void onCommit(M metadata, AuditStamp auditStamp, DatasetUrn datasetUrn) {}
+  void additionalMcps(M metadata, IcebergBatch.EntityBatch datasetBatch) {}
 }
 
 @Slf4j
@@ -325,7 +292,7 @@ RuntimeException noSuchEntityException() {
   }
 
   @Override
-  void onCommit(ViewMetadata metadata, AuditStamp auditStamp, DatasetUrn datasetUrn) {
+  void additionalMcps(ViewMetadata metadata, IcebergBatch.EntityBatch datasetBatch) {
     SQLViewRepresentation sqlViewRepresentation = null;
     for (ViewRepresentation representation : metadata.currentVersion().representations()) {
       if (representation instanceof SQLViewRepresentation) {
@@ -344,11 +311,7 @@ void additionalMcps(ViewMetadata metadata, IcebergBatch.EntityBatch datasetBatch
             .setViewLogic(sqlViewRepresentation.sql())
             .setMaterialized(false)
             .setViewLanguage(sqlViewRepresentation.dialect());
-    MetadataChangeProposal viewPropertiesMcp = newMcp(VIEW_PROPERTIES_ASPECT_NAME, datasetUrn);
-    viewPropertiesMcp.setAspect(serializeAspect(viewProperties));
-    viewPropertiesMcp.setChangeType(ChangeType.UPSERT);
-
-    ingestMcp(viewPropertiesMcp, auditStamp);
+    datasetBatch.aspect(VIEW_PROPERTIES_ASPECT_NAME, viewProperties);
   }
 }
 
@@ -374,22 +337,19 @@ class TableOpsDelegate extends TableOrViewOpsDelegate<TableMetadata> {
 
   @Override
   protected DatasetProfile getDataSetProfile(TableMetadata metadata) {
-    Snapshot currentSnapshot = metadata.currentSnapshot();
-    if (currentSnapshot == null) {
-      return null;
-    }
     DatasetProfile dataSetProfile = new DatasetProfile();
 
-    if (currentSnapshot.summary() != null) {
+    long colCount = metadata.schema().columns().size();
+    dataSetProfile.setColumnCount(colCount);
+
+    Snapshot currentSnapshot = metadata.currentSnapshot();
+    if (currentSnapshot != null && currentSnapshot.summary() != null) {
       String totalRecordsStr = currentSnapshot.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP);
       if (totalRecordsStr != null) {
         dataSetProfile.setRowCount(Long.parseLong(totalRecordsStr));
       }
     }
 
-    long colCount = metadata.schema().columns().size();
-    dataSetProfile.setColumnCount(colCount);
-
     return dataSetProfile;
   }
diff --git a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/Utils.java b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/Utils.java
index 767f30acdef25a..455eae4f3e2630 100644
--- a/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/Utils.java
+++ b/metadata-service/iceberg-catalog/src/main/java/io/datahubproject/iceberg/catalog/Utils.java
@@ -1,21 +1,14 @@
 package io.datahubproject.iceberg.catalog;
 
 import static com.linkedin.metadata.Constants.*;
-import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
 
-import com.linkedin.common.AuditStamp;
-import com.linkedin.common.DataPlatformInstance;
 import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.urn.Urn;
-import com.linkedin.events.metadata.ChangeType;
-import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.key.DataPlatformInstanceKey;
 import com.linkedin.metadata.utils.EntityKeyUtils;
-import com.linkedin.mxe.MetadataChangeProposal;
 import java.net.URLEncoder;
 import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 import lombok.SneakyThrows;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableMetadata;
@@ -29,29 +22,6 @@ public class Utils {
 
   private static final String NAMESPACE_CONTAINER_PREFIX = "urn:li:container:iceberg__";
 
-  @SneakyThrows
-  public static AuditStamp auditStamp() {
-    return new AuditStamp()
-        .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
-        .setTime(System.currentTimeMillis());
-  }
-
-  public static MetadataChangeProposal platformInstanceMcp(
-      String platformInstanceName, Urn urn, String entityType) {
-    DataPlatformInstance platformInstance = new DataPlatformInstance();
-    platformInstance.setPlatform(platformUrn());
-    platformInstance.setInstance(platformInstanceUrn(platformInstanceName));
-
-    MetadataChangeProposal mcp = new MetadataChangeProposal();
-    mcp.setEntityUrn(urn);
-    mcp.setEntityType(entityType);
-    mcp.setAspectName(DATA_PLATFORM_INSTANCE_ASPECT_NAME);
-    mcp.setAspect(serializeAspect(platformInstance));
-    mcp.setChangeType(ChangeType.UPSERT);
-
-    return mcp;
-  }
-
   public static DataPlatformUrn platformUrn() {
     return new DataPlatformUrn(PLATFORM_NAME);
   }
diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouseTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouseTest.java
index cacca17e2c479b..a2f889ea33a1ba 100644
--- a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouseTest.java
+++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubIcebergWarehouseTest.java
@@ -1,35 +1,35 @@
 package io.datahubproject.iceberg.catalog;
 
+import static com.linkedin.metadata.Constants.*;
+import static io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME;
+import static io.datahubproject.iceberg.catalog.Utils.fullTableName;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
-import com.linkedin.common.AuditStamp;
 import com.linkedin.common.FabricType;
+import com.linkedin.common.Status;
+import com.linkedin.common.urn.CorpuserUrn;
 import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.urn.DatasetUrn;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
+import com.linkedin.dataset.DatasetProperties;
 import com.linkedin.dataset.IcebergCatalogInfo;
+import com.linkedin.entity.Aspect;
 import com.linkedin.entity.EnvelopedAspect;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.entity.EntityService;
-import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.platformresource.PlatformResourceInfo;
 import com.linkedin.secret.DataHubSecretValue;
 import com.linkedin.util.Pair;
 import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
+import io.datahubproject.metadata.context.ActorContext;
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.metadata.services.SecretService;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -50,9 +50,19 @@ public class DataHubIcebergWarehouseTest {
 
   @Mock private RecordTemplate warehouseAspect;
 
+  @Mock private IcebergBatch mockIcebergBatch;
+  @Mock private AspectsBatch mockAspectsBatch;
+
+  private final Urn testUser = new CorpuserUrn("urn:li:corpuser:testUser");
+
   @BeforeMethod
-  public void setup() {
+  public void setup() throws Exception {
     MockitoAnnotations.openMocks(this);
+    ActorContext actorContext = mock(ActorContext.class);
+    when(operationContext.getActorContext()).thenReturn(actorContext);
+    when(actorContext.getActorUrn()).thenReturn(testUser);
+
+    when(mockIcebergBatch.asAspectsBatch()).thenReturn(mockAspectsBatch);
   }
 
   @Test
@@ -187,19 +197,25 @@ public void testGetDatasetUrn() throws Exception {
     PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
     resourceInfo.setPrimaryKey(expectedDatasetUrn.toString());
 
-    when(entityService.getLatestAspect(
-            any(),
-            any(),
-            eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
-        .thenReturn(warehouseAspect);
-    when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data());
-    when(entityService.getLatestAspect(
-            eq(operationContext), eq(resourceUrn), eq("platformResourceInfo")))
-        .thenReturn(resourceInfo);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
 
     DataHubIcebergWarehouse warehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
 
     Optional<DatasetUrn> result = warehouse.getDatasetUrn(tableId);
 
@@ -211,6 +227,9 @@ public void testGetIcebergMetadata() throws Exception {
     String platformInstance = "test-platform";
     TableIdentifier tableId = TableIdentifier.of("db", "table");
+    Urn resourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.table");
+
     DatasetUrn datasetUrn =
         new DatasetUrn(
             DataPlatformUrn.createFromString("urn:li:dataPlatform:iceberg"),
@@ -220,39 +239,100 @@ public void testGetIcebergMetadata() throws Exception {
             "uuid",
             FabricType.PROD);
 
     IcebergCatalogInfo expectedMetadata =
         new IcebergCatalogInfo().setMetadataPointer("s3://bucket/path");
 
-    when(entityService.getLatestAspect(
-            any(),
-            any(),
-            eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
-        .thenReturn(warehouseAspect);
-    when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data());
-
     // Mock getDatasetUrn behavior
     PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
     resourceInfo.setPrimaryKey(datasetUrn.toString());
-    when(entityService.getLatestAspect(any(), any(), eq("platformResourceInfo")))
-        .thenReturn(resourceInfo);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
 
-    when(entityService.getLatestAspect(
-            eq(operationContext),
-            eq(datasetUrn),
-            eq(DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME)))
-        .thenReturn(expectedMetadata);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(datasetUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, DATASET_ICEBERG_METADATA_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(datasetUrn, List.of(expectedMetadata)));
 
     DataHubIcebergWarehouse warehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
+
+    Optional<IcebergCatalogInfo> result = warehouse.getIcebergMetadata(tableId);
 
-    IcebergCatalogInfo result = warehouse.getIcebergMetadata(tableId);
+    assertTrue(result.isPresent());
+    assertEquals(result.get().getMetadataPointer(), expectedMetadata.getMetadataPointer());
+  }
 
-    assertNotNull(result);
-    assertEquals(result.getMetadataPointer(), expectedMetadata.getMetadataPointer());
+  @Test
+  public void testGetIcebergMetadataStatusRemoved() throws Exception {
+    String platformInstance = "test-platform";
+    TableIdentifier tableId = TableIdentifier.of("db", "table");
+    Urn resourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.table");
+
+    DatasetUrn datasetUrn =
+        new DatasetUrn(
+            DataPlatformUrn.createFromString("urn:li:dataPlatform:iceberg"),
+            "uuid",
+            FabricType.PROD);
+
+    IcebergCatalogInfo expectedMetadata =
+        new IcebergCatalogInfo().setMetadataPointer("s3://bucket/path");
+
+    // Mock getDatasetUrn behavior
+    PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
+    resourceInfo.setPrimaryKey(datasetUrn.toString());
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
+
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(datasetUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, DATASET_ICEBERG_METADATA_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(datasetUrn, List.of(new Status().setRemoved(true), expectedMetadata)));
+
+    DataHubIcebergWarehouse warehouse =
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
+
+    Optional<IcebergCatalogInfo> result = warehouse.getIcebergMetadata(tableId);
+
+    assertTrue(result.isEmpty());
   }
 
   @Test
   public void testGetIcebergMetadataEnveloped() throws Exception {
     String platformInstance = "test-platform";
     TableIdentifier tableId = TableIdentifier.of("db", "table");
+    Urn resourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.table");
+
     DatasetUrn datasetUrn =
         new DatasetUrn(
             DataPlatformUrn.createFromString("urn:li:dataPlatform:iceberg"),
@@ -261,29 +341,42 @@ public void testGetIcebergMetadataEnveloped() throws Exception {
 
     EnvelopedAspect expectedEnvelopedAspect = mock(EnvelopedAspect.class);
 
-    when(entityService.getLatestAspect(
-            any(),
-            any(),
-            eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
-        .thenReturn(warehouseAspect);
-    when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data());
-
     // Mock getDatasetUrn behavior
     PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
     resourceInfo.setPrimaryKey(datasetUrn.toString());
-    when(entityService.getLatestAspect(any(), any(), eq("platformResourceInfo")))
-        .thenReturn(resourceInfo);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
 
-    when(entityService.getLatestEnvelopedAspect(
-            eq(operationContext),
-            eq("dataset"),
-            eq(datasetUrn),
-            eq(DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME)))
-        .thenReturn(expectedEnvelopedAspect);
+    when(entityService.getLatestEnvelopedAspects(
+            same(operationContext),
+            eq(Set.of(datasetUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, DATASET_ICEBERG_METADATA_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(
+            Map.of(
+                datasetUrn,
+                List.of(
+                    new EnvelopedAspect()
+                        .setName(STATUS_ASPECT_NAME)
+                        .setValue(new Aspect(new Status().setRemoved(false).data())),
+                    expectedEnvelopedAspect)));
 
     DataHubIcebergWarehouse warehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
 
     Pair<EnvelopedAspect, DatasetUrn> result = warehouse.getIcebergMetadataEnveloped(tableId);
 
@@ -292,6 +385,63 @@ public void testGetIcebergMetadataEnveloped() throws Exception {
     assertEquals(result.getSecond(), datasetUrn);
   }
 
+  @Test
+  public void testGetIcebergMetadataEnvelopedStatusRemoved() throws Exception {
+    String platformInstance = "test-platform";
+    TableIdentifier tableId = TableIdentifier.of("db", "table");
+    Urn resourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.table");
+
+    DatasetUrn datasetUrn =
+        new DatasetUrn(
+            DataPlatformUrn.createFromString("urn:li:dataPlatform:iceberg"),
+            "uuid",
+            FabricType.PROD);
+
+    EnvelopedAspect expectedEnvelopedAspect = mock(EnvelopedAspect.class);
+
+    // Mock getDatasetUrn behavior
+    PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
+    resourceInfo.setPrimaryKey(datasetUrn.toString());
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
+
+    when(entityService.getLatestEnvelopedAspects(
+            same(operationContext),
+            eq(Set.of(datasetUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, DATASET_ICEBERG_METADATA_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(
+            Map.of(
+                datasetUrn,
+                List.of(
+                    new EnvelopedAspect()
+                        .setName(STATUS_ASPECT_NAME)
+                        .setValue(new Aspect(new Status().setRemoved(true).data())),
+                    expectedEnvelopedAspect)));
+
+    DataHubIcebergWarehouse warehouse =
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
+
+    Pair<EnvelopedAspect, DatasetUrn> result = warehouse.getIcebergMetadataEnveloped(tableId);
+
+    assertNull(result);
+  }
+
   @Test
   public void testDeleteDataset() throws Exception {
     String platformInstance = "test-platform";
@@ -304,59 +454,86 @@ public void testDeleteDataset() throws Exception {
             "uuid",
             FabricType.PROD);
 
-    when(entityService.getLatestAspect(
-            any(),
-            any(),
-            eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
-        .thenReturn(warehouseAspect);
-    when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data());
     when(entityService.exists(eq(operationContext), eq(resourceUrn))).thenReturn(true);
 
     // Mock getDatasetUrn behavior
     PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
     resourceInfo.setPrimaryKey(datasetUrn.toString());
-    when(entityService.getLatestAspect(any(), any(), eq("platformResourceInfo")))
-        .thenReturn(resourceInfo);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(resourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(resourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
+
+    when(entityService.ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false)))
+        .thenReturn(List.of());
 
     DataHubIcebergWarehouse warehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
 
     boolean result = warehouse.deleteDataset(tableId);
     assertTrue(result);
-    verify(entityService).deleteUrn(eq(operationContext), eq(resourceUrn));
-    verify(entityService).deleteUrn(eq(operationContext), eq(datasetUrn));
+
+    verify(mockIcebergBatch).softDeleteEntity(eq(resourceUrn), eq(PLATFORM_RESOURCE_ENTITY_NAME));
+    verify(mockIcebergBatch).softDeleteEntity(eq(datasetUrn), eq(DATASET_ENTITY_NAME));
+    verify(entityService).ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false));
   }
 
   @Test
   public void testCreateDataset() throws Exception {
     String platformInstance = "test-platform";
     TableIdentifier tableId = TableIdentifier.of("db", "table");
-    AuditStamp auditStamp =
-        new AuditStamp()
-            .setTime(System.currentTimeMillis())
-            .setActor(Urn.createFromString("urn:li:corpuser:testUser"));
-    when(entityService.getLatestAspect(any(), any(), eq("icebergWarehouseInfo")))
-        .thenReturn(warehouseAspect);
-    IcebergWarehouseInfo warehouse = new IcebergWarehouseInfo().setEnv(FabricType.PROD);
-    when(warehouseAspect.data()).thenReturn(warehouse.data());
+    DataHubIcebergWarehouse warehouse =
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo().setEnv(FabricType.PROD),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
 
-    DataHubIcebergWarehouse icebergWarehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
+    Urn resourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.table");
+    IcebergBatch icebergBatch = mock(IcebergBatch.class);
+    when(icebergBatch.createEntity(
+            eq(resourceUrn),
+            eq(PLATFORM_RESOURCE_ENTITY_NAME),
+            eq(PLATFORM_RESOURCE_INFO_ASPECT_NAME),
+            any(PlatformResourceInfo.class)))
+        .thenReturn(null);
 
-    DatasetUrn result = icebergWarehouse.createDataset(tableId, false, auditStamp);
+    DatasetUrn result = warehouse.createDataset(tableId, false, icebergBatch);
 
     assertNotNull(result);
     assertEquals(result.getPlatformEntity(), Urn.createFromString("urn:li:dataPlatform:iceberg"));
     assertEquals(result.getOriginEntity(), FabricType.PROD);
     assertTrue(result.getDatasetNameEntity().startsWith(platformInstance + "."));
 
-    verify(entityService)
-        .ingestProposal(
-            eq(operationContext), any(MetadataChangeProposal.class), eq(auditStamp), eq(false));
+    verify(icebergBatch)
+        .createEntity(
+            eq(resourceUrn),
+            eq(PLATFORM_RESOURCE_ENTITY_NAME),
+            eq(PLATFORM_RESOURCE_INFO_ASPECT_NAME),
+            eq(
+                new PlatformResourceInfo()
+                    .setPrimaryKey(result.toString())
+                    .setResourceType("icebergTable")));
   }
 
   @Test
@@ -364,42 +541,70 @@ public void testRenameDataset() throws Exception {
     String platformInstance = "test-platform";
     TableIdentifier fromTableId = TableIdentifier.of("db", "oldTable");
     TableIdentifier toTableId = TableIdentifier.of("db", "newTable");
+    Urn fromResourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.oldTable");
+    Urn toResourceUrn =
+        Urn.createFromString("urn:li:platformResource:iceberg.test-platform.db.newTable");
     DatasetUrn existingDatasetUrn =
         new DatasetUrn(
             DataPlatformUrn.createFromString("urn:li:dataPlatform:iceberg"),
             "test-dataset",
             FabricType.PROD);
-    AuditStamp auditStamp =
-        new AuditStamp()
            .setTime(System.currentTimeMillis())
-            .setActor(Urn.createFromString("urn:li:corpuser:testUser"));
-
-    when(entityService.getLatestAspect(
-            any(),
-            any(),
-            eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
-        .thenReturn(warehouseAspect);
-    when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data());
 
     // Mock getDatasetUrn behavior for source table
-    PlatformResourceInfo resourceInfo = new PlatformResourceInfo();
-    resourceInfo.setPrimaryKey(existingDatasetUrn.toString());
-    when(entityService.getLatestAspect(any(), any(), eq("platformResourceInfo")))
-        .thenReturn(resourceInfo);
-
-    DataHubIcebergWarehouse warehouse =
-        DataHubIcebergWarehouse.of(
-            platformInstance, entityService, secretService, operationContext);
-
-    DatasetUrn result = warehouse.renameDataset(fromTableId, toTableId, false, auditStamp);
+    PlatformResourceInfo resourceInfo =
+        new PlatformResourceInfo()
+            .setPrimaryKey(existingDatasetUrn.toString())
+            .setResourceType("icebergTable");
 
-    assertNotNull(result);
-    assertEquals(result, existingDatasetUrn);
+    when(entityService.getLatestAspects(
+            same(operationContext),
+            eq(Set.of(fromResourceUrn)),
+            eq(Set.of(STATUS_ASPECT_NAME, PLATFORM_RESOURCE_INFO_ASPECT_NAME)),
+            eq(false)))
+        .thenReturn(Map.of(fromResourceUrn, List.of(new Status().setRemoved(false), resourceInfo)));
 
-    verify(entityService)
-        .ingestProposal(
-            eq(operationContext), any(MetadataChangeProposal.class), eq(auditStamp), eq(false));
-    verify(entityService).deleteUrn(eq(operationContext), any(Urn.class));
+    DataHubIcebergWarehouse warehouse =
+        new DataHubIcebergWarehouse(
+            platformInstance,
+            new IcebergWarehouseInfo(),
+            entityService,
+            secretService,
+            operationContext) {
+          @Override
+          IcebergBatch newIcebergBatch(OperationContext operationContext) {
+            return mockIcebergBatch;
+          }
+        };
+
+    IcebergBatch.EntityBatch datasetUpdateBatch = mock(IcebergBatch.EntityBatch.class);
+    when(mockIcebergBatch.updateEntity(eq(existingDatasetUrn), eq(DATASET_ENTITY_NAME)))
+        .thenReturn(datasetUpdateBatch);
+
+    when(entityService.ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false)))
+        .thenReturn(List.of());
+
+    warehouse.renameDataset(fromTableId, toTableId, false);
+
+    verify(mockIcebergBatch)
+        .softDeleteEntity(eq(fromResourceUrn), eq(PLATFORM_RESOURCE_ENTITY_NAME));
+    verify(mockIcebergBatch)
+        .createEntity(
+            eq(toResourceUrn),
+            eq(PLATFORM_RESOURCE_ENTITY_NAME),
+            eq(PLATFORM_RESOURCE_INFO_ASPECT_NAME),
+            eq(resourceInfo));
+    verify(datasetUpdateBatch)
+        .aspect(
+            DATASET_PROPERTIES_ASPECT_NAME,
+            new DatasetProperties()
+                .setName(toTableId.name())
+                .setQualifiedName(fullTableName(platformInstance, toTableId)));
+    // no container aspect as rename is within same namespace
+
+    verify(entityService).ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false));
+    verify(entityService).deleteUrn(eq(operationContext), eq(fromResourceUrn));
   }
 
   @Test(expectedExceptions = NoSuchTableException.class)
@@ -407,22 +612,21 @@ public void testRenameDataset_SourceTableNotFound() throws Exception {
     String platformInstance = "test-platform";
     TableIdentifier fromTableId = TableIdentifier.of("db", "oldTable");
     TableIdentifier toTableId = TableIdentifier.of("db", "newTable");
-    AuditStamp auditStamp =
-        new AuditStamp()
-            .setTime(System.currentTimeMillis())
-            .setActor(Urn.createFromString("urn:li:corpuser:testUser"));
-
-    when(entityService.getLatestAspect(any(), any(), eq("icebergWarehouseInfo")))
-        .thenReturn(warehouseAspect);
-
when(warehouseAspect.data()).thenReturn(new IcebergWarehouseInfo().data()); - - // Mock empty response for getDatasetUrn - when(entityService.getLatestAspect(any(), any(), eq("platformResourceInfo"))).thenReturn(null); DataHubIcebergWarehouse warehouse = - DataHubIcebergWarehouse.of( - platformInstance, entityService, secretService, operationContext); - - warehouse.renameDataset(fromTableId, toTableId, false, auditStamp); + new DataHubIcebergWarehouse( + platformInstance, + new IcebergWarehouseInfo(), + entityService, + secretService, + operationContext) { + @Override + IcebergBatch newIcebergBatch(OperationContext operationContext) { + return mockIcebergBatch; + } + }; + + // by default mock to return null on entity-service calls, so dataset should not be found + warehouse.renameDataset(fromTableId, toTableId, false); } } diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubRestCatalogTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubRestCatalogTest.java index ae333b0ccbd65c..cde5190f31e1fa 100644 --- a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubRestCatalogTest.java +++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/DataHubRestCatalogTest.java @@ -1,25 +1,31 @@ package io.datahubproject.iceberg.catalog; -import static com.linkedin.metadata.Constants.CONTAINER_PROPERTIES_ASPECT_NAME; +import static com.linkedin.metadata.Constants.*; import static io.datahubproject.iceberg.catalog.DataHubRestCatalog.*; +import static io.datahubproject.iceberg.catalog.Utils.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.testng.Assert.*; -import com.linkedin.common.AuditStamp; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.SubTypes; +import com.linkedin.common.urn.CorpuserUrn; import com.linkedin.common.urn.Urn; +import com.linkedin.container.Container; import com.linkedin.container.ContainerProperties; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; import com.linkedin.data.template.StringMap; import com.linkedin.dataset.DatasetProperties; +import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchResult; -import com.linkedin.mxe.MetadataChangeProposal; import io.datahubproject.iceberg.catalog.credentials.CredentialProvider; +import io.datahubproject.metadata.context.ActorContext; import io.datahubproject.metadata.context.OperationContext; import java.util.*; import java.util.HashMap; @@ -30,7 +36,6 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; @@ -48,59 +53,104 @@ public class DataHubRestCatalogTest { @Mock private CredentialProvider credentialProvider; + @Mock private IcebergBatch mockIcebergBatch; + @Mock private AspectsBatch mockAspectsBatch; + private DataHubRestCatalog catalog; + private 
final Urn testUser = new CorpuserUrn("testUser"); + + private final String platformInstanceName = "test-platform"; + @BeforeMethod public void setup() { MockitoAnnotations.openMocks(this); - when(warehouse.getPlatformInstance()).thenReturn("test-platform"); + when(warehouse.getPlatformInstance()).thenReturn(platformInstanceName); String warehouseRoot = "s3://data/warehouse/"; when(warehouse.getDataRoot()).thenReturn(warehouseRoot); catalog = new DataHubRestCatalog( - entityService, searchService, operationContext, warehouse, credentialProvider); + entityService, searchService, operationContext, warehouse, credentialProvider) { + @Override + IcebergBatch newIcebergBatch(OperationContext operationContext) { + return mockIcebergBatch; + } + }; + + ActorContext actorContext = mock(ActorContext.class); + when(operationContext.getActorContext()).thenReturn(actorContext); + when(actorContext.getActorUrn()).thenReturn(testUser); + + when(mockIcebergBatch.asAspectsBatch()).thenReturn(mockAspectsBatch); } @Test public void testCreateNamespace_SingleLevel() throws Exception { Namespace namespace = Namespace.of("db1"); - Map<String, String> properties = Map.of(); + Map<String, String> properties = Map.of("a", "b"); - catalog.createNamespace(namespace, properties); + Urn containerUrn = containerUrn(platformInstanceName, namespace); - ArgumentCaptor<MetadataChangeProposal> mcpCaptor = - ArgumentCaptor.forClass(MetadataChangeProposal.class); - verify(entityService, times(3)) - .ingestProposal( - eq(operationContext), mcpCaptor.capture(), any(AuditStamp.class), eq(false)); + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.createEntity( + eq(containerUrn), + eq(CONTAINER_ENTITY_NAME), + eq(CONTAINER_PROPERTIES_ASPECT_NAME), + eq( + new ContainerProperties() + .setName(namespace.levels()[namespace.length() - 1]) + .setCustomProperties(new StringMap(properties))))) + .thenReturn(entityBatch); - List<MetadataChangeProposal> mcps = mcpCaptor.getAllValues(); + when(entityService.ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false))) + .thenReturn(List.of()); - MetadataChangeProposal subTypesMcp = mcps.get(0); - assertEquals(subTypesMcp.getAspectName(), "subTypes"); + catalog.createNamespace(namespace, properties); - MetadataChangeProposal containerPropertiesMcp = mcps.get(1); - assertEquals(containerPropertiesMcp.getAspectName(), "containerProperties"); + verify(entityBatch).platformInstance(eq(platformInstanceName)); + verify(entityBatch) + .aspect(SUB_TYPES_ASPECT_NAME, new SubTypes().setTypeNames(new StringArray("Namespace"))); + verify(entityService).ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false)); } @Test public void testCreateNamespace_MultiLevel() throws Exception { Namespace namespace = Namespace.of("db1", "schema1"); - Map<String, String> properties = Map.of(); - - when(entityService.exists(eq(operationContext), any(Urn.class))).thenReturn(true); + Map<String, String> properties = Map.of("a", "b"); + Urn containerUrn = containerUrn(platformInstanceName, namespace); + Urn parent = containerUrn(platformInstanceName, Namespace.of("db1")); + + when(entityService.exists(eq(operationContext), eq(parent))).thenReturn(true); + + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.createEntity( + eq(containerUrn), + eq(CONTAINER_ENTITY_NAME), + eq(CONTAINER_PROPERTIES_ASPECT_NAME), + eq( + new ContainerProperties() + .setName(namespace.levels()[namespace.length() - 1]) + .setCustomProperties(new StringMap(properties))))) + .thenReturn(entityBatch); + +
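// ingesting the batched proposals is stubbed to succeed with no results +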
when(entityService.ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false))) + .thenReturn(List.of()); catalog.createNamespace(namespace, properties); - ArgumentCaptor<MetadataChangeProposal> mcpCaptor = - ArgumentCaptor.forClass(MetadataChangeProposal.class); - verify(entityService, times(4)) - .ingestProposal( - eq(operationContext), mcpCaptor.capture(), any(AuditStamp.class), eq(false)); + verify(entityBatch).platformInstance(eq(platformInstanceName)); + verify(entityBatch).aspect(CONTAINER_ASPECT_NAME, new Container().setContainer(parent)); + verify(entityBatch) + .aspect(SUB_TYPES_ASPECT_NAME, new SubTypes().setTypeNames(new StringArray("Namespace"))); + verify(entityService).ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false)); + verify(entityService).exists(eq(operationContext), eq(parent)); + } - List<MetadataChangeProposal> mcps = mcpCaptor.getAllValues(); - MetadataChangeProposal containerMcp = mcps.get(0); - assertEquals(containerMcp.getAspectName(), "container"); + private DataPlatformInstance dataPlatformInstance() { + DataPlatformInstance platformInstance = new DataPlatformInstance(); + platformInstance.setPlatform(platformUrn()); + platformInstance.setInstance(platformInstanceUrn(platformInstanceName)); + return platformInstance; } @Test(expectedExceptions = NoSuchNamespaceException.class) @@ -356,20 +406,8 @@ public void testUpdateNamespaceProperties() throws Exception { .remove("toRemove1") .build(); - UpdateNamespacePropertiesResponse response = - catalog.updateNamespaceProperties(namespace, request); - - assertTrue(response.removed().contains("toRemove1")); - assertTrue(response.updated().contains("new1")); - assertTrue(response.missing().isEmpty()); - - ArgumentCaptor<MetadataChangeProposal> mcpCaptor = - ArgumentCaptor.forClass(MetadataChangeProposal.class); - verify(entityService, atLeastOnce()) - .ingestProposal( - eq(operationContext), mcpCaptor.capture(), any(AuditStamp.class), eq(false)); + Urn containerUrn = containerUrn(platformInstanceName, namespace); - // Verify the final properties ContainerProperties expectedProps = new ContainerProperties() .setName("ns1") @@ -379,10 +417,22 @@ .setCustomProperties( new StringMap( Map.of( "existing1", "value1", "new1", "newValue1"))); - List<MetadataChangeProposal> mcps = mcpCaptor.getAllValues(); - MetadataChangeProposal finalMcp = mcps.get(mcps.size() - 1); - assertEquals(finalMcp.getAspectName(), "containerProperties"); - // Note: You might need to add more specific verification of the serialized aspect + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.updateEntity(eq(containerUrn), eq(CONTAINER_ENTITY_NAME))) + .thenReturn(entityBatch); + + when(entityService.ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false))) + .thenReturn(List.of()); + + UpdateNamespacePropertiesResponse response = + catalog.updateNamespaceProperties(namespace, request); + + assertTrue(response.removed().contains("toRemove1")); + assertTrue(response.updated().contains("new1")); + assertTrue(response.missing().isEmpty()); + + verify(entityBatch).aspect(eq(CONTAINER_PROPERTIES_ASPECT_NAME), eq(expectedProps)); + verify(entityService).ingestProposal(same(operationContext), same(mockAspectsBatch), eq(false)); } // Helper method for creating mock search entities diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/IcebergBatchTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/IcebergBatchTest.java new file mode 100644 index 00000000000000..ddfa6c78fb399c --- /dev/null +++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/IcebergBatchTest.java @@ -0,0 +1,363 @@ +package io.datahubproject.iceberg.catalog; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.HTTP_HEADER_IF_VERSION_MATCH; +import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect; +import static io.datahubproject.iceberg.catalog.Utils.platformInstanceUrn; +import static io.datahubproject.iceberg.catalog.Utils.platformUrn; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.Status; +import com.linkedin.common.urn.CorpuserUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringMap; +import com.linkedin.entity.Aspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; +import com.linkedin.mxe.MetadataChangeProposal; +import io.datahubproject.metadata.context.ActorContext; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.RetrieverContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class IcebergBatchTest { + @Mock private OperationContext mockOperationContext; + + @Mock private RetrieverContext mockRetrieverContext; + + private Urn testActorUrn = new CorpuserUrn("testUser"); + + private IcebergBatch icebergBatch; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + ActorContext actorContext = mock(ActorContext.class); + when(mockOperationContext.getActorContext()).thenReturn(actorContext); + when(actorContext.getActorUrn()).thenReturn(testActorUrn); + + when(mockOperationContext.getRetrieverContext()).thenReturn(mockRetrieverContext); + icebergBatch = new IcebergBatch(mockOperationContext); + } + + @Test + public void testSoftDelete() { + Urn urn = mock(Urn.class); + String entityName = "SOME_ENTITY"; + icebergBatch.softDeleteEntity(urn, entityName); + assertEquals(icebergBatch.getMcps().size(), 1); + assertEquals( + icebergBatch.getMcps().get(0), + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(STATUS_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Status().setRemoved(true))) + .setChangeType(ChangeType.UPSERT)); + } + + @Test + public void testCreateBatch() { + Urn urn = mock(Urn.class); + String entityName = "SOME_ENTITY"; + RecordTemplate creationAspect = new Aspect(new DataMap(Map.of("someKey", "someValue"))); + + IcebergBatch.EntityBatch entityBatch = + icebergBatch.createEntity(urn, entityName, "CREATION_ASPECT", creationAspect); + + List<MetadataChangeProposal> expectedMcps = new ArrayList<>(); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("CREATION_ASPECT")
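+ // every proposal built via IcebergBatch carries the synchronous index-update header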
+ .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(creationAspect)) + .setChangeType(ChangeType.CREATE_ENTITY)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(STATUS_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Status().setRemoved(false))) + .setChangeType(ChangeType.CREATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + + entityBatch.platformInstance("some_platform"); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(DATA_PLATFORM_INSTANCE_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect( + serializeAspect( + new DataPlatformInstance() + .setPlatform(platformUrn()) + .setInstance(platformInstanceUrn("some_platform")))) + .setChangeType(ChangeType.CREATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + + entityBatch.aspect( + "some_aspect", new Aspect(new DataMap(Map.of("anotherKey", "anotherValue")))); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("some_aspect") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect( + serializeAspect(new Aspect(new DataMap(Map.of("anotherKey", "anotherValue"))))) + .setChangeType(ChangeType.CREATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + } + + @Test + public void testUpdateBatch() { + Urn urn = mock(Urn.class); + String entityName = "SOME_ENTITY"; + + IcebergBatch.EntityBatch entityBatch = icebergBatch.updateEntity(urn, entityName); + assertTrue(icebergBatch.getMcps().isEmpty()); + + try { + entityBatch.platformInstance("some_platform"); + fail(); + } catch (UnsupportedOperationException e) { + + } + + List expectedMcps = new ArrayList<>(); + entityBatch.aspect("some_aspect", new Aspect(new DataMap(Map.of("someKey", "someValue")))); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("some_aspect") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("someKey", "someValue"))))) + .setChangeType(ChangeType.UPDATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + + entityBatch.aspect( + "some_other_aspect", new Aspect(new DataMap(Map.of("anotherKey", "anotherValue")))); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("some_other_aspect") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect( + serializeAspect(new Aspect(new DataMap(Map.of("anotherKey", "anotherValue"))))) + .setChangeType(ChangeType.UPDATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + } + + @Test + public void testConditionalUpdate() { + Urn urn = mock(Urn.class); + String entityName = "SOME_ENTITY"; + + IcebergBatch.EntityBatch entityBatch = + icebergBatch.conditionalUpdateEntity( + urn, + entityName, + "COND_ASPECT", + new Aspect(new DataMap(Map.of("someKey", "someValue"))), + "version1"); + + List expectedMcps = new ArrayList<>(); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + 
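// the If-Version-Match header below pins the update to the expected aspect version +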
.setAspectName("COND_ASPECT") + .setHeaders( + new StringMap( + Map.of( + SYNC_INDEX_UPDATE_HEADER_NAME, + Boolean.toString(true), + HTTP_HEADER_IF_VERSION_MATCH, + "version1"))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("someKey", "someValue"))))) + .setChangeType(ChangeType.UPDATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + + try { + entityBatch.platformInstance("some_platform"); + fail(); + } catch (UnsupportedOperationException e) { + + } + + entityBatch.aspect( + "some_other_aspect", new Aspect(new DataMap(Map.of("anotherKey", "anotherValue")))); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("some_other_aspect") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect( + serializeAspect(new Aspect(new DataMap(Map.of("anotherKey", "anotherValue"))))) + .setChangeType(ChangeType.UPDATE)); + assertEquals(icebergBatch.getMcps(), expectedMcps); + } + + @Test + public void testMixedBatch() { + Urn urn = mock(Urn.class); + String entityName = "SOME_ENTITY"; + RecordTemplate creationAspect = new Aspect(new DataMap(Map.of("someKey", "someValue"))); + + IcebergBatch.EntityBatch createEntityBatch = + icebergBatch.createEntity(urn, entityName, "CREATION_ASPECT", creationAspect); + IcebergBatch.EntityBatch updateEntityBatch = icebergBatch.updateEntity(urn, entityName); + IcebergBatch.EntityBatch condUpdateEntityBatch = + icebergBatch.conditionalUpdateEntity( + urn, + entityName, + "COND_ASPECT", + new Aspect(new DataMap(Map.of("someKey", "someValue"))), + "version1"); + + createEntityBatch.platformInstance("some_platform"); + updateEntityBatch.aspect("aspect1", new Aspect(new DataMap(Map.of("key1", "value1")))); + createEntityBatch.aspect("aspect2", new Aspect(new DataMap(Map.of("key2", "value2")))); + condUpdateEntityBatch.aspect("aspect3", new Aspect(new DataMap(Map.of("key3", "value3")))); + icebergBatch.softDeleteEntity(urn, entityName); + + List expectedMcps = new ArrayList<>(); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("CREATION_ASPECT") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(creationAspect)) + .setChangeType(ChangeType.CREATE_ENTITY)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(STATUS_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Status().setRemoved(false))) + .setChangeType(ChangeType.CREATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("COND_ASPECT") + .setHeaders( + new StringMap( + Map.of( + SYNC_INDEX_UPDATE_HEADER_NAME, + Boolean.toString(true), + HTTP_HEADER_IF_VERSION_MATCH, + "version1"))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("someKey", "someValue"))))) + .setChangeType(ChangeType.UPDATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(DATA_PLATFORM_INSTANCE_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect( + serializeAspect( + new DataPlatformInstance() + .setPlatform(platformUrn()) + .setInstance(platformInstanceUrn("some_platform")))) + 
.setChangeType(ChangeType.CREATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("aspect1") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("key1", "value1"))))) + .setChangeType(ChangeType.UPDATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("aspect2") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("key2", "value2"))))) + .setChangeType(ChangeType.CREATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName("aspect3") + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Aspect(new DataMap(Map.of("key3", "value3"))))) + .setChangeType(ChangeType.UPDATE)); + expectedMcps.add( + new MetadataChangeProposal() + .setEntityUrn(urn) + .setEntityType(entityName) + .setAspectName(STATUS_ASPECT_NAME) + .setHeaders( + new StringMap(Map.of(SYNC_INDEX_UPDATE_HEADER_NAME, Boolean.toString(true)))) + .setAspect(serializeAspect(new Status().setRemoved(true))) + .setChangeType(ChangeType.UPSERT)); + + assertEquals(icebergBatch.getMcps(), expectedMcps); + } + + @Test + public void testAsBatch() { + AspectsBatchImpl markerBatch = mock(AspectsBatchImpl.class); + AspectsBatchImpl.AspectsBatchImplBuilder builder = + mock(AspectsBatchImpl.AspectsBatchImplBuilder.class); + when(builder.mcps( + same(icebergBatch.getMcps()), any(AuditStamp.class), same(mockRetrieverContext))) + .thenReturn(builder); + when(builder.build()).thenReturn(markerBatch); + + try (MockedStatic stsClientMockedStatic = + mockStatic(AspectsBatchImpl.class)) { + stsClientMockedStatic.when(AspectsBatchImpl::builder).thenReturn(builder); + AspectsBatch actual = icebergBatch.asAspectsBatch(); + assertSame(actual, markerBatch); + } + + ArgumentCaptor auditStampArg = ArgumentCaptor.forClass(AuditStamp.class); + verify(builder) + .mcps(same(icebergBatch.getMcps()), auditStampArg.capture(), same(mockRetrieverContext)); + assertSame(auditStampArg.getValue().getActor(), testActorUrn); + } +} diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/TableOpsDelegateTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/TableOpsDelegateTest.java new file mode 100644 index 00000000000000..23f197950e21d8 --- /dev/null +++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/TableOpsDelegateTest.java @@ -0,0 +1,429 @@ +package io.datahubproject.iceberg.catalog; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect; +import static io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME; +import static io.datahubproject.iceberg.catalog.Utils.containerUrn; +import static io.datahubproject.iceberg.catalog.Utils.platformUrn; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.SubTypes; +import com.linkedin.common.urn.CorpuserUrn; +import 
com.linkedin.common.urn.DatasetUrn; +import com.linkedin.container.Container; +import com.linkedin.data.template.StringArray; +import com.linkedin.dataset.DatasetProfile; +import com.linkedin.dataset.DatasetProperties; +import com.linkedin.dataset.IcebergCatalogInfo; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.ActorContext; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.schematron.converters.avro.AvroSchemaConverter; +import java.time.Instant; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.types.Types; +import org.mockito.*; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TableOpsDelegateTest { + @Mock private DataHubIcebergWarehouse mockWarehouse; + @Mock private EntityService mockEntityService; + @Mock private OperationContext mockOperationContext; + @Mock private FileIOFactory mockFileIOFactory; + @Mock private FileIO mockFileIO; + @Mock private IcebergBatch mockIcebergBatch; + @Mock private AspectsBatch mockAspectsBatch; + + private DatasetProfile stubDatasetProfile; + + private static final TableIdentifier identifier = TableIdentifier.of("db", "entity"); + private static final String catalogName = "fooCatalog"; + private static final String fullName = "fooCatalog.db.entity"; + + private TableOpsDelegate tableDelegate; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + when(mockFileIOFactory.createIO(any(), any(), any(Set.class))).thenReturn(mockFileIO); + when(mockFileIOFactory.createIO(any(), any(), any(TableMetadata.class))).thenReturn(mockFileIO); + + when(mockWarehouse.getPlatformInstance()).thenReturn(catalogName); + + AuditStamp batchAuditStamp = new AuditStamp().setTime(Instant.now().toEpochMilli()); + when(mockIcebergBatch.getAuditStamp()).thenReturn(batchAuditStamp); + when(mockIcebergBatch.asAspectsBatch()).thenReturn(mockAspectsBatch); + + stubDatasetProfile = + new DatasetProfile() + .setColumnCount(2) + .setRowCount(3) + .setTimestampMillis(batchAuditStamp.getTime()); + + tableDelegate = + new TableOpsDelegate( + mockWarehouse, identifier, mockEntityService, mockOperationContext, mockFileIOFactory) { + @Override + IcebergBatch newIcebergBatch(OperationContext operationContext) { + return mockIcebergBatch; + } + + @Override + protected DatasetProfile getDataSetProfile(TableMetadata metadata) { + return stubDatasetProfile; + } + }; + + ActorContext actorContext = mock(ActorContext.class); + when(mockOperationContext.getActorContext()).thenReturn(actorContext); + 
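// the batch audit stamp is attributed to the actor resolved from the operation context +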
when(actorContext.getActorUrn()).thenReturn(new CorpuserUrn("testUser")); + } + + // CREATION + @Test + public void testCreateTableSuccess() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.schema()).thenReturn(schema); + when(metadata.location()).thenReturn("s3://bucket/table"); + + // Simulating new table creation + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(null); + + DatasetUrn datasetUrn = mock(DatasetUrn.class); + + when(mockWarehouse.createDataset(eq(identifier), eq(false), same(mockIcebergBatch))) + .thenReturn(datasetUrn); + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.createEntity( + eq(datasetUrn), + eq(DATASET_ENTITY_NAME), + eq(DATASET_ICEBERG_METADATA_ASPECT_NAME), + eq( + new IcebergCatalogInfo() + .setMetadataPointer("s3://bucket/metadata/00001-metadata.json") + .setView(false)))) + .thenReturn(entityBatch); + + tableDelegate.doCommit( + null, new MetadataWrapper<>(metadata), () -> "s3://bucket/metadata/00001-metadata.json"); + + verify(mockWarehouse).createDataset(eq(identifier), eq(false), same(mockIcebergBatch)); + verify(mockEntityService) + .ingestProposal(same(mockOperationContext), same(mockAspectsBatch), eq(false)); + + // verify schema + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, fullName); + AvroSchemaConverter converter = AvroSchemaConverter.builder().build(); + SchemaMetadata schemaMetadata = + converter.toDataHubSchema(avroSchema, false, false, platformUrn(), null); + verify(entityBatch).aspect(eq(SCHEMA_METADATA_ASPECT_NAME), eq(schemaMetadata)); + + // other aspects populated during creation + verify(entityBatch) + .aspect( + eq(DATASET_PROPERTIES_ASPECT_NAME), + eq(new DatasetProperties().setName(identifier.name()).setQualifiedName(fullName))); + verify(entityBatch) + .aspect( + eq(CONTAINER_ASPECT_NAME), + eq(new Container().setContainer(containerUrn(catalogName, Namespace.of("db"))))); + verify(entityBatch) + .aspect( + eq(SUB_TYPES_ASPECT_NAME), eq(new SubTypes().setTypeNames(new StringArray("Table")))); + verify(entityBatch).platformInstance(eq(catalogName)); + + verifyDatasetProfile(); + + verifyNoMoreInteractions(entityBatch); + verify(mockIcebergBatch).asAspectsBatch(); + } + + @Test( + expectedExceptions = AlreadyExistsException.class, + expectedExceptionsMessageRegExp = "Table already exists: " + fullName) + public void testCreateTableAlreadyExistsFailure() { + mockWarehouseIcebergMetadata("someLocation", false, "version1"); + TableMetadata newMetadata = mock(TableMetadata.class); + + tableDelegate.doCommit(null, new MetadataWrapper<>(newMetadata), null); + } + + @Test( + expectedExceptions = AlreadyExistsException.class, + expectedExceptionsMessageRegExp = "Table already exists: " + fullName) + public void testCreateTableConcurrencyFailure() { + mockWarehouseIcebergMetadata("someLocation", false, "version1"); + IcebergBatch icebergBatch = mock(IcebergBatch.class); + when(mockWarehouse.createDataset(eq(identifier), eq(false), same(icebergBatch))) + .thenThrow(ValidationException.class); + + tableDelegate.doCommit(null, new MetadataWrapper<>(mock(TableMetadata.class)), null); + } + + private Pair<EnvelopedAspect, DatasetUrn> mockWarehouseIcebergMetadata( + String metadataPointer, boolean view, String version) { + IcebergCatalogInfo existingMetadata = + new
IcebergCatalogInfo().setMetadataPointer(metadataPointer).setView(view); + + EnvelopedAspect envelopedAspect = + new EnvelopedAspect() + .setValue(new Aspect(existingMetadata.data())) + .setSystemMetadata(new SystemMetadata().setVersion(version)); + + DatasetUrn datasetUrn = mock(DatasetUrn.class); + Pair<EnvelopedAspect, DatasetUrn> result = new Pair<>(envelopedAspect, datasetUrn); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(result); + + return result; + } + + // UPDATES + @Test + public void testUpdateTableDataSuccess() { + String existingLocation = "s3://bucket/metadata/00001-metadata.json"; + String existingVersion = "version1"; + int existingSchemaId = 1; + + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.location()).thenReturn("s3://bucket/table"); + when(metadata.currentSchemaId()).thenReturn(existingSchemaId); + + TableMetadata base = mock(TableMetadata.class); + when(base.metadataFileLocation()).thenReturn(existingLocation); + when(base.currentSchemaId()).thenReturn(existingSchemaId); + + Pair<EnvelopedAspect, DatasetUrn> existingDatasetAspect = + mockWarehouseIcebergMetadata(existingLocation, false, existingVersion); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(existingDatasetAspect); + + DatasetUrn datasetUrn = existingDatasetAspect.getSecond(); + + String newMetadataPointerLocation = "s3://bucket/metadata/00002-metadata.json"; + IcebergCatalogInfo newCatalogInfo = + new IcebergCatalogInfo().setMetadataPointer(newMetadataPointerLocation).setView(false); + + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.conditionalUpdateEntity( + eq(datasetUrn), + eq(DATASET_ENTITY_NAME), + eq(DATASET_ICEBERG_METADATA_ASPECT_NAME), + eq(newCatalogInfo), + eq(existingVersion))) + .thenReturn(entityBatch); + + tableDelegate.doCommit( + new MetadataWrapper<>(base), + new MetadataWrapper<>(metadata), + () -> newMetadataPointerLocation); + + verify(mockEntityService) + .ingestProposal(same(mockOperationContext), same(mockAspectsBatch), eq(false)); + + verifyNoMoreInteractions(entityBatch); + + verifyDatasetProfile(); + } + + @Test + public void testUpdateTableSchemaSuccess() { + String existingLocation = "s3://bucket/metadata/00001-metadata.json"; + String existingVersion = "version1"; + int existingSchemaId = 1; + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.schema()).thenReturn(schema); + when(metadata.location()).thenReturn("s3://bucket/table"); + when(metadata.currentSchemaId()).thenReturn(existingSchemaId + 1); + + TableMetadata base = mock(TableMetadata.class); + when(base.metadataFileLocation()).thenReturn(existingLocation); + when(base.currentSchemaId()).thenReturn(existingSchemaId); + + Pair<EnvelopedAspect, DatasetUrn> existingDatasetAspect = + mockWarehouseIcebergMetadata(existingLocation, false, existingVersion); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(existingDatasetAspect); + + DatasetUrn datasetUrn = existingDatasetAspect.getSecond(); + + String newMetadataPointerLocation = "s3://bucket/metadata/00002-metadata.json"; + IcebergCatalogInfo newCatalogInfo = + new IcebergCatalogInfo().setMetadataPointer(newMetadataPointerLocation).setView(false); + + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.conditionalUpdateEntity( + eq(datasetUrn), + eq(DATASET_ENTITY_NAME), + eq(DATASET_ICEBERG_METADATA_ASPECT_NAME), + eq(newCatalogInfo), + eq(existingVersion))) + .thenReturn(entityBatch); + + tableDelegate.doCommit( + new MetadataWrapper<>(base), + new MetadataWrapper<>(metadata), + () -> newMetadataPointerLocation); + + verify(mockEntityService) + .ingestProposal(same(mockOperationContext), same(mockAspectsBatch), eq(false)); + + // verify schema + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, fullName); + AvroSchemaConverter converter = AvroSchemaConverter.builder().build(); + SchemaMetadata schemaMetadata = + converter.toDataHubSchema(avroSchema, false, false, platformUrn(), null); + verify(entityBatch).aspect(eq(SCHEMA_METADATA_ASPECT_NAME), eq(schemaMetadata)); + + verifyNoMoreInteractions(entityBatch); + + verifyDatasetProfile(); + } + + @Test + public void testUpdateTableConcurrencyFailure() { + String existingLocation = "s3://bucket/metadata/00001-metadata.json"; + String existingVersion = "version1"; + + TableMetadata base = mock(TableMetadata.class); + when(base.metadataFileLocation()).thenReturn(existingLocation); + + Pair<EnvelopedAspect, DatasetUrn> existingDatasetAspect = + mockWarehouseIcebergMetadata(existingLocation, false, existingVersion); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(existingDatasetAspect); + + when(mockEntityService.ingestProposal( + same(mockOperationContext), same(mockAspectsBatch), eq(false))) + .thenThrow(ValidationException.class); + + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.location()).thenReturn("s3://bucket/table"); + + String newMetadataPointerLocation = "s3://bucket/metadata/00002-metadata.json"; + try { + tableDelegate.doCommit( + new MetadataWrapper<>(base), + new MetadataWrapper<>(metadata), + () -> newMetadataPointerLocation); + fail(); + } catch (CommitFailedException e) { + assertEquals(e.getMessage(), "Cannot commit to table " + fullName + ": stale metadata"); + } + IcebergCatalogInfo newCatalogInfo = + new IcebergCatalogInfo().setMetadataPointer(newMetadataPointerLocation).setView(false); + + verify(mockIcebergBatch) + .conditionalUpdateEntity( + eq(existingDatasetAspect.getSecond()), + eq(DATASET_ENTITY_NAME), + eq(DATASET_ICEBERG_METADATA_ASPECT_NAME), + eq(newCatalogInfo), + eq(existingVersion)); + } + + @Test + public void testUpdateTableStaleMetadataFailure() { + String existingLocation = "s3://bucket/metadata/00002-metadata.json"; + String staleLocationBeingUpdated = "s3://bucket/metadata/00001-metadata.json"; + + TableMetadata base = mock(TableMetadata.class); + when(base.metadataFileLocation()).thenReturn(staleLocationBeingUpdated); + + Pair<EnvelopedAspect, DatasetUrn> existingDatasetAspect = + mockWarehouseIcebergMetadata(existingLocation, false, "version1"); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(existingDatasetAspect); + + // new metadata location, metadataWriter etc.
should not be accessed in this case + try { + tableDelegate.doCommit( + new MetadataWrapper<>(base), new MetadataWrapper<>(mock(TableMetadata.class)), null); + fail(); + } catch (CommitFailedException e) { + assertEquals(e.getMessage(), "Cannot commit to table " + fullName + ": stale metadata"); + } + + verifyNoInteractions(mockEntityService); + verifyNoInteractions(mockFileIOFactory); + } + + private void verifyDatasetProfile() { + ArgumentCaptor datasetProfileMcpCaptor = + ArgumentCaptor.forClass(MetadataChangeProposal.class); + verify(mockEntityService) + .ingestProposal( + same(mockOperationContext), datasetProfileMcpCaptor.capture(), any(), eq(true)); + assertEquals( + datasetProfileMcpCaptor.getValue().getAspect(), serializeAspect(stubDatasetProfile)); + } + + @Test( + expectedExceptions = NoSuchTableException.class, + expectedExceptionsMessageRegExp = "No such table " + fullName) + public void testUpdateTableButIsViewFailure() { + mockWarehouseIcebergMetadata("someLocation", true, "version1"); + + tableDelegate.doCommit( + new MetadataWrapper<>(mock(TableMetadata.class)), + new MetadataWrapper<>(mock(TableMetadata.class)), + null); + } + + // REFRESH + @Test + public void testTableRefreshSuccessful() { + String location = "s3://bucket/metadata/00001-metadata.json"; + // Arrange + IcebergCatalogInfo metadata = + new IcebergCatalogInfo().setMetadataPointer(location).setView(false); + when(mockWarehouse.getIcebergMetadata(identifier)).thenReturn(Optional.of(metadata)); + when(mockFileIO.newInputFile(location)).thenReturn(mock(InputFile.class)); + + TableMetadata expectedMetadata = mock(TableMetadata.class); + + try (MockedStatic tableMetadataParserMock = + Mockito.mockStatic(TableMetadataParser.class)) { + tableMetadataParserMock + .when(() -> TableMetadataParser.read(same(mockFileIO), eq(location))) + .thenReturn(expectedMetadata); + TableMetadata actualMetadata = tableDelegate.refresh(); + assertSame(actualMetadata, expectedMetadata); + } + } + + @Test + public void testRefreshNotFound() { + when(mockWarehouse.getIcebergMetadata(identifier)).thenReturn(Optional.empty()); + assertNull(tableDelegate.refresh()); + } +} diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/UtilsTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/UtilsTest.java index f7eb654c59f6a4..326f4d762e209f 100644 --- a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/UtilsTest.java +++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/UtilsTest.java @@ -3,12 +3,8 @@ import static org.mockito.Mockito.*; import static org.testng.Assert.*; -import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.Urn; -import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.Constants; -import com.linkedin.mxe.MetadataChangeProposal; import java.util.Map; import java.util.Set; import org.apache.iceberg.TableMetadata; @@ -29,29 +25,6 @@ public void setup() { MockitoAnnotations.initMocks(this); } - @Test - public void testAuditStamp() { - AuditStamp stamp = Utils.auditStamp(); - assertNotNull(stamp); - assertEquals(stamp.getActor().toString(), Constants.SYSTEM_ACTOR); - assertTrue(stamp.getTime() > 0); - } - - @Test - public void testPlatformInstanceMcp() { - String platformInstance = "testInstance"; - String entityType = "dataset"; - Urn urn = Utils.platformUrn(); - - MetadataChangeProposal mcp = 
Utils.platformInstanceMcp(platformInstance, urn, entityType); - - assertNotNull(mcp); - assertEquals(mcp.getEntityUrn(), urn); - assertEquals(mcp.getEntityType(), entityType); - assertEquals(mcp.getAspectName(), Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME); - assertEquals(mcp.getChangeType(), ChangeType.UPSERT); - } - @Test public void testPlatformUrn() { DataPlatformUrn urn = Utils.platformUrn(); diff --git a/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/ViewOpsDelegateTest.java b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/ViewOpsDelegateTest.java new file mode 100644 index 00000000000000..3a47caaaf26a64 --- /dev/null +++ b/metadata-service/iceberg-catalog/src/test/java/io/datahubproject/iceberg/catalog/ViewOpsDelegateTest.java @@ -0,0 +1,424 @@ +package io.datahubproject.iceberg.catalog; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect; +import static io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse.DATASET_ICEBERG_METADATA_ASPECT_NAME; +import static io.datahubproject.iceberg.catalog.Utils.containerUrn; +import static io.datahubproject.iceberg.catalog.Utils.platformUrn; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.SubTypes; +import com.linkedin.common.urn.CorpuserUrn; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.container.Container; +import com.linkedin.data.template.StringArray; +import com.linkedin.dataset.DatasetProfile; +import com.linkedin.dataset.DatasetProperties; +import com.linkedin.dataset.IcebergCatalogInfo; +import com.linkedin.dataset.ViewProperties; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.ActorContext; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.schematron.converters.avro.AvroSchemaConverter; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.*; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.*; +import org.mockito.*; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ViewOpsDelegateTest { + @Mock private DataHubIcebergWarehouse mockWarehouse; + @Mock private EntityService mockEntityService; + @Mock private OperationContext mockOperationContext; + @Mock private FileIOFactory mockFileIOFactory; + @Mock private FileIO mockFileIO; + @Mock private IcebergBatch mockIcebergBatch; + @Mock private AspectsBatch mockAspectsBatch; + + private DatasetProfile stubDatasetProfile; + + private static final TableIdentifier identifier = TableIdentifier.of("db", 
"entity"); + private static final String catalogName = "fooCatalog"; + private static final String fullName = "fooCatalog.db.entity"; + + private ViewOpsDelegate viewDelegate; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + when(mockFileIOFactory.createIO(any(), any(), any(Set.class))).thenReturn(mockFileIO); + when(mockFileIOFactory.createIO(any(), any(), any(TableMetadata.class))).thenReturn(mockFileIO); + + when(mockWarehouse.getPlatformInstance()).thenReturn(catalogName); + + AuditStamp batchAuditStamp = new AuditStamp().setTime(Instant.now().toEpochMilli()); + when(mockIcebergBatch.getAuditStamp()).thenReturn(batchAuditStamp); + when(mockIcebergBatch.asAspectsBatch()).thenReturn(mockAspectsBatch); + + stubDatasetProfile = + new DatasetProfile() + .setColumnCount(2) + .setRowCount(3) + .setTimestampMillis(batchAuditStamp.getTime()); + + viewDelegate = + new ViewOpsDelegate( + mockWarehouse, identifier, mockEntityService, mockOperationContext, mockFileIOFactory) { + @Override + IcebergBatch newIcebergBatch(OperationContext operationContext) { + return mockIcebergBatch; + } + + @Override + protected DatasetProfile getDataSetProfile(ViewMetadata metadata) { + return stubDatasetProfile; + } + }; + + ActorContext actorContext = mock(ActorContext.class); + when(mockOperationContext.getActorContext()).thenReturn(actorContext); + when(actorContext.getActorUrn()).thenReturn(new CorpuserUrn("urn:li:corpuser:testUser")); + } + + private ViewProperties mockSqlRepresentation(ViewMetadata viewMetadata) { + String dialect = "someDialect"; + String sql = "select * from something"; + + SQLViewRepresentation sqlViewRepresentation = mock(SQLViewRepresentation.class); + ViewVersion viewVersion = mock(ViewVersion.class); + + when(sqlViewRepresentation.dialect()).thenReturn(dialect); + when(sqlViewRepresentation.sql()).thenReturn(sql); + + when(viewVersion.representations()).thenReturn(List.of(sqlViewRepresentation)); + when(viewMetadata.currentVersion()).thenReturn(viewVersion); + + return new ViewProperties().setViewLogic(sql).setMaterialized(false).setViewLanguage(dialect); + } + + // CREATION + @Test + public void testCreateViewSuccess() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + ViewMetadata metadata = mock(ViewMetadata.class); + when(metadata.schema()).thenReturn(schema); + when(metadata.location()).thenReturn("s3://bucket/table"); + + ViewProperties viewProperties = mockSqlRepresentation(metadata); + + // Simulating new table creation + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(null); + + DatasetUrn datasetUrn = mock(DatasetUrn.class); + + when(mockWarehouse.createDataset(eq(identifier), eq(true), same(mockIcebergBatch))) + .thenReturn(datasetUrn); + IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class); + when(mockIcebergBatch.createEntity( + eq(datasetUrn), + eq(DATASET_ENTITY_NAME), + eq(DATASET_ICEBERG_METADATA_ASPECT_NAME), + eq( + new IcebergCatalogInfo() + .setMetadataPointer("s3://bucket/metadata/00001-metadata.json") + .setView(true)))) + .thenReturn(entityBatch); + + viewDelegate.doCommit( + null, new MetadataWrapper<>(metadata), () -> "s3://bucket/metadata/00001-metadata.json"); + + verify(mockWarehouse).createDataset(eq(identifier), eq(true), same(mockIcebergBatch)); + verify(mockEntityService) + .ingestProposal(same(mockOperationContext), same(mockAspectsBatch), eq(false)); + + // 
verify schema + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, fullName); + AvroSchemaConverter converter = AvroSchemaConverter.builder().build(); + SchemaMetadata schemaMetadata = + converter.toDataHubSchema(avroSchema, false, false, platformUrn(), null); + verify(entityBatch).aspect(eq(SCHEMA_METADATA_ASPECT_NAME), eq(schemaMetadata)); + + // other aspects populated during creation + verify(entityBatch) + .aspect( + eq(DATASET_PROPERTIES_ASPECT_NAME), + eq(new DatasetProperties().setName(identifier.name()).setQualifiedName(fullName))); + verify(entityBatch) + .aspect( + eq(CONTAINER_ASPECT_NAME), + eq(new Container().setContainer(containerUrn(catalogName, Namespace.of("db"))))); + verify(entityBatch) + .aspect( + eq(SUB_TYPES_ASPECT_NAME), eq(new SubTypes().setTypeNames(new StringArray("View")))); + verify(entityBatch).platformInstance(eq(catalogName)); + verify(entityBatch).aspect(eq(VIEW_PROPERTIES_ASPECT_NAME), eq(viewProperties)); + + verifyDatasetProfile(); + + verifyNoMoreInteractions(entityBatch); + } + + @Test( + expectedExceptions = AlreadyExistsException.class, + expectedExceptionsMessageRegExp = "View already exists: " + fullName) + public void testCreateViewAlreadyExistsFailure() { + mockWarehouseIcebergMetadata("someLocation", true, "version1"); + ViewMetadata newMetadata = mock(ViewMetadata.class); + + viewDelegate.doCommit(null, new MetadataWrapper<>(newMetadata), null); + } + + @Test( + expectedExceptions = AlreadyExistsException.class, + expectedExceptionsMessageRegExp = "View already exists: " + fullName) + public void testCreateViewConcurrencyFailure() { + mockWarehouseIcebergMetadata("someLocation", true, "version1"); + IcebergBatch icebergBatch = mock(IcebergBatch.class); + when(mockWarehouse.createDataset(eq(identifier), eq(true), same(icebergBatch))) + .thenThrow(ValidationException.class); + + viewDelegate.doCommit(null, new MetadataWrapper<>(mock(ViewMetadata.class)), null); + } + + private Pair mockWarehouseIcebergMetadata( + String metadataPointer, boolean view, String version) { + IcebergCatalogInfo existingMetadata = + new IcebergCatalogInfo().setMetadataPointer(metadataPointer).setView(view); + + EnvelopedAspect envelopedAspect = + new EnvelopedAspect() + .setValue(new Aspect(existingMetadata.data())) + .setSystemMetadata(new SystemMetadata().setVersion(version)); + + DatasetUrn datasetUrn = mock(DatasetUrn.class); + Pair result = new Pair<>(envelopedAspect, datasetUrn); + when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(result); + + return result; + } + + // UPDATES + @Test + public void testUpdateViewSuccess() { + String existingLocation = "s3://bucket/metadata/00001-metadata.json"; + String existingVersion = "version1"; + int existingSchemaId = 1; + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + ViewMetadata metadata = mock(ViewMetadata.class); + when(metadata.schema()).thenReturn(schema); + when(metadata.location()).thenReturn("s3://bucket/table"); + when(metadata.currentSchemaId()).thenReturn(existingSchemaId + 1); + + ViewProperties viewProperties = mockSqlRepresentation(metadata); + + ViewMetadata base = mock(ViewMetadata.class); + when(base.metadataFileLocation()).thenReturn(existingLocation); + when(base.currentSchemaId()).thenReturn(existingSchemaId); + + Pair existingDatasetAspect = + mockWarehouseIcebergMetadata(existingLocation, true, existingVersion); + 
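// the stored aspect version gates the conditional update stubbed below +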
+
+    DatasetUrn datasetUrn = existingDatasetAspect.getSecond();
+
+    String newMetadataPointerLocation = "s3://bucket/metadata/00002-metadata.json";
+    IcebergCatalogInfo newCatalogInfo =
+        new IcebergCatalogInfo().setMetadataPointer(newMetadataPointerLocation).setView(true);
+
+    IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class);
+    when(mockIcebergBatch.conditionalUpdateEntity(
+            eq(datasetUrn),
+            eq(DATASET_ENTITY_NAME),
+            eq(DATASET_ICEBERG_METADATA_ASPECT_NAME),
+            eq(newCatalogInfo),
+            eq(existingVersion)))
+        .thenReturn(entityBatch);
+
+    viewDelegate.doCommit(
+        new MetadataWrapper<>(base),
+        new MetadataWrapper<>(metadata),
+        () -> newMetadataPointerLocation);
+
+    verify(mockEntityService)
+        .ingestProposal(same(mockOperationContext), same(mockAspectsBatch), eq(false));
+
+    // verify schema
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, fullName);
+    AvroSchemaConverter converter = AvroSchemaConverter.builder().build();
+    SchemaMetadata schemaMetadata =
+        converter.toDataHubSchema(avroSchema, false, false, platformUrn(), null);
+    verify(entityBatch).aspect(eq(SCHEMA_METADATA_ASPECT_NAME), eq(schemaMetadata));
+
+    verify(entityBatch).aspect(eq(VIEW_PROPERTIES_ASPECT_NAME), eq(viewProperties));
+    verifyNoMoreInteractions(entityBatch);
+
+    verifyDatasetProfile();
+  }
+
+  @Test
+  public void testUpdateViewConcurrencyFailure() {
+    String existingLocation = "s3://bucket/metadata/00001-metadata.json";
+    String existingVersion = "version1";
+
+    String newMetadataPointerLocation = "s3://bucket/metadata/00002-metadata.json";
+
+    ViewMetadata base = mock(ViewMetadata.class);
+    when(base.metadataFileLocation()).thenReturn(existingLocation);
+
+    Pair<EnvelopedAspect, DatasetUrn> existingDatasetAspect =
+        mockWarehouseIcebergMetadata(existingLocation, true, existingVersion);
+
+    // Simulate a concurrent commit: the batch ingest fails validation because
+    // the conditional update no longer matches the latest aspect version.
+    when(mockEntityService.ingestProposal(
+            same(mockOperationContext), same(mockAspectsBatch), eq(false)))
+        .thenThrow(ValidationException.class);
+
+    ViewMetadata metadata = mock(ViewMetadata.class);
+    when(metadata.location()).thenReturn("s3://bucket/table");
+
+    IcebergCatalogInfo newCatalogInfo =
+        new IcebergCatalogInfo().setMetadataPointer(newMetadataPointerLocation).setView(true);
+
+    IcebergBatch.EntityBatch entityBatch = mock(IcebergBatch.EntityBatch.class);
+    when(mockIcebergBatch.conditionalUpdateEntity(
+            eq(existingDatasetAspect.getSecond()),
+            eq(DATASET_ENTITY_NAME),
+            eq(DATASET_ICEBERG_METADATA_ASPECT_NAME),
+            eq(newCatalogInfo),
+            eq(existingVersion)))
+        .thenReturn(entityBatch);
+
+    ViewProperties viewProperties = mockSqlRepresentation(metadata);
+
+    // Assert the exception inline so the mock verifications below still run.
+    try {
+      viewDelegate.doCommit(
+          new MetadataWrapper<>(base),
+          new MetadataWrapper<>(metadata),
+          () -> newMetadataPointerLocation);
+      fail();
+    } catch (CommitFailedException e) {
+      assertEquals(e.getMessage(), "Cannot commit to view " + fullName + ": stale metadata");
+    }
+
+    verify(mockIcebergBatch)
+        .conditionalUpdateEntity(
+            eq(existingDatasetAspect.getSecond()),
+            eq(DATASET_ENTITY_NAME),
+            eq(DATASET_ICEBERG_METADATA_ASPECT_NAME),
+            eq(newCatalogInfo),
+            eq(existingVersion));
+
+    verify(entityBatch).aspect(eq(VIEW_PROPERTIES_ASPECT_NAME), eq(viewProperties));
+    verifyNoMoreInteractions(entityBatch);
+  }
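+
+  // Stale-metadata guard: when the base metadata location being updated no
+  // longer matches the currently registered pointer, the commit must fail up
+  // front, before any metadata is written or ingested.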
+  @Test
+  public void testUpdateViewStaleMetadataFailure() {
+    String existingLocation = "s3://bucket/metadata/00002-metadata.json";
+    String staleLocationBeingUpdated = "s3://bucket/metadata/00001-metadata.json";
+
+    ViewMetadata base = mock(ViewMetadata.class);
+    when(base.metadataFileLocation()).thenReturn(staleLocationBeingUpdated);
+
+    mockWarehouseIcebergMetadata(existingLocation, true, "version1");
+
+    // new metadata location, metadataWriter etc. should not be accessed in this case
+    try {
+      viewDelegate.doCommit(
+          new MetadataWrapper<>(base), new MetadataWrapper<>(mock(ViewMetadata.class)), null);
+      fail();
+    } catch (CommitFailedException e) {
+      assertEquals(e.getMessage(), "Cannot commit to view " + fullName + ": stale metadata");
+    }
+
+    verifyNoInteractions(mockEntityService);
+    verifyNoInteractions(mockFileIOFactory);
+  }
+
+  private void verifyDatasetProfile() {
+    ArgumentCaptor<MetadataChangeProposal> datasetProfileMcpCaptor =
+        ArgumentCaptor.forClass(MetadataChangeProposal.class);
+    verify(mockEntityService)
+        .ingestProposal(
+            same(mockOperationContext), datasetProfileMcpCaptor.capture(), any(), eq(true));
+    assertEquals(
+        datasetProfileMcpCaptor.getValue().getAspect(), serializeAspect(stubDatasetProfile));
+  }
+
+  @Test(
+      expectedExceptions = NoSuchViewException.class,
+      expectedExceptionsMessageRegExp = "No such view " + fullName)
+  public void testUpdateViewButIsTableFailure() {
+    mockWarehouseIcebergMetadata("someLocation", false, "version1");
+
+    viewDelegate.doCommit(
+        new MetadataWrapper<>(mock(ViewMetadata.class)),
+        new MetadataWrapper<>(mock(ViewMetadata.class)),
+        null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testMissingIcebergMetadata() {
+    when(mockWarehouse.getIcebergMetadataEnveloped(identifier)).thenReturn(null);
+    viewDelegate.doCommit(
+        new MetadataWrapper<>(mock(ViewMetadata.class)),
+        new MetadataWrapper<>(mock(ViewMetadata.class)),
+        null);
+  }
+
+  // REFRESH
+  @Test
+  public void testViewRefreshSuccessful() {
+    String location = "s3://bucket/metadata/00001-metadata.json";
+    IcebergCatalogInfo metadata =
+        new IcebergCatalogInfo().setMetadataPointer(location).setView(true);
+    when(mockWarehouse.getIcebergMetadata(identifier)).thenReturn(Optional.of(metadata));
+    InputFile inputFile = mock(InputFile.class);
+    when(mockFileIO.newInputFile(location)).thenReturn(inputFile);
+
+    ViewMetadata expectedMetadata = mock(ViewMetadata.class);
+
+    // Static-mock the parser so refresh() resolves without reading a real file.
+    try (MockedStatic<ViewMetadataParser> viewMetadataParserMock =
+        Mockito.mockStatic(ViewMetadataParser.class)) {
+      viewMetadataParserMock
+          .when(() -> ViewMetadataParser.read(same(inputFile)))
+          .thenReturn(expectedMetadata);
+      ViewMetadata actualMetadata = viewDelegate.refresh();
+      assertSame(actualMetadata, expectedMetadata);
+    }
+  }
+
+  @Test
+  public void testRefreshNotFound() {
+    when(mockWarehouse.getIcebergMetadata(identifier)).thenReturn(Optional.empty());
+    assertNull(viewDelegate.refresh());
+  }
+}