Skip to content

Commit

Permalink
feat(api): URN, Entity, and Aspect name Async Validation (#12797)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Mar 6, 2025
1 parent 2bc1e52 commit 41b0629
Show file tree
Hide file tree
Showing 22 changed files with 1,165 additions and 211 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #12716: Fix the `platform_instance` being added twice to the URN. If you want to have the previous behavior back, you need to add your platform_instance twice (i.e. `plat.plat`).

- #12797: Previously endpoints when used in ASYNC mode would not validate URNs, entity & aspect names immediately. Starting with this release, even in ASYNC mode, these requests will be returned with http code 400.


### Known Issues

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.linkedin.metadata.aspect.batch;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.Collections;
Expand Down Expand Up @@ -82,4 +84,6 @@ static boolean supportsPatch(AspectSpec aspectSpec) {
}
return true;
}

default void validate(Urn urn, String aspectName, EntityRegistry entityRegistry) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.entity.validation.ValidationException;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
Expand Down Expand Up @@ -153,10 +152,11 @@ private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem

private static BatchItem patchDiscriminator(MCPItem mcpItem, AspectRetriever aspectRetriever) {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
}
return ChangeItemImpl.builder()
.build(mcpItem.getMetadataChangeProposal(), mcpItem.getAuditStamp(), aspectRetriever);
Expand Down Expand Up @@ -195,22 +195,18 @@ public AspectsBatchImplBuilder mcps(
mcp -> {
try {
if (alternateMCPValidation) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(mcp.getEntityType());
return ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(entitySpec)
.auditStamp(auditStamp)
.build();
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
}
if (mcp.getChangeType().equals(ChangeType.PATCH)) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
} else {
return ChangeItemImpl.builder()
.build(mcp, auditStamp, retrieverContext.getAspectRetriever());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.template.common.GenericPatchTemplate;
import com.linkedin.metadata.entity.AspectUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
Expand Down Expand Up @@ -139,38 +138,51 @@ public ChangeItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
return this;
}

public ChangeItemImplBuilder changeType(ChangeType changeType) {
this.changeType = validateOrDefaultChangeType(changeType);
return this;
}

@SneakyThrows
public ChangeItemImpl build(AspectRetriever aspectRetriever) {
// Apply change type default
this.changeType = validateOrDefaultChangeType(changeType);
if (this.changeType == null) {
changeType(null); // Apply change type default
}

// Apply empty headers
if (this.headers == null) {
this.headers = Map.of();
}

if (this.urn == null && this.metadataChangeProposal != null) {
this.urn = this.metadataChangeProposal.getEntityUrn();
}

ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
entitySpec(
ValidationApiUtils.validateEntity(
aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

if (this.recordTemplate == null && this.metadataChangeProposal != null) {
this.recordTemplate = convertToRecordTemplate(this.metadataChangeProposal, aspectSpec);
}

ValidationApiUtils.validateRecordTemplate(
this.entitySpec, this.urn, this.recordTemplate, aspectRetriever);

if (this.systemMetadata == null) {
// generate default
systemMetadata(null);
}

return new ChangeItemImpl(
this.changeType,
this.urn,
this.aspectName,
this.recordTemplate,
SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
this.systemMetadata,
this.auditStamp,
this.metadataChangeProposal,
this.entitySpec,
Expand All @@ -183,35 +195,16 @@ public ChangeItemImpl build(AspectRetriever aspectRetriever) {
public ChangeItemImpl build(
MetadataChangeProposal mcp, AuditStamp auditStamp, AspectRetriever aspectRetriever) {

log.debug("entity type = {}", mcp.getEntityType());
EntitySpec entitySpec =
aspectRetriever.getEntityRegistry().getEntitySpec(mcp.getEntityType());
AspectSpec aspectSpec = AspectUtils.validateAspect(mcp, entitySpec);

if (!MCPItem.isValidChangeType(ChangeType.UPSERT, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: "
+ mcp.getChangeType()
+ " for aspect "
+ mcp.getAspectName());
}

Urn urn = mcp.getEntityUrn();
if (urn == null) {
urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
}

return ChangeItemImpl.builder()
.changeType(mcp.getChangeType())
.urn(urn)
.aspectName(mcp.getAspectName())
.systemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
.metadataChangeProposal(mcp)
.auditStamp(auditStamp)
.recordTemplate(convertToRecordTemplate(mcp, aspectSpec))
.nextAspectVersion(this.nextAspectVersion)
.build(aspectRetriever);
// Validation includes: Urn, Entity, Aspect
this.metadataChangeProposal =
ValidationApiUtils.validateMCP(aspectRetriever.getEntityRegistry(), mcp);
this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
this.auditStamp = auditStamp;
this.aspectName = mcp.getAspectName(); // prior validation
changeType(mcp.getChangeType());
this.systemMetadata = mcp.getSystemMetadata();
this.headers = mcp.getHeaders();
return build(aspectRetriever);
}

// specific to impl, other impls support PATCH, etc
Expand All @@ -226,6 +219,11 @@ private static ChangeType validateOrDefaultChangeType(@Nullable ChangeType chang

private static RecordTemplate convertToRecordTemplate(
MetadataChangeProposal mcp, AspectSpec aspectSpec) {

if (mcp.getAspect() == null) {
return null;
}

RecordTemplate aspect;
try {
aspect =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ public DeleteItemImpl build(AspectRetriever aspectRetriever) {
ValidationApiUtils.validateUrn(aspectRetriever.getEntityRegistry(), this.urn);
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(aspectRetriever.getEntityRegistry().getEntitySpec(this.urn.getEntityType()));
entitySpec(
ValidationApiUtils.validateEntity(
aspectRetriever.getEntityRegistry(), this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

return new DeleteItemImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public MCLItemImpl build(AspectRetriever aspectRetriever) {
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(
ValidationApiUtils.validate(this.entitySpec, this.metadataChangeLog.getAspectName()));
ValidationApiUtils.validateAspect(
this.entitySpec, this.metadataChangeLog.getAspectName()));
log.debug("aspect spec = {}", this.aspectSpec);

Pair<RecordTemplate, RecordTemplate> aspects =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static com.linkedin.metadata.Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH;
import static com.linkedin.metadata.Constants.MAX_JACKSON_STRING_SIZE;
import static com.linkedin.metadata.entity.AspectUtils.validateAspect;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
Expand Down Expand Up @@ -151,65 +150,82 @@ public ChangeItemImpl applyPatch(RecordTemplate recordTemplate, AspectRetriever

public static class PatchItemImplBuilder {

// Ensure use of other builders
private PatchItemImpl build() {
return null;
}

public PatchItemImpl.PatchItemImplBuilder systemMetadata(SystemMetadata systemMetadata) {
this.systemMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(systemMetadata);
return this;
}

public PatchItemImpl.PatchItemImplBuilder aspectSpec(AspectSpec aspectSpec) {
if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: " + ChangeType.PATCH + " for aspect " + this.aspectName);
}
this.aspectSpec = aspectSpec;
return this;
}

public PatchItemImpl.PatchItemImplBuilder patch(JsonPatch patch) {
if (patch == null) {
throw new IllegalArgumentException(String.format("Missing patch to apply. Item: %s", this));
}
this.patch = patch;
return this;
}

public PatchItemImpl build(EntityRegistry entityRegistry) {
ValidationApiUtils.validateUrn(entityRegistry, this.urn);
urn(ValidationApiUtils.validateUrn(entityRegistry, this.urn));
log.debug("entity type = {}", this.urn.getEntityType());

entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType()));
entitySpec(ValidationApiUtils.validateEntity(entityRegistry, this.urn.getEntityType()));
log.debug("entity spec = {}", this.entitySpec);

aspectSpec(ValidationApiUtils.validate(this.entitySpec, this.aspectName));
aspectSpec(ValidationApiUtils.validateAspect(this.entitySpec, this.aspectName));
log.debug("aspect spec = {}", this.aspectSpec);

if (this.patch == null) {
throw new IllegalArgumentException(
String.format("Missing patch to apply. Aspect: %s", this.aspectSpec.getName()));
if (this.systemMetadata == null) {
// generate default
systemMetadata(null);
}

return new PatchItemImpl(
this.urn,
this.aspectName,
SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata),
this.systemMetadata,
this.auditStamp,
this.patch,
Objects.requireNonNull(this.patch),
this.metadataChangeProposal,
this.entitySpec,
this.aspectSpec);
}

public static PatchItemImpl build(
public PatchItemImpl build(
MetadataChangeProposal mcp, AuditStamp auditStamp, EntityRegistry entityRegistry) {
log.debug("entity type = {}", mcp.getEntityType());
EntitySpec entitySpec = entityRegistry.getEntitySpec(mcp.getEntityType());
AspectSpec aspectSpec = validateAspect(mcp, entitySpec);

if (!MCPItem.isValidChangeType(ChangeType.PATCH, aspectSpec)) {
throw new UnsupportedOperationException(
"ChangeType not supported: "
+ mcp.getChangeType()
+ " for aspect "
+ mcp.getAspectName());
}
// Validation includes: Urn, Entity, Aspect
this.metadataChangeProposal = ValidationApiUtils.validateMCP(entityRegistry, mcp);
this.urn = this.metadataChangeProposal.getEntityUrn(); // validation ensures existence
this.auditStamp = auditStamp;
this.aspectName = mcp.getAspectName();
systemMetadata(mcp.getSystemMetadata());
patch(convertToJsonPatch(mcp));

Urn urn = mcp.getEntityUrn();
if (urn == null) {
urn = EntityKeyUtils.getUrnFromProposal(mcp, entitySpec.getKeyAspectSpec());
}
entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); // prior validation
aspectSpec(entitySpec.getAspectSpec(this.aspectName)); // prior validation

return PatchItemImpl.builder()
.urn(urn)
.aspectName(mcp.getAspectName())
.systemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata()))
.metadataChangeProposal(mcp)
.auditStamp(auditStamp)
.patch(convertToJsonPatch(mcp))
.build(entityRegistry);
return new PatchItemImpl(
this.urn,
this.aspectName,
this.systemMetadata,
this.auditStamp,
this.patch,
this.metadataChangeProposal,
this.entitySpec,
this.aspectSpec);
}

public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
Expand Down
Loading

0 comments on commit 41b0629

Please sign in to comment.