diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index adb86c1bce1b37..b4326b8a746cd5 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -28,6 +28,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 - #12716: Fix the `platform_instance` being added twice to the URN. If you want to have the previous behavior back, you need to add your platform_instance twice (i.e. `plat.plat`).
+- #12797: Previously, endpoints used in ASYNC mode did not immediately validate URNs or entity and aspect names. Starting with this release, requests that fail these validations are rejected with HTTP status code 400, even in ASYNC mode.
+
 ### Known Issues
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCPItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCPItem.java
index b03e478fec0c72..c6e1c1a58814fc 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCPItem.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCPItem.java
@@ -1,9 +1,11 @@
 package com.linkedin.metadata.aspect.batch;
 
 import com.google.common.collect.ImmutableSet;
+import com.linkedin.common.urn.Urn;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
 import com.linkedin.metadata.models.AspectSpec;
+import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
 import java.util.Collections;
@@ -82,4 +84,6 @@ static boolean supportsPatch(AspectSpec aspectSpec) {
     }
     return true;
   }
+
+  default void validate(Urn urn, String aspectName, EntityRegistry entityRegistry) {}
 }
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
index 3ac1e957ffe179..ce435688a239a8 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
@@ -14,7 +14,6 @@
 import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
 import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
 import com.linkedin.metadata.entity.validation.ValidationException;
-import com.linkedin.metadata.models.EntitySpec;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.util.Pair;
 import java.util.ArrayList;
@@ -153,10 +152,11 @@ private Stream proposedItemsToChangeItemStream(List {
           try {
             if (alternateMCPValidation) {
-              EntitySpec entitySpec =
-                  retrieverContext
-                      .getAspectRetriever()
-                      .getEntityRegistry()
-                      .getEntitySpec(mcp.getEntityType());
               return ProposedItem.builder()
-                  .metadataChangeProposal(mcp)
-                  .entitySpec(entitySpec)
-                  .auditStamp(auditStamp)
-                  .build();
+                  .build(
+                      mcp,
+                      auditStamp,
+                      retrieverContext.getAspectRetriever().getEntityRegistry());
             }
             if (mcp.getChangeType().equals(ChangeType.PATCH)) {
-              return PatchItemImpl.PatchItemImplBuilder.build(
-                  mcp,
-                  auditStamp,
-                  retrieverContext.getAspectRetriever().getEntityRegistry());
+              return PatchItemImpl.builder()
+                  .build(
+                      mcp,
+                      auditStamp,
+                      retrieverContext.getAspectRetriever().getEntityRegistry());
             } else {
               return ChangeItemImpl.builder()
                   .build(mcp, auditStamp, retrieverContext.getAspectRetriever());
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
index 2415870585e4e1..5aca3833d4bb9b 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ChangeItemImpl.java
@@ -13,7 +13,6 @@
 import com.linkedin.metadata.aspect.batch.ChangeMCP;
 import com.linkedin.metadata.aspect.batch.MCPItem;
 import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
-import com.linkedin.metadata.entity.AspectUtils;
 import com.linkedin.metadata.entity.validation.ValidationApiUtils;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;
@@ -139,38 +138,51 @@ public ChangeItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
       return this;
     }
 
+    public ChangeItemImplBuilder changeType(ChangeType changeType) {
+      this.changeType = validateOrDefaultChangeType(changeType);
+      return this;
+    }
+
     @SneakyThrows
     public ChangeItemImpl build(AspectRetriever aspectRetriever) {
-      // Apply change type default
-      this.changeType = validateOrDefaultChangeType(changeType);
+      if (this.changeType == null) {
+        changeType(null); // Apply change type default
+      }
 
       // Apply empty headers
       if (this.headers == null) {
         this.headers = Map.of();
       }
 
-      if (this.urn == null && this.metadataChangeProposal != null) {
-        this.urn = this.metadataChangeProposal.getEntityUrn();
-      }
-
       ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
       log.debug("entity type = {}", this.urn.getEntityType());
 
-      entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
+      entitySpec(
+          ValidationApiUtils.validateEntity(
+              aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
       log.debug("entity spec = {}", this.entitySpec);
 
-      aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
+      aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
       log.debug("aspect spec = {}", this.aspectSpec);
 
+      if (this.recordTemplate == null && this.metadataChangeProposal != null) {
+        this.recordTemplate = convertToRecordTemplate(this.metadataChangeProposal, aspectSpec);
+      }
+
       ValidationApiUtils.validateRecordTemplate(
           this.entitySpec, this.urn, this.recordTemplate, aspectRetriever);
 
+      if (this.systemMetadata == null) {
+        // generate default
+        systemMetadata(null);
+      }
+
       return new ChangeItemImpl(
           this.changeType,
           this.urn,
           this.aspectName,
           this.recordTemplate,
-          SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
+          this.systemMetadata,
           this.auditStamp,
           this.metadataChangeProposal,
           this.entitySpec,
@@ -183,35 +195,16 @@ public ChangeItemImpl build(AspectRetriever aspectRetriever) {
 
     public ChangeItemImpl build(
         MetadataChangeProposal mcp, AuditStamp auditStamp, AspectRetriever aspectRetriever) {
-      log.debug("entity type = {}", mcp.getEntityType());
-      EntitySpec entitySpec =
-          aspectRetriever.getEntityRegistry().getEntitySpec(mcp.getEntityType());
-      AspectSpec aspectSpec = AspectUtils.validateAspect(mcp, entitySpec);
-
-      if (!MCPItem.isValidChangeType(ChangeType.UPSERT, aspectSpec)) {
-        throw new UnsupportedOperationException(
-            "ChangeType not supported: "
-                + mcp.getChangeType()
-                + " for aspect "
-                + mcp.getAspectName());
-      }
-
-      Urn urn = mcp.getEntityUrn();
-      if (urn == null) {
-        urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
-      }
-
-      return ChangeItemImpl.builder()
-          .changeType(mcp.getChangeType())
-          .urn(urn)
-          .aspectName(mcp.getAspectName())
-          .systemMetadata(
-              SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
-          .metadataChangeProposal(mcp)
-          .auditStamp(auditStamp)
-          .recordTemplate(convertToRecordTemplate(mcp, aspectSpec))
-          .nextAspectVersion(this.nextAspectVersion)
-          .build(aspectRetriever);
+      // Validation includes: Urn, Entity, Aspect
+      this.metadataChangeProposal =
+          ValidationApiUtils.validateMCP(aspectRetriever.getEntityRegistry(), mcp);
+      this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
+      this.auditStamp = auditStamp;
+      this.aspectName = mcp.getAspectName(); // prior validation
+      changeType(mcp.getChangeType());
+      this.systemMetadata = mcp.getSystemMetadata();
+      this.headers = mcp.getHeaders();
+      return build(aspectRetriever);
     }
 
     // specific to impl, other impls support PATCH, etc
@@ -226,6 +219,11 @@ private static ChangeType validateOrDefaultChangeType(@Nullable ChangeType chang
 
   private static RecordTemplate convertToRecordTemplate(
       MetadataChangeProposal mcp, AspectSpec aspectSpec) {
+
+    if (mcp.getAspect() == null) {
+      return null;
+    }
+
     RecordTemplate aspect;
     try {
       aspect =
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
index 8fffc18fe32718..8990d7626dffed 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/DeleteItemImpl.java
@@ -94,10 +94,12 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) {
       ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
       log.debug("entity type = {}", this.urn.getEntityType());
 
-      entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
+      entitySpec(
+          ValidationApiUtils.validateEntity(
+              aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
       log.debug("entity spec = {}", this.entitySpec);
 
-      aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
+      aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
       log.debug("aspect spec = {}", this.aspectSpec);
 
       return new DeleteItemImpl(
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
index db24a107c5dd72..0cdd6b30706b6e 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLItemImpl.java
@@ -95,7 +95,8 @@ public MCLItemImpl build(AspectRetriever aspectRetriever) {
       log.debug("entity spec = {}", this.entitySpec);
 
       aspectSpec(
-          ValidationApiUtils.validate(this.entitySpec, this.metadataChangeLog.getAspectName()));
+          ValidationApiUtils.validateAspect(
+              this.entitySpec, this.metadataChangeLog.getAspectName()));
       log.debug("aspect spec = {}", this.aspectSpec);
 
       Pair aspects =
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
index 5e4e36cfe6fbd8..74800adc2daf32 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/PatchItemImpl.java
@@ -2,7 +2,6 @@
 import static com.linkedin.metadata.Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH;
 import static com.linkedin.metadata.Constants.MAX_JACKSON_STRING_SIZE;
-import static com.linkedin.metadata.entity.AspectUtils.validateAspect;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.StreamReadConstraints;
@@ -151,65 +150,82 @@ public ChangeItemImpl applyPatch(RecordTemplate recordTemplate, AspectRetriever
 
   public static class PatchItemImplBuilder {
 
+    // Ensure use of other builders
+    private PatchItemImpl build() {
+      return null;
+    }
+
     public PatchItemImpl.PatchItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
       this.systemMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(systemMetadata);
       return this;
     }
 
+    public PatchItemImpl.PatchItemImplBuilder aspectSpec(AspectSpec aspectSpec) {
+      if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
+        throw new UnsupportedOperationException(
+            "ChangeType not supported: " + ChangeType.PATCH + " for aspect " + this.aspectName);
+      }
+      this.aspectSpec = aspectSpec;
+      return this;
+    }
+
+    public PatchItemImpl.PatchItemImplBuilder patch(JsonPatch patch) {
+      if (patch == null) {
+        throw new IllegalArgumentException(String.format("Missing patch to apply. Item: %s", this));
+      }
+      this.patch = patch;
+      return this;
+    }
+
     public PatchItemImpl build(EntityRegistry entityRegistry) {
-      ValidationApiUtils.validateUrn(entityRegistry, this.urn);
+      urn(ValidationApiUtils.validateUrn(entityRegistry, this.urn));
       log.debug("entity type = {}", this.urn.getEntityType());
 
-      entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType()));
+      entitySpec(ValidationApiUtils.validateEntity(entityRegistry, this.urn.getEntityType()));
       log.debug("entity spec = {}", this.entitySpec);
 
-      aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
+      aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
       log.debug("aspect spec = {}", this.aspectSpec);
 
-      if (this.patch == null) {
-        throw new IllegalArgumentException(
-            String.format("Missing patch to apply. Aspect: %s", this.aspectSpec.getName()));
+      if (this.systemMetadata == null) {
+        // generate default
+        systemMetadata(null);
       }
 
       return new PatchItemImpl(
           this.urn,
           this.aspectName,
-          SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
+          this.systemMetadata,
           this.auditStamp,
-          this.patch,
+          Objects.requireNonNull(this.patch),
           this.metadataChangeProposal,
           this.entitySpec,
           this.aspectSpec);
     }
 
-    public static PatchItemImpl build(
+    public PatchItemImpl build(
         MetadataChangeProposal mcp, AuditStamp auditStamp, EntityRegistry entityRegistry) {
-      log.debug("entity type = {}", mcp.getEntityType());
-      EntitySpec entitySpec = entityRegistry.getEntitySpec(mcp.getEntityType());
-      AspectSpec aspectSpec = validateAspect(mcp, entitySpec);
 
-      if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
-        throw new UnsupportedOperationException(
-            "ChangeType not supported: "
-                + mcp.getChangeType()
-                + " for aspect "
-                + mcp.getAspectName());
-      }
+      // Validation includes: Urn, Entity, Aspect
+      this.metadataChangeProposal = ValidationApiUtils.validateMCP(entityRegistry, mcp);
+      this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
+      this.auditStamp = auditStamp;
+      this.aspectName = mcp.getAspectName();
+      systemMetadata(mcp.getSystemMetadata());
+      patch(convertToJsonPatch(mcp));
 
-      Urn urn = mcp.getEntityUrn();
-      if (urn == null) {
-        urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
-      }
+      entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); // prior validation
+      aspectSpec(entitySpec.getAspectSpec(this.aspectName)); // prior validation
 
-      return PatchItemImpl.builder()
-          .urn(urn)
-          .aspectName(mcp.getAspectName())
-          .systemMetadata(
-              SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
-          .metadataChangeProposal(mcp)
-          .auditStamp(auditStamp)
-          .patch(convertToJsonPatch(mcp))
-          .build(entityRegistry);
+      return new PatchItemImpl(
+          this.urn,
+          this.aspectName,
+          this.systemMetadata,
+          this.auditStamp,
+          this.patch,
+          this.metadataChangeProposal,
+          this.entitySpec,
+          this.aspectSpec);
     }
 
     public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
index 370f1f6f073e65..eaa423e1f1ad34 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java
@@ -6,14 +6,14 @@
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.aspect.batch.BatchItem;
 import com.linkedin.metadata.aspect.batch.MCPItem;
+import com.linkedin.metadata.entity.validation.ValidationApiUtils;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;
-import com.linkedin.metadata.utils.EntityKeyUtils;
+import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.utils.GenericRecordUtils;
 import com.linkedin.metadata.utils.SystemMetadataUtils;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.SystemMetadata;
-import java.util.Objects;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.Builder;
@@ -25,6 +25,7 @@
 @Getter
 @Builder(toBuilder = true)
 public class ProposedItem implements MCPItem {
+  @Nonnull private final Urn urn;
   @Nonnull private final MetadataChangeProposal metadataChangeProposal;
   @Nonnull private final AuditStamp auditStamp;
   // derived
@@ -64,17 +65,6 @@ public RecordTemplate getRecordTemplate() {
     return null;
   }
 
-  @Nonnull
-  @Override
-  public Urn getUrn() {
-    Urn urn = metadataChangeProposal.getEntityUrn();
-    if (urn == null) {
-      urn =
-          EntityKeyUtils.getUrnFromProposal(metadataChangeProposal, entitySpec.getKeyAspectSpec());
-    }
-    return urn;
-  }
-
   @Nullable
   @Override
   public SystemMetadata getSystemMetadata() {
@@ -114,16 +104,37 @@ public int hashCode() {
   }
 
   public static class ProposedItemBuilder {
-    public ProposedItem build() {
-      // Ensure systemMetadata
+    // Ensure use of other builders
+    private ProposedItem build() {
+      return null;
+    }
+
+    public ProposedItem build(
+        @Nonnull MetadataChangeProposal metadataChangeProposal,
+        AuditStamp auditStamp,
+        @Nonnull EntityRegistry entityRegistry) {
+
+      // Validation includes: Urn, Entity, Aspect
+      this.metadataChangeProposal =
+          ValidationApiUtils.validateMCP(entityRegistry, metadataChangeProposal);
+      this.auditStamp = auditStamp;
+      this.metadataChangeProposal.setSystemMetadata(
+          SystemMetadataUtils.generateSystemMetadataIfEmpty(
+              this.metadataChangeProposal.getSystemMetadata()));
+
+      this.urn = metadataChangeProposal.getEntityUrn(); // validation ensures existence
+      log.debug("entity type = {}", this.urn.getEntityType());
+
+      entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); // prior validation
+      log.debug("entity spec = {}", this.entitySpec);
+
+      aspectSpec(
+          entitySpec.getAspectSpec(
+              this.metadataChangeProposal.getAspectName())); // prior validation
+      log.debug("aspect spec = {}", this.aspectSpec);
+
       return new ProposedItem(
-          Objects.requireNonNull(this.metadataChangeProposal)
-              .setSystemMetadata(
-                  SystemMetadataUtils.generateSystemMetadataIfEmpty(
-                      this.metadataChangeProposal.getSystemMetadata())),
-          this.auditStamp,
-          this.entitySpec,
-          this.aspectSpec);
+          this.urn, this.metadataChangeProposal, this.auditStamp, this.entitySpec, this.aspectSpec);
     }
   }
 }
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java
index 8b5d8d696688e3..f9f71ca8778827 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java
@@ -8,9 +8,12 @@
 import com.linkedin.metadata.models.EntitySpec;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.utils.EntityApiUtils;
+import com.linkedin.metadata.utils.EntityKeyUtils;
 import com.linkedin.metadata.utils.EntityRegistryUrnValidator;
 import com.linkedin.metadata.utils.RecordTemplateValidator;
 import com.linkedin.metadata.utils.UrnValidationUtil;
+import com.linkedin.mxe.MetadataChangeProposal;
+import java.util.Objects;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -47,16 +50,33 @@ public static void validateTrimOrThrow(RecordTemplate record) {
         });
   }
 
-  public static void validateUrn(@Nonnull EntityRegistry entityRegistry, @Nonnull final Urn urn) {
+  @Nonnull
+  public static Urn validateUrn(@Nonnull EntityRegistry entityRegistry, final Urn urn) {
+    if (urn == null) {
+      throw new ValidationException("Cannot validate null URN.");
+    }
+
     UrnValidationUtil.validateUrn(
         entityRegistry,
         urn,
         Boolean.TRUE.equals(
             Boolean.parseBoolean(
                 System.getenv().getOrDefault(STRICT_URN_VALIDATION_ENABLED, "false"))));
+    return urn;
+  }
+
+  @Nonnull
+  public static EntitySpec validateEntity(
+      @Nonnull EntityRegistry entityRegistry, String entityType) {
+    EntitySpec entitySpec = entityRegistry.getEntitySpec(entityType);
+    if (entitySpec == null) {
+      throw new ValidationException("Unknown entity: " + entityType);
+    }
+    return entitySpec;
   }
 
-  public static AspectSpec validate(EntitySpec entitySpec, String aspectName) {
+  @Nonnull
+  public static AspectSpec validateAspect(@Nonnull EntitySpec entitySpec, String aspectName) {
     if (aspectName == null || aspectName.isEmpty()) {
       throw new UnsupportedOperationException(
           "Aspect name is required for create and update operations");
@@ -65,7 +85,7 @@ public static AspectSpec validate(EntitySpec entitySpec, String aspectName) {
     AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);
 
     if (aspectSpec == null) {
-      throw new RuntimeException(
+      throw new ValidationException(
           String.format("Unknown aspect %s for entity %s", aspectName, entitySpec.getName()));
     }
 
@@ -96,4 +116,42 @@ public static void validateRecordTemplate(
       RecordTemplateValidator.validateTrim(aspect, resultFunction, validator);
     }
   }
+
+  /**
+   * Given an MCP, validate its three primary components (URN, entity type, and aspect name)
+   * against the EntityRegistry.
+   *
+   * @param entityRegistry the entity registry
+   * @param mcp the MetadataChangeProposal to validate
+   * @return the validated MetadataChangeProposal
+   */
+  public static MetadataChangeProposal validateMCP(
+      @Nonnull EntityRegistry entityRegistry, MetadataChangeProposal mcp) {
+    if (mcp == null) {
+      throw new UnsupportedOperationException("MetadataChangeProposal is required.");
+    }
+
+    final EntitySpec entitySpec;
+    final Urn urn;
+    if (mcp.getEntityUrn() != null) {
+      urn = mcp.getEntityUrn();
+      entitySpec = validateEntity(entityRegistry, urn.getEntityType());
+    } else {
+      entitySpec = validateEntity(entityRegistry, mcp.getEntityType());
+      urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
+      mcp.setEntityUrn(urn);
+    }
+
+    if (!Objects.equals(mcp.getEntityType(), urn.getEntityType())) {
+      throw new ValidationException(
+          String.format(
+              "URN entity type does not match MCP entity type. %s != %s",
+              urn.getEntityType(), mcp.getEntityType()));
+    }
+
+    validateUrn(entityRegistry, urn);
+    validateAspect(entitySpec, mcp.getAspectName());
+
+    return mcp;
+  }
 }
diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
index 22b8c0628ab855..7010722536b8a9 100644
--- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
+++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java
@@ -25,6 +25,7 @@
 import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
 import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
 import com.linkedin.metadata.entity.SearchRetriever;
+import com.linkedin.metadata.entity.validation.ValidationException;
 import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.models.registry.EntityRegistryException;
@@ -35,7 +36,6 @@
 import com.linkedin.metadata.utils.GenericRecordUtils;
 import com.linkedin.mxe.GenericAspect;
 import com.linkedin.mxe.MetadataChangeProposal;
-import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.structured.StructuredProperties;
 import com.linkedin.structured.StructuredPropertyValueAssignmentArray;
 import com.linkedin.util.Pair;
@@ -227,13 +227,12 @@ public void toUpsertBatchItemsProposedItemTest() {
     List testItems =
         List.of(
             ProposedItem.builder()
-                .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME))
-                .metadataChangeProposal(
+                .build(
                     new MetadataChangeProposal()
                         .setEntityUrn(
                             UrnUtils.getUrn(
                                 "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"))
-                        .setAspectName("my-custom-aspect")
+                        .setAspectName(STATUS_ASPECT_NAME)
                        .setEntityType(DATASET_ENTITY_NAME)
                        .setChangeType(ChangeType.UPSERT)
                        .setAspect(
                            new GenericAspect()
                                .setContentType("application/json")
                                .setValue(
                                    ByteString.copyString(
-                                        "{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
-                        .setSystemMetadata(new SystemMetadata()))
-                .auditStamp(auditStamp)
-                .build(),
+                                        "{\"foo\":\"bar\",\"removed\":false}",
+                                        StandardCharsets.UTF_8))),
+                    auditStamp,
+                    testRegistry),
             ProposedItem.builder()
-                .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME))
-                .metadataChangeProposal(
+                .build(
                     new MetadataChangeProposal()
                         .setEntityUrn(
                             UrnUtils.getUrn(
                                 "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)"))
-                        .setAspectName("my-custom-aspect")
+                        .setAspectName(STATUS_ASPECT_NAME)
                        .setEntityType(DATASET_ENTITY_NAME)
                        .setChangeType(ChangeType.UPSERT)
                        .setAspect(
                            new GenericAspect()
                                .setContentType("application/json")
                                .setValue(
                                    ByteString.copyString(
-                                        "{\"foo\":\"bar\"}", StandardCharsets.UTF_8)))
-                        .setSystemMetadata(new SystemMetadata()))
-                .auditStamp(auditStamp)
-                .build());
+                                        "{\"foo\":\"bar\",\"removed\":false}",
+                                        StandardCharsets.UTF_8))),
+                    auditStamp,
+                    testRegistry));
 
     AspectsBatchImpl testBatch =
         AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build();
@@ -307,6 +305,27 @@ public void toUpsertBatchItemsProposedItemTest() {
         "Mutation to status aspect");
   }
 
+  @Test(expectedExceptions = ValidationException.class)
+  public void toUpsertBatchItemsProposedItemInvalidAspectTest() {
+    AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
+    ProposedItem.builder()
+        .build(
+            new MetadataChangeProposal()
+                .setEntityUrn(
+                    UrnUtils.getUrn(
+                        "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"))
+                .setAspectName("my-custom-aspect")
+                .setEntityType(DATASET_ENTITY_NAME)
+                .setChangeType(ChangeType.UPSERT)
+                .setAspect(
+                    new GenericAspect()
+                        .setContentType("application/json")
+                        .setValue(
+                            ByteString.copyString("{\"foo\":\"bar\"}", StandardCharsets.UTF_8))),
+            auditStamp,
+            testRegistry);
+  }
+
   @Test
   public void singleInvalidDoesntBreakBatch() {
     MetadataChangeProposal proposal1 =
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java
index 19be1eb14667d8..f54f224df49c32 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java
@@ -75,8 +75,7 @@ public void testUnknownFieldInTagAssociationArray() throws URISyntaxException {
     List testItems =
         List.of(
             ProposedItem.builder()
-                .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME))
-                .metadataChangeProposal(
+                .build(
                     new MetadataChangeProposal()
                         .setEntityUrn(TEST_DATASET_URN)
                         .setAspectName(GLOBAL_TAGS_ASPECT_NAME)
                        .setChangeType(ChangeType.UPSERT)
                        .setEntityType(DATASET_ENTITY_NAME)
                        .setAspect(
                            new GenericAspect()
                                .setContentType("application/json")
                                .setValue(
                                    ByteString.copyString(
                                        "{\"tags\":[{\"tag\":\"urn:li:tag:Legacy\",\"foo\":\"bar\"}]}",
                                        StandardCharsets.UTF_8)))
-                        .setSystemMetadata(new SystemMetadata()))
-                .auditStamp(AuditStampUtils.createDefaultAuditStamp())
-                .build());
+                        .setSystemMetadata(new SystemMetadata()),
+                    AuditStampUtils.createDefaultAuditStamp(),
+                    TEST_REGISTRY));
 
     List result = test.proposalMutation(testItems, retrieverContext).toList();
 
@@ -114,8 +113,7 @@ public void testUnknownFieldDatasetProperties() throws URISyntaxException {
     List testItems =
         List.of(
             ProposedItem.builder()
-                .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME))
-                .metadataChangeProposal(
+                .build(
                     new MetadataChangeProposal()
                         .setEntityUrn(TEST_DATASET_URN)
                         .setAspectName(DATASET_PROPERTIES_ASPECT_NAME)
                        .setChangeType(ChangeType.UPSERT)
                        .setEntityType(DATASET_ENTITY_NAME)
                        .setAspect(
                            new GenericAspect()
                                .setContentType("application/json")
                                .setValue(
                                    ByteString.copyString(
                                        "{\"foo\":\"bar\",\"customProperties\":{\"prop2\":\"pikachu\",\"prop1\":\"fakeprop\"}}",
-                                        StandardCharsets.UTF_8)))
-                        .setSystemMetadata(new SystemMetadata()))
-                .auditStamp(AuditStampUtils.createDefaultAuditStamp())
-                .build());
+                                        StandardCharsets.UTF_8))),
+                    AuditStampUtils.createDefaultAuditStamp(),
+                    TEST_REGISTRY));
 
     List result = test.proposalMutation(testItems, retrieverContext).toList();
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
index 4d977d179f91e4..d030a90f13820e 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java
@@ -222,18 +222,7 @@ void tesIngestOrderingWithProposedItem() throws RemoteInvocationException {
             .urn(testUrn)
             .request(
                 ProposedItem.builder()
-                    .metadataChangeProposal(mcp)
-                    .entitySpec(
-                        opContext
-                            .getEntityRegistry()
-                            .getEntitySpec(Constants.CONTAINER_ENTITY_NAME))
-                    .aspectSpec(
-                        opContext
-                            .getEntityRegistry()
-                            .getEntitySpec(Constants.CONTAINER_ENTITY_NAME)
-                            .getAspectSpec(Constants.STATUS_ASPECT_NAME))
-                    .auditStamp(auditStamp)
-                    .build())
+                    .build(mcp, auditStamp, opContext.getEntityRegistry()))
             .result(UpdateAspectResult.builder().mcp(mcp).urn(testUrn).build())
             .isUpdate(true)
             .publishedMCL(true)
diff --git a/metadata-jobs/mce-consumer/build.gradle b/metadata-jobs/mce-consumer/build.gradle
index 21951106ca6b24..17accb7ab7de34 100644
--- a/metadata-jobs/mce-consumer/build.gradle
+++ b/metadata-jobs/mce-consumer/build.gradle
@@ -38,6 +38,10 @@ dependencies {
   annotationProcessor externalDependency.lombok
 
   implementation externalDependency.awsMskIamAuth
+
+  testImplementation externalDependency.testng
+  testImplementation externalDependency.mockito
+  testImplementation externalDependency.mockitoInline
 }
 
 task avroSchemaSources(type: Copy) {
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
index 98195f57526824..bc90451434c260 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
@@ -126,6 +126,9 @@ public void consume(final ConsumerRecord consumerRecord)
         log.debug("MetadataChangeProposal {}", event);
       }
       String urn = entityClient.ingestProposal(systemOperationContext, event, false);
+      if (urn == null) {
+        throw new IllegalStateException("Failed to ingest MCP.");
+      }
       log.info("Successfully processed MCP event urn: {}", urn);
     } catch (Throwable throwable) {
       log.error("MCP Processor Error", throwable);
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
index 6f9798aa1da2c4..13efc95d491b0e 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -109,32 +110,37 @@ public void consume(final List> consumerRe
       }
     }
 
-    List systemMetadataList =
-        metadataChangeProposals.stream().map(MetadataChangeProposal::getSystemMetadata).toList();
-    systemOperationContext.withQueueSpan(
-        "consume",
-        systemMetadataList,
-        topicName,
-        () -> {
-          try {
-            List urns =
-                entityClient.batchIngestProposals(
-                    systemOperationContext, metadataChangeProposals, false);
-            log.info("Successfully processed MCP event urns: {}", urns);
-          } catch (Throwable throwable) {
-            log.error("MCP Processor Error", throwable);
-            Span currentSpan = Span.current();
-            currentSpan.recordException(throwable);
-            currentSpan.setStatus(StatusCode.ERROR, throwable.getMessage());
-            currentSpan.setAttribute(MetricUtils.ERROR_TYPE, throwable.getClass().getName());
-
-            kafkaProducer.produceFailedMetadataChangeProposal(
-                systemOperationContext, metadataChangeProposals, throwable);
-          }
-        },
-        BATCH_SIZE_ATTR,
-        String.valueOf(metadataChangeProposals.size()),
-        MetricUtils.DROPWIZARD_NAME,
-        MetricUtils.name(this.getClass(), "consume"));
+    if (!metadataChangeProposals.isEmpty()) {
+      List systemMetadataList =
+          metadataChangeProposals.stream().map(MetadataChangeProposal::getSystemMetadata).toList();
+      systemOperationContext.withQueueSpan(
+          "consume",
+          systemMetadataList,
+          topicName,
+          () -> {
+            try {
+              List urns =
+                  entityClient.batchIngestProposals(
+                      systemOperationContext, metadataChangeProposals, false);
+
+              log.info(
+                  "Successfully processed MCP event urns: {}",
+                  urns.stream().filter(Objects::nonNull).toList());
+            } catch (Throwable throwable) {
+              log.error("MCP Processor Error", throwable);
+              Span currentSpan = Span.current();
+              currentSpan.recordException(throwable);
+              currentSpan.setStatus(StatusCode.ERROR, throwable.getMessage());
+              currentSpan.setAttribute(MetricUtils.ERROR_TYPE, throwable.getClass().getName());
+
+              kafkaProducer.produceFailedMetadataChangeProposal(
+                  systemOperationContext, metadataChangeProposals, throwable);
+            }
+          },
+          BATCH_SIZE_ATTR,
+          String.valueOf(metadataChangeProposals.size()),
+          MetricUtils.DROPWIZARD_NAME,
+          MetricUtils.name(this.getClass(), "consume"));
+    }
   }
 }
diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java
new file mode 100644
index 00000000000000..bd66ac66881dd1
--- /dev/null
+++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessorTest.java
@@ -0,0 +1,291 @@
+package com.linkedin.metadata.kafka;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.codahale.metrics.MetricRegistry;
+import com.linkedin.common.Status;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.entity.client.EntityClientConfig;
+import com.linkedin.entity.client.SystemEntityClient;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.client.SystemJavaEntityClient;
+import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
+import com.linkedin.metadata.entity.DeleteEntityService;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.event.EventProducer;
+import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.search.LineageSearchService;
+import com.linkedin.metadata.search.SearchService;
+import com.linkedin.metadata.search.client.CachingEntitySearchService;
+import com.linkedin.metadata.service.RollbackService;
+import com.linkedin.metadata.timeseries.TimeseriesAspectService;
+import com.linkedin.metadata.utils.GenericRecordUtils;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import com.linkedin.mxe.MetadataChangeProposal;
+import com.linkedin.mxe.Topics;
+import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.test.metadata.context.TestOperationContexts;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.MDC;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MetadataChangeProposalsProcessorTest {
+
+  private SystemEntityClient entityClient;
+
+  private MetadataChangeProposalsProcessor processor;
+
+  private final OperationContext opContext =
+      TestOperationContexts.systemContextNoSearchAuthorization();
+
+  @Mock private EntityService mockEntityService;
+
+  @Mock private DeleteEntityService mockDeleteEntityService;
+
+  @Mock private EntitySearchService mockEntitySearchService;
+
+  @Mock private CachingEntitySearchService mockCachingEntitySearchService;
+
+  @Mock private SearchService mockSearchService;
+
+  @Mock private LineageSearchService mockLineageSearchService;
+
+  @Mock private TimeseriesAspectService mockTimeseriesAspectService;
+
+  @Mock private RollbackService mockRollbackService;
+
+  @Mock private EventProducer mockKafkaProducer;
+
+  @Mock private ThrottleSensor mockKafkaThrottle;
+
+  @Mock private KafkaListenerEndpointRegistry mockRegistry;
+
+  @Mock private ConfigurationProvider mockProvider;
+
+  @Mock private ConsumerRecord mockConsumerRecord;
+
+  @Mock private GenericRecord mockRecord;
+
+  @Mock private Span mockSpan;
+
+  private AutoCloseable mocks;
+
+  private MockedStatic spanMock;
+  private MockedStatic metricUtilsMock;
+  private MockedStatic eventUtilsMock;
+
+  @BeforeMethod
+  public void setup() {
+    mocks = MockitoAnnotations.openMocks(this);
+
+    entityClient =
+        new SystemJavaEntityClient(
+            mockEntityService,
+            mockDeleteEntityService,
+            mockEntitySearchService,
+            mockCachingEntitySearchService,
+            mockSearchService,
+            mockLineageSearchService,
+            mockTimeseriesAspectService,
+            mockRollbackService,
+            mockKafkaProducer,
+            new EntityClientCacheConfig(),
+            EntityClientConfig.builder().build());
+
+    // Setup the processor
+    processor =
+        new MetadataChangeProposalsProcessor(
+            opContext,
+            entityClient,
+            mockKafkaProducer,
+            mockKafkaThrottle,
+            mockRegistry,
+            mockProvider);
+
+    // Setup mocks for static methods
+    spanMock = mockStatic(Span.class);
+    spanMock.when(Span::current).thenReturn(mockSpan);
+
+    metricUtilsMock = mockStatic(MetricUtils.class);
+    MetricRegistry mockMetricRegistry = mock(MetricRegistry.class);
+    metricUtilsMock.when(MetricUtils::get).thenReturn(mockMetricRegistry);
+    metricUtilsMock
+        .when(() -> MetricUtils.name(eq(MetadataChangeProposalsProcessor.class), any()))
+        .thenReturn("metricName");
+
+    eventUtilsMock = mockStatic(EventUtils.class);
+
+    // Setup consumer record mock
+    when(mockConsumerRecord.value()).thenReturn(mockRecord);
+    when(mockConsumerRecord.key()).thenReturn("test-key"); // doesn't matter for test
+    when(mockConsumerRecord.topic()).thenReturn(Topics.METADATA_CHANGE_PROPOSAL);
+    when(mockConsumerRecord.partition()).thenReturn(0);
+    when(mockConsumerRecord.offset()).thenReturn(0L);
+    when(mockConsumerRecord.timestamp()).thenReturn(System.currentTimeMillis());
+    when(mockConsumerRecord.serializedValueSize()).thenReturn(100);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    // Close static mocks first
+    if (spanMock != null) {
+      spanMock.close();
+      spanMock = null; // Set to null after closing
+    }
+
+    if (metricUtilsMock != null) {
+      metricUtilsMock.close();
+      metricUtilsMock = null; // Set to null after closing
+    }
+
+    if (eventUtilsMock != null) {
+      eventUtilsMock.close();
+      eventUtilsMock = null; // Set to null after closing
+    }
+
+    // Then close other mocks
+    if (mocks != null) {
+      mocks.close();
+      mocks = null; // Set to null after closing
+    }
+
+    MDC.clear();
+  }
+
+  @Test
+  public void testDeserializationFailure() throws Exception {
+    // Mock conversion from Avro to throw IOException
+    IOException deserializationException = new IOException("Failed to deserialize Avro record");
+    eventUtilsMock
+        .when(() -> EventUtils.avroToPegasusMCP(mockRecord))
+        .thenThrow(deserializationException);
+
+    // Execute test
+    processor.consume(mockConsumerRecord);
+
+    // Verify that kafkaProducer was not called (since we can't forward properly)
+    verify(mockKafkaProducer, never()).produceFailedMetadataChangeProposal(any(), any(), any());
+  }
+
+  @Test
+  public void testValidationFailureWithUrnValidationException() throws Exception {
+    // Create a MCP that will fail with a specific validation exception
+    MetadataChangeProposal mcp = new MetadataChangeProposal();
+    // Invalid URN to trigger validation
+    Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,INVALID)");
+    mcp.setEntityUrn(entityUrn);
+    mcp.setEntityType("dataset");
+    mcp.setAspectName("status");
+    mcp.setChangeType(ChangeType.UPSERT);
+    mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false)));
+
+    // Mock conversion from Avro to Pegasus MCP
+    eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord)).thenReturn(mcp);
+
+    // Execute test
+    processor.consume(mockConsumerRecord);
+
+    ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+    verify(mockKafkaProducer)
+        .produceFailedMetadataChangeProposal(
+            eq(opContext), eq(List.of(mcp)), exceptionCaptor.capture());
+
+    // Verify kafkaProducer was called to produce the failed MCP
+    verify(mockKafkaProducer, times(1))
+        .produceFailedMetadataChangeProposal(eq(opContext), eq(List.of(mcp)), any(Throwable.class));
+
+    // Verify error handling
+    Throwable validationException = exceptionCaptor.getValue();
+    verify(mockSpan).recordException(validationException);
+    verify(mockSpan).setStatus(StatusCode.ERROR, "Failed to ingest MCP.");
+  }
+
+  @Test
+  public void testValidationFailureWithEntityValidationException() throws Exception {
+    // Create a MCP that will fail with a specific validation exception
+    MetadataChangeProposal mcp = new MetadataChangeProposal();
+    Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)");
+    mcp.setEntityUrn(entityUrn);
+    mcp.setEntityType("FOOBAR"); // Invalid entity type
+    mcp.setAspectName("status");
+    mcp.setChangeType(ChangeType.UPSERT);
+    mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false)));
+
+    // Mock conversion from Avro to Pegasus MCP
+    eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord)).thenReturn(mcp);
+
+    // Execute test
+    processor.consume(mockConsumerRecord);
+
+    ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+    verify(mockKafkaProducer)
+        .produceFailedMetadataChangeProposal(
+            eq(opContext), eq(List.of(mcp)), exceptionCaptor.capture());
+
+    // Verify kafkaProducer was called to produce the failed MCP
+    verify(mockKafkaProducer, times(1))
+        .produceFailedMetadataChangeProposal(eq(opContext), eq(List.of(mcp)), any(Throwable.class));
+
+    // Verify error handling
+    Throwable validationException = exceptionCaptor.getValue();
+    verify(mockSpan).recordException(validationException);
+    verify(mockSpan)
+        .setStatus(
+            StatusCode.ERROR, "URN entity type does not match MCP entity type. dataset != FOOBAR");
+  }
+
+  @Test
+  public void testValidationFailureWithAspectValidationException() throws Exception {
+    // Create a MCP that will fail with a specific validation exception
+    MetadataChangeProposal mcp = new MetadataChangeProposal();
+    Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)");
+    mcp.setEntityUrn(entityUrn);
+    mcp.setEntityType("dataset");
+    mcp.setAspectName("INVALID"); // Invalid aspect
+    mcp.setChangeType(ChangeType.UPSERT);
+    mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false)));
+
+    // Mock conversion from Avro to Pegasus MCP
+    eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord)).thenReturn(mcp);
+
+    // Execute test
+    processor.consume(mockConsumerRecord);
+
+    ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+    verify(mockKafkaProducer)
+        .produceFailedMetadataChangeProposal(
+            eq(opContext), eq(List.of(mcp)), exceptionCaptor.capture());
+
+    // Verify kafkaProducer was called to produce the failed MCP
+    verify(mockKafkaProducer, times(1))
+        .produceFailedMetadataChangeProposal(eq(opContext), eq(List.of(mcp)), any(Throwable.class));
+
+    // Verify error handling
+    Throwable validationException = exceptionCaptor.getValue();
+    verify(mockSpan).recordException(validationException);
+    verify(mockSpan).setStatus(StatusCode.ERROR, "Unknown aspect INVALID for entity dataset");
+  }
+}
diff --git a/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java
new file mode 100644
index 00000000000000..2c7b372d829b8b
--- /dev/null
+++ b/metadata-jobs/mce-consumer/src/test/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessorTest.java
@@ -0,0 +1,390 @@
+package com.linkedin.metadata.kafka.batch;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.linkedin.common.Status;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.entity.client.EntityClientConfig;
+import com.linkedin.entity.client.SystemEntityClient;
+import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.aspect.batch.AspectsBatch;
+import com.linkedin.metadata.client.SystemJavaEntityClient;
+import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
+import com.linkedin.metadata.entity.DeleteEntityService;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.event.EventProducer; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.LineageSearchService; +import com.linkedin.metadata.search.SearchService; +import com.linkedin.metadata.search.client.CachingEntitySearchService; +import com.linkedin.metadata.service.RollbackService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.mxe.Topics; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.MockitoAnnotations; +import org.slf4j.MDC; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class BatchMetadataChangeProposalsProcessorTest { + + private SystemEntityClient entityClient; + private BatchMetadataChangeProposalsProcessor processor; + private final OperationContext opContext = + TestOperationContexts.systemContextNoSearchAuthorization(); + + @Mock private EntityService mockEntityService; + + @Mock private DeleteEntityService mockDeleteEntityService; + + @Mock private EntitySearchService mockEntitySearchService; + + @Mock private CachingEntitySearchService mockCachingEntitySearchService; + + @Mock private SearchService mockSearchService; + + @Mock private LineageSearchService mockLineageSearchService; + + @Mock private TimeseriesAspectService mockTimeseriesAspectService; + + @Mock private RollbackService mockRollbackService; + + @Mock private EventProducer mockKafkaProducer; + + @Mock private ThrottleSensor mockKafkaThrottle; + + @Mock private KafkaListenerEndpointRegistry mockRegistry; + + @Mock private ConfigurationProvider mockProvider; + + @Mock private Histogram mockHistogram; + + @Mock private ConsumerRecord mockConsumerRecord1; + + @Mock private ConsumerRecord mockConsumerRecord2; + + @Mock private ConsumerRecord mockConsumerRecord3; + + @Mock private GenericRecord mockRecord1; + + @Mock private GenericRecord mockRecord2; + + @Mock private GenericRecord mockRecord3; + + @Mock private Span mockSpan; + + private AutoCloseable mocks; + private MockedStatic spanMock; + private MockedStatic metricUtilsMock; + private MockedStatic eventUtilsMock; + + @BeforeMethod + public void setup() { + mocks = MockitoAnnotations.openMocks(this); + + // Create the entity client following the pattern in MetadataChangeProposalsProcessorTest + entityClient = + new SystemJavaEntityClient( + mockEntityService, + mockDeleteEntityService, + mockEntitySearchService, + mockCachingEntitySearchService, + mockSearchService, + mockLineageSearchService, + mockTimeseriesAspectService, + mockRollbackService, + mockKafkaProducer, + new EntityClientCacheConfig(), + EntityClientConfig.builder().build()); + + // Setup the processor + processor = + new BatchMetadataChangeProposalsProcessor( + opContext, + 
entityClient, + mockKafkaProducer, + mockKafkaThrottle, + mockRegistry, + mockProvider); + + // Set fmcpTopicName field via reflection + try { + java.lang.reflect.Field field = + BatchMetadataChangeProposalsProcessor.class.getDeclaredField("fmcpTopicName"); + field.setAccessible(true); + field.set(processor, Topics.FAILED_METADATA_CHANGE_PROPOSAL); + + field = BatchMetadataChangeProposalsProcessor.class.getDeclaredField("mceConsumerGroupId"); + field.setAccessible(true); + field.set(processor, "MetadataChangeProposal-Consumer"); + } catch (Exception e) { + throw new RuntimeException("Failed to set field via reflection", e); + } + + // Setup mocks for static methods + spanMock = mockStatic(Span.class); + spanMock.when(Span::current).thenReturn(mockSpan); + + metricUtilsMock = mockStatic(MetricUtils.class); + MetricRegistry mockMetricRegistry = mock(MetricRegistry.class); + when(mockMetricRegistry.histogram(any(String.class))).thenReturn(mockHistogram); + metricUtilsMock.when(MetricUtils::get).thenReturn(mockMetricRegistry); + metricUtilsMock + .when(() -> MetricUtils.name(eq(BatchMetadataChangeProposalsProcessor.class), any())) + .thenReturn("metricName"); + + eventUtilsMock = mockStatic(EventUtils.class); + + // Setup consumer record mocks + setupConsumerRecordMock(mockConsumerRecord1, mockRecord1, "test-key-1", 0, 0L); + setupConsumerRecordMock(mockConsumerRecord2, mockRecord2, "test-key-2", 0, 1L); + setupConsumerRecordMock(mockConsumerRecord3, mockRecord3, "test-key-3", 0, 2L); + } + + private void setupConsumerRecordMock( + ConsumerRecord consumerRecord, + GenericRecord record, + String key, + int partition, + long offset) { + when(consumerRecord.value()).thenReturn(record); + when(consumerRecord.key()).thenReturn(key); + when(consumerRecord.topic()).thenReturn(Topics.METADATA_CHANGE_PROPOSAL); + when(consumerRecord.partition()).thenReturn(partition); + when(consumerRecord.offset()).thenReturn(offset); + when(consumerRecord.timestamp()).thenReturn(System.currentTimeMillis()); + when(consumerRecord.serializedValueSize()).thenReturn(100); + } + + @AfterMethod + public void tearDown() throws Exception { + // Close static mocks first + if (spanMock != null) { + spanMock.close(); + spanMock = null; + } + + if (metricUtilsMock != null) { + metricUtilsMock.close(); + metricUtilsMock = null; + } + + if (eventUtilsMock != null) { + eventUtilsMock.close(); + eventUtilsMock = null; + } + + // Then close other mocks + if (mocks != null) { + mocks.close(); + mocks = null; + } + + MDC.clear(); + } + + @Test + public void testDeserializationFailure() throws Exception { + // Mock conversion from Avro to throw IOException + IOException deserializationException = new IOException("Failed to deserialize Avro record"); + eventUtilsMock + .when(() -> EventUtils.avroToPegasusMCP(mockRecord1)) + .thenThrow(deserializationException); + + List> records = List.of(mockConsumerRecord1); + + // Execute test + processor.consume(records); + + // Verify that kafkaProducer was not called (since we can't forward properly) + verify(mockKafkaProducer, never()).produceFailedMetadataChangeProposal(any(), any(), any()); + } + + @Test + public void testSuccessfulBatchIngestion() throws Exception { + // Create MCPs + MetadataChangeProposal mcp1 = new MetadataChangeProposal(); + mcp1.setSystemMetadata(new SystemMetadata()); + mcp1.setChangeType(ChangeType.UPSERT); + mcp1.setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:test,testSuccessfulBatchIngestion1,PROD)")); + 
mcp1.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp1.setEntityType("dataset"); + mcp1.setAspectName("status"); + MetadataChangeProposal mcp2 = new MetadataChangeProposal(); + mcp2.setSystemMetadata(new SystemMetadata()); + mcp2.setChangeType(ChangeType.UPSERT); + mcp2.setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:test,testSuccessfulBatchIngestion2,PROD)")); + mcp2.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp2.setEntityType("dataset"); + mcp2.setAspectName("status"); + + // Mock conversion from Avro to Pegasus MCP + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord1)).thenReturn(mcp1); + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord2)).thenReturn(mcp2); + + List> records = + List.of(mockConsumerRecord1, mockConsumerRecord2); + + // Execute test + processor.consume(records); + + // Verify that mockEntityService.batchIngestProposals was called + verify(mockEntityService, times(1)).ingestProposal(any(), any(), eq(false)); + + // Verify that kafkaProducer was not called (since ingestion was successful) + verify(mockKafkaProducer, never()).produceFailedMetadataChangeProposal(any(), any(), any()); + } + + @Test + public void testEmptyBatch() throws Exception { + // Execute test with empty list + processor.consume(new ArrayList<>()); + + // Verify that entityClient.batchIngestProposals was not called + verify(mockEntityService, never()) + .ingestProposal(any(OperationContext.class), any(), anyBoolean()); + + // Verify that kafkaProducer was not called + verify(mockKafkaProducer, never()).produceFailedMetadataChangeProposal(any(), any(), any()); + } + + @Test + public void testIngestionFailure() throws Exception { + // Create 3 Invalid MCPs + MetadataChangeProposal mcp1 = new MetadataChangeProposal(); + mcp1.setSystemMetadata(new SystemMetadata()); + mcp1.setChangeType(ChangeType.UPSERT); + mcp1.setEntityUrn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,INVALID)")); + mcp1.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp1.setEntityType("dataset"); + mcp1.setAspectName("status"); + MetadataChangeProposal mcp2 = new MetadataChangeProposal(); + mcp2.setSystemMetadata(new SystemMetadata()); + mcp2.setChangeType(ChangeType.UPSERT); + mcp2.setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:test,testSuccessfulBatchIngestion2,PROD)")); + mcp2.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp2.setEntityType("FOOBAR"); // Invalid entity type + mcp2.setAspectName("status"); + MetadataChangeProposal mcp3 = new MetadataChangeProposal(); + mcp3.setSystemMetadata(new SystemMetadata()); + mcp3.setChangeType(ChangeType.UPSERT); + mcp3.setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:test,testSuccessfulBatchIngestion2,PROD)")); + mcp3.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp3.setEntityType("dataset"); + mcp3.setAspectName("INVALID"); // Invalid aspect + + // Mock conversion from Avro to Pegasus MCP + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord1)).thenReturn(mcp1); + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord2)).thenReturn(mcp2); + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord3)).thenReturn(mcp3); + + List> records = + List.of(mockConsumerRecord1, mockConsumerRecord2, mockConsumerRecord3); + + // Execute test + processor.consume(records); + + 
ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(mockKafkaProducer) + .produceFailedMetadataChangeProposal(eq(opContext), anyList(), exceptionCaptor.capture()); + + // Verify error handling + Throwable ingestionException = exceptionCaptor.getValue(); + verify(mockSpan).recordException(ingestionException); + verify(mockSpan).setStatus(StatusCode.ERROR, ingestionException.getMessage()); + + // Verify that kafkaProducer was called to produce the failed MCPs + ArgumentCaptor<List<MetadataChangeProposal>> mcpCaptor = ArgumentCaptor.forClass(List.class); + verify(mockKafkaProducer, times(1)) + .produceFailedMetadataChangeProposal( + eq(opContext), mcpCaptor.capture(), eq(ingestionException)); + + List<MetadataChangeProposal> capturedMCPs = mcpCaptor.getValue(); + assert capturedMCPs.size() == 3; + assert capturedMCPs.contains(mcp1); + assert capturedMCPs.contains(mcp2); + assert capturedMCPs.contains(mcp3); + + // Verify that ingestProposal was not called + verify(mockEntityService, never()) + .ingestProposal(any(OperationContext.class), any(), anyBoolean()); + } + + @Test + public void testMixedDeserializationResults() throws Exception { + // Mock successful conversion for one record and failure for the other + MetadataChangeProposal mcp1 = new MetadataChangeProposal(); + mcp1.setSystemMetadata(new SystemMetadata()); + mcp1.setChangeType(ChangeType.UPSERT); + mcp1.setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:test,testMixedDeserializationResults,PROD)")); + mcp1.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false))); + mcp1.setEntityType("dataset"); + mcp1.setAspectName("status"); + + eventUtilsMock.when(() -> EventUtils.avroToPegasusMCP(mockRecord1)).thenReturn(mcp1); + + IOException deserializationException = new IOException("Failed to deserialize Avro record"); + eventUtilsMock + .when(() -> EventUtils.avroToPegasusMCP(mockRecord2)) + .thenThrow(deserializationException); + + List<ConsumerRecord<String, GenericRecord>> records = + List.of(mockConsumerRecord1, mockConsumerRecord2); + + // Execute test + processor.consume(records); + + // Verify that mockEntityService.ingestProposal was called with only the successful MCP + ArgumentCaptor<AspectsBatch> mcpCaptor = ArgumentCaptor.forClass(AspectsBatch.class); + verify(mockEntityService, times(1)).ingestProposal(any(), mcpCaptor.capture(), eq(false)); + + AspectsBatch aspectsBatch = mcpCaptor.getValue(); + assert aspectsBatch.getMCPItems().size() == 1; + assert aspectsBatch.getMCPItems().stream() + .anyMatch(i -> i.getMetadataChangeProposal().equals(mcp1)); + + // Verify that kafkaProducer was not called (since the deserialization failure was already handled) + verify(mockKafkaProducer, never()).produceFailedMetadataChangeProposal(any(), any(), any()); + } +} diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java index 1430faf2fd26b5..e0fe15f471f2fd 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/GlobalControllerExceptionHandler.java @@ -45,8 +45,8 @@ public ResponseEntity handleConflict(RuntimeException ex) { return new ResponseEntity<>(ex.getMessage(), HttpStatus.BAD_REQUEST); } - @ExceptionHandler(InvalidUrnException.class) - public static ResponseEntity<Map<String, String>> handleUrnException(InvalidUrnException e) { + 
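// Bad URNs and unknown entity or aspect names now surface as IllegalArgumentException, which must also map to HTTP 400 +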
@ExceptionHandler({IllegalArgumentException.class, InvalidUrnException.class}) + public static ResponseEntity<Map<String, String>> handleUrnException(Exception e) { return new ResponseEntity<>(Map.of("error", e.getMessage()), HttpStatus.BAD_REQUEST); } diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index e8ba5f1a697efa..faaf79b4610b83 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -165,23 +165,18 @@ protected AspectsBatch toMCPBatch( JsonNode jsonNodeAspect = aspect.getValue().get("value"); if (opContext.getValidationContext().isAlternateValidation()) { - ProposedItem.ProposedItemBuilder builder = + items.add( ProposedItem.builder() - .metadataChangeProposal( + .build( new MetadataChangeProposal() .setEntityUrn(entityUrn) .setAspectName(aspect.getKey()) .setEntityType(entityUrn.getEntityType()) .setChangeType(ChangeType.UPSERT) .setAspect(GenericRecordUtils.serializeAspect(jsonNodeAspect)) - .setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata())) - .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .entitySpec( - opContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(entityUrn.getEntityType())); - items.add(builder.build()); + .setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata()), + AuditStampUtils.createAuditStamp(actor.toUrnStr()), + entityRegistry)); } else if (aspectSpec != null) { ChangeItemImpl.ChangeItemImplBuilder builder = ChangeItemImpl.builder() diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index 4d2a9ed215efc2..2610b842f5b120 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -646,9 +646,9 @@ protected AspectsBatch toMCPBatch( JsonNode jsonNodeAspect = aspect.getValue().get("value"); if (opContext.getValidationContext().isAlternateValidation()) { - ProposedItem.ProposedItemBuilder builder = + items.add( ProposedItem.builder() - .metadataChangeProposal( + .build( new MetadataChangeProposal() .setEntityUrn(entityUrn) .setAspectName(aspect.getKey()) @@ -658,14 +658,9 @@ protected AspectsBatch toMCPBatch( .setHeaders( headers != null ? 
new StringMap(headers) : null, SetMode.IGNORE_NULL) - .setSystemMetadata(systemMetadata, SetMode.IGNORE_NULL)) - .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .entitySpec( - opContext - .getAspectRetriever() - .getEntityRegistry() - .getEntitySpec(entityUrn.getEntityType())); - items.add(builder.build()); + .setSystemMetadata(systemMetadata, SetMode.IGNORE_NULL), + AuditStampUtils.createAuditStamp(actor.toUrnStr()), + entityRegistry)); } else if (aspectSpec != null) { ChangeItemImpl.ChangeItemImplBuilder builder = ChangeItemImpl.builder() diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index 15dca1f56d3c19..c3f8b2f9563948 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -39,6 +39,7 @@ import java.util.Optional; import com.linkedin.mxe.SystemMetadata; +import com.linkedin.restli.server.RestLiServiceException; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import mock.MockEntityRegistry; @@ -150,7 +151,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException { verifyNoMoreInteractions(producer); } - @Test + @Test(expectedExceptions = RestLiServiceException.class, expectedExceptionsMessageRegExp = "Unknown aspect notAnAspect for entity dataset") public void testNoValidateAsync() throws URISyntaxException { OperationContext noValidateOpContext = TestOperationContexts.systemContextNoValidate(); aspectResource.setSystemOperationContext(noValidateOpContext); @@ -170,10 +171,5 @@ public void testNoValidateAsync() throws URISyntaxException { Actor actor = new Actor(ActorType.USER, "user"); when(mockAuthentication.getActor()).thenReturn(actor); aspectResource.ingestProposal(mcp, "true"); - verify(producer, times(1)).produceMetadataChangeProposal(any(OperationContext.class), eq(urn), argThat(arg -> arg.getMetadataChangeProposal().equals(mcp))); - verifyNoMoreInteractions(producer); - verifyNoMoreInteractions(aspectDao); - reset(producer, aspectDao); - aspectResource.setSystemOperationContext(opContext); } } diff --git a/smoke-test/tests/openapi/v3/exceptions.json b/smoke-test/tests/openapi/v3/exceptions.json index 6ac498cc08c7b0..d9126ae71ace73 100644 --- a/smoke-test/tests/openapi/v3/exceptions.json +++ b/smoke-test/tests/openapi/v3/exceptions.json @@ -6,6 +6,13 @@ "method": "delete" } }, + { + "request": { + "url": "/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Atest%2CInvalidExceptions%2CPROD%29", + "description": "Remove dataset used for invalid tests", + "method": "delete" + } + }, { "request": { "url": "/openapi/v3/entity/dataset", @@ -67,5 +74,179 @@ "error": "Validation Error" } } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid URN - SYNC", + "params": { + "createIfNotExists": "false", + "async": "false" + }, + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,InvalidExceptions,INVALID)", + "status": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ] + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid URN - 
ASYNC", + "params": { + "createIfNotExists": "false", + "async": "true" + }, + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,InvalidExceptions,INVALID)", + "status": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ] + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid Aspect - SYNC", + "params": { + "createIfNotExists": "false", + "async": "false" + }, + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,InvalidExceptions,PROD)", + "INVALID": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ], + "exclude_regex_paths": [ + "root\\['message'\\]" + ], + "json": { + "error": "Validation Error" + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid Aspect - ASYNC", + "params": { + "createIfNotExists": "false", + "async": "true" + }, + "json": [ + { + "urn": "urn:li:dataset:(urn:li:dataPlatform:test,InvalidExceptions,PROD)", + "INVALID": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ], + "exclude_regex_paths": [ + "root\\['message'\\]" + ], + "json": { + "error": "Validation Error" + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid Entity - SYNC", + "params": { + "createIfNotExists": "false", + "async": "false" + }, + "json": [ + { + "urn": "urn:li:FOOBAR:(urn:li:dataPlatform:test,InvalidExceptions,PROD)", + "status": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ], + "exclude_regex_paths": [ + "root\\['message'\\]" + ], + "json": { + "error": "Invalid urn!: urn:li:FOOBAR:(urn:li:dataPlatform:test,InvalidExceptions,PROD)" + } + } + }, + { + "request": { + "url": "/openapi/v3/entity/dataset", + "description": "Test Invalid Entity - ASYNC", + "params": { + "createIfNotExists": "false", + "async": "true" + }, + "json": [ + { + "urn": "urn:li:FOOBAR:(urn:li:dataPlatform:test,InvalidExceptions,PROD)", + "INVALID": { + "value": { + "removed": false + } + } + } + ] + }, + "response": { + "status_codes": [ + 400 + ], + "exclude_regex_paths": [ + "root\\['message'\\]" + ], + "json": { + "error": "Invalid urn!: urn:li:FOOBAR:(urn:li:dataPlatform:test,InvalidExceptions,PROD)" + } + } } ] \ No newline at end of file