Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): URN, Entity, and Aspect name Async Validation #12797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #12716: Fix the `platform_instance` being added twice to the URN. If you want to have the previous behavior back, you need to add your platform_instance twice (i.e. `plat.plat`).

- #12797: Previously endpoints when used in ASYNC mode would not validate URNs, entity & aspect names immediately. Starting with this release, even in ASYNC mode, these requests will be returned with http code 400.


### Known Issues

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.linkedin.metadata.aspect.batch;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.Collections;
Expand Down Expand Up @@ -82,4 +84,6 @@ static boolean supportsPatch(AspectSpec aspectSpec) {
}
return true;
}

default void validate(Urn urn, String aspectName, EntityRegistry entityRegistry) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.entity.validation.ValidationException;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
Expand Down Expand Up @@ -153,10 +152,11 @@ private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem

private static BatchItem patchDiscriminator(MCPItem mcpItem, AspectRetriever aspectRetriever) {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
}
return ChangeItemImpl.builder()
.build(mcpItem.getMetadataChangeProposal(), mcpItem.getAuditStamp(), aspectRetriever);
Expand Down Expand Up @@ -195,22 +195,18 @@ public AspectsBatchImplBuilder mcps(
mcp -> {
try {
if (alternateMCPValidation) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(mcp.getEntityType());
return ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(entitySpec)
.auditStamp(auditStamp)
.build();
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
}
if (mcp.getChangeType().equals(ChangeType.PATCH)) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
} else {
return ChangeItemImpl.builder()
.build(mcp, auditStamp, retrieverContext.getAspectRetriever());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
import com.linkedin.metadata.entity.AspectUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
Expand Down Expand Up @@ -139,38 +138,51 @@ public ChangeItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
return this;
}

public ChangeItemImplBuilder changeType(ChangeType changeType) {
this.changeType = validateOrDefaultChangeType(changeType);
return this;
}

@SneakyThrows
public ChangeItemImpl build(AspectRetriever aspectRetriever) {
// Apply change type default
this.changeType = validateOrDefaultChangeType(changeType);
if (this.changeType == null) {
changeType(null); // Apply change type default
}

// Apply empty headers
if (this.headers == null) {
this.headers = Map.of();
}

if (this.urn == null && this.metadataChangeProposal != null) {
this.urn = this.metadataChangeProposal.getEntityUrn();
}

ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
entitySpec(
ValidationApiUtils.validateEntity(
aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

if (this.recordTemplate == null && this.metadataChangeProposal != null) {
this.recordTemplate = convertToRecordTemplate(this.metadataChangeProposal, aspectSpec);
}

ValidationApiUtils.validateRecordTemplate(
this.entitySpec, this.urn, this.recordTemplate, aspectRetriever);

if (this.systemMetadata == null) {
// generate default
systemMetadata(null);
}

return new ChangeItemImpl(
this.changeType,
this.urn,
this.aspectName,
this.recordTemplate,
SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
this.systemMetadata,
this.auditStamp,
this.metadataChangeProposal,
this.entitySpec,
Expand All @@ -183,35 +195,16 @@ public ChangeItemImpl build(AspectRetriever aspectRetriever) {
public ChangeItemImpl build(
MetadataChangeProposal mcp, AuditStamp auditStamp, AspectRetriever aspectRetriever) {

log.debug("entity type = {}", mcp.getEntityType());
EntitySpec entitySpec =
aspectRetriever.getEntityRegistry().getEntitySpec(mcp.getEntityType());
AspectSpec aspectSpec = AspectUtils.validateAspect(mcp, entitySpec);

if (!MCPItem.isValidChangeType(ChangeType.UPSERT, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: "
+ mcp.getChangeType()
+ " for aspect "
+ mcp.getAspectName());
}

Urn urn = mcp.getEntityUrn();
if (urn == null) {
urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
}

return ChangeItemImpl.builder()
.changeType(mcp.getChangeType())
.urn(urn)
.aspectName(mcp.getAspectName())
.systemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
.metadataChangeProposal(mcp)
.auditStamp(auditStamp)
.recordTemplate(convertToRecordTemplate(mcp, aspectSpec))
.nextAspectVersion(this.nextAspectVersion)
.build(aspectRetriever);
// Validation includes: Urn, Entity, Aspect
this.metadataChangeProposal =
ValidationApiUtils.validateMCP(aspectRetriever.getEntityRegistry(), mcp);
this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
this.auditStamp = auditStamp;
this.aspectName = mcp.getAspectName(); // prior validation
changeType(mcp.getChangeType());
this.systemMetadata = mcp.getSystemMetadata();
this.headers = mcp.getHeaders();
return build(aspectRetriever);
}

// specific to impl, other impls support PATCH, etc
Expand All @@ -226,6 +219,11 @@ private static ChangeType validateOrDefaultChangeType(@Nullable ChangeType chang

private static RecordTemplate convertToRecordTemplate(
MetadataChangeProposal mcp, AspectSpec aspectSpec) {

if (mcp.getAspect() == null) {
return null;
}

RecordTemplate aspect;
try {
aspect =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) {
ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
entitySpec(
ValidationApiUtils.validateEntity(
aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

return new DeleteItemImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public MCLItemImpl build(AspectRetriever aspectRetriever) {
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(
ValidationApiUtils.validate(this.entitySpec, this.metadataChangeLog.getAspectName()));
ValidationApiUtils.validateAspect(
this.entitySpec, this.metadataChangeLog.getAspectName()));
log.debug("aspect spec = {}", this.aspectSpec);

Pair<RecordTemplate, RecordTemplate> aspects =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static com.linkedin.metadata.Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH;
import static com.linkedin.metadata.Constants.MAX_JACKSON_STRING_SIZE;
import static com.linkedin.metadata.entity.AspectUtils.validateAspect;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
Expand Down Expand Up @@ -151,65 +150,82 @@ public ChangeItemImpl applyPatch(RecordTemplate recordTemplate, AspectRetriever

public static class PatchItemImplBuilder {

// Ensure use of other builders
private PatchItemImpl build() {
return null;
}

public PatchItemImpl.PatchItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
this.systemMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(systemMetadata);
return this;
}

public PatchItemImpl.PatchItemImplBuilder aspectSpec(AspectSpec aspectSpec) {
if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: " + ChangeType.PATCH + " for aspect " + this.aspectName);
}
this.aspectSpec = aspectSpec;
return this;
}

public PatchItemImpl.PatchItemImplBuilder patch(JsonPatch patch) {
if (patch == null) {
throw new IllegalArgumentException(String.format("Missing patch to apply. Item: %s", this));
}
this.patch = patch;
return this;
}

public PatchItemImpl build(EntityRegistry entityRegistry) {
ValidationApiUtils.validateUrn(entityRegistry, this.urn);
urn(ValidationApiUtils.validateUrn(entityRegistry, this.urn));
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType()));
entitySpec(ValidationApiUtils.validateEntity(entityRegistry, this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

if (this.patch == null) {
throw new IllegalArgumentException(
String.format("Missing patch to apply. Aspect: %s", this.aspectSpec.getName()));
if (this.systemMetadata == null) {
// generate default
systemMetadata(null);
}

return new PatchItemImpl(
this.urn,
this.aspectName,
SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
this.systemMetadata,
this.auditStamp,
this.patch,
Objects.requireNonNull(this.patch),
this.metadataChangeProposal,
this.entitySpec,
this.aspectSpec);
}

public static PatchItemImpl build(
public PatchItemImpl build(
MetadataChangeProposal mcp, AuditStamp auditStamp, EntityRegistry entityRegistry) {
log.debug("entity type = {}", mcp.getEntityType());
EntitySpec entitySpec = entityRegistry.getEntitySpec(mcp.getEntityType());
AspectSpec aspectSpec = validateAspect(mcp, entitySpec);

if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: "
+ mcp.getChangeType()
+ " for aspect "
+ mcp.getAspectName());
}
// Validation includes: Urn, Entity, Aspect
this.metadataChangeProposal = ValidationApiUtils.validateMCP(entityRegistry, mcp);
this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
this.auditStamp = auditStamp;
this.aspectName = mcp.getAspectName();
systemMetadata(mcp.getSystemMetadata());
patch(convertToJsonPatch(mcp));

Urn urn = mcp.getEntityUrn();
if (urn == null) {
urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
}
entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); // prior validation
aspectSpec(entitySpec.getAspectSpec(this.aspectName)); // prior validation

return PatchItemImpl.builder()
.urn(urn)
.aspectName(mcp.getAspectName())
.systemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
.metadataChangeProposal(mcp)
.auditStamp(auditStamp)
.patch(convertToJsonPatch(mcp))
.build(entityRegistry);
return new PatchItemImpl(
this.urn,
this.aspectName,
this.systemMetadata,
this.auditStamp,
this.patch,
this.metadataChangeProposal,
this.entitySpec,
this.aspectSpec);
}

public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
Expand Down
Loading
Loading