Skip to content

Commit

Permalink
feat(versioning): Support entity versioning ingestion (#12755)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Mar 4, 2025
1 parent 17de393 commit 5576b3c
Show file tree
Hide file tree
Showing 14 changed files with 1,011 additions and 145 deletions.
2 changes: 1 addition & 1 deletion datahub-web-react/src/app/ingest/secret/SecretsList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export const SecretsList = () => {
);
setTimeout(() => {
refetch();
}, 2000);
}, 3000);
})
.catch((e) => {
message.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,9 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// lock)

// Initial database state from database
Map<String, Map<String, SystemAspect>> batchAspects =
final Map<String, Map<String, SystemAspect>> batchAspects =
aspectDao.getLatestAspects(opContext, urnAspects, true);
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;

// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
Expand All @@ -989,7 +990,6 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// These items are new items from side effects
Map<String, Set<String>> sideEffects = updatedItems.getFirst();

final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
final Map<String, Map<String, Long>> updatedNextVersions;

Map<String, Map<String, SystemAspect>> newLatestAspects =
Expand Down Expand Up @@ -1024,6 +1024,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
.collect(Collectors.toList());
} else {
changeMCPs = updatedItems.getSecond();
updatedLatestAspects = batchAspects;
}

// No changes, return
Expand Down Expand Up @@ -1080,7 +1081,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
Latest aspect after possible in-memory mutation
*/
final SystemAspect latestAspect =
batchAspects
updatedLatestAspects
.getOrDefault(writeItem.getUrn().toString(), Map.of())
.get(writeItem.getAspectName());

Expand Down Expand Up @@ -1145,8 +1146,9 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
updatedLatestAspects.containsKey(
result.getUrn().toString())
&& updatedLatestAspects
.get(result.getUrn().toString())
.containsKey(
result.getRequest().getAspectName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ public List<IngestResult> linkLatestVersion(
Urn versionSet,
Urn newLatestVersion,
VersionPropertiesInput inputProperties) {
List<IngestResult> ingestResults = new ArrayList<>();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
String sortId;
Long versionSetConstraint;
Long versionPropertiesConstraint = -1L;
VersionSetKey versionSetKey =
(VersionSetKey)
Expand All @@ -93,36 +91,26 @@ public List<IngestResult> linkLatestVersion(
+ newLatestVersion.getEntityType());
}
if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) {
MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal();
versionSetKeyProposal.setEntityUrn(versionSet);
versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME);
versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey));
versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY);
ingestResults.add(
entityService.ingestProposal(
opContext, versionSetKeyProposal, opContext.getAuditStamp(), false));

sortId = INITIAL_VERSION_SORT_ID;
versionSetConstraint = -1L;
} else {
SystemAspect versionSetPropertiesAspect =
aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
versionSetConstraint =
versionSetPropertiesAspect
.getSystemMetadataVersion()
.orElse(versionSetPropertiesAspect.getVersion());

if (versionSetProperties.getVersioningScheme()
!= VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) {
throw new IllegalArgumentException(
"Only versioning scheme supported is ALPHANUMERIC_GENERATED_BY_DATAHUB");
}

SystemAspect latestVersion =
aspectRetriever.getLatestSystemAspect(
versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME);
VersionProperties latestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, latestVersion.getRecordTemplate().data());
// When more impls for versioning scheme are set up, this will need to be resolved to the
// correct scheme generation strategy
sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId());
}

Expand Down Expand Up @@ -154,9 +142,9 @@ public List<IngestResult> linkLatestVersion(
.setComment(inputProperties.getComment(), SetMode.IGNORE_NULL)
.setVersion(versionTag)
.setMetadataCreatedTimestamp(opContext.getAuditStamp())
.setSortId(sortId);
.setSortId(sortId)
.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB);
if (inputProperties.getSourceCreationTimestamp() != null) {

AuditStamp sourceCreatedAuditStamp =
new AuditStamp().setTime(inputProperties.getSourceCreationTimestamp());
Urn actor = null;
Expand All @@ -182,36 +170,11 @@ public List<IngestResult> linkLatestVersion(
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString());
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
versionPropertiesProposal.setHeaders(headerMap);
ingestResults.add(
entityService.ingestProposal(
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false));

// Might want to refactor this to a Patch w/ Create if not exists logic if more properties get
// added
// to Version Set Properties
VersionSetProperties versionSetProperties =
new VersionSetProperties()
.setVersioningScheme(
VersioningScheme
.ALPHANUMERIC_GENERATED_BY_DATAHUB) // Only one available, will need to add to
// input properties once more are added.
.setLatest(newLatestVersion);
MetadataChangeProposal versionSetPropertiesProposal = new MetadataChangeProposal();
versionSetPropertiesProposal.setEntityUrn(versionSet);
versionSetPropertiesProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetPropertiesProposal.setAspectName(VERSION_SET_PROPERTIES_ASPECT_NAME);
versionSetPropertiesProposal.setAspect(
GenericRecordUtils.serializeAspect(versionSetProperties));
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap versionSetHeaderMap = new StringMap();
versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString());
versionSetPropertiesProposal.setHeaders(versionSetHeaderMap);
versionSetPropertiesProposal.setSystemMetadata(systemMetadata);
ingestResults.add(
IngestResult result =
entityService.ingestProposal(
opContext, versionSetPropertiesProposal, opContext.getAuditStamp(), false));

return ingestResults;
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false);
return result != null ? List.of(result) : List.of();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package com.linkedin.metadata.entity.versioning.sideeffects;

import static com.linkedin.metadata.Constants.*;

import com.datahub.util.RecordUtils;
import com.linkedin.common.VersionProperties;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.versionset.VersionSetProperties;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
* Side effect that updates the isLatest property for the referenced versioned entity's Version
* Properties aspect.
*/
@Slf4j
@Getter
@Setter
@Accessors(chain = true)
public class VersionPropertiesSideEffect extends MCPSideEffect {
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<ChangeMCP> applyMCPSideEffect(
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().flatMap(item -> processMCP(item, retrieverContext));
}

@Override
protected Stream<MCPItem> postMCPSideEffect(
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
return Stream.of();
}

private static Stream<ChangeMCP> processMCP(
ChangeMCP changeMCP, @Nonnull RetrieverContext retrieverContext) {
Urn entityUrn = changeMCP.getUrn();

if (!VERSION_PROPERTIES_ASPECT_NAME.equals(changeMCP.getAspectName())) {
return Stream.empty();
}

VersionProperties versionProperties = changeMCP.getAspect(VersionProperties.class);
if (versionProperties == null) {
log.error("Unable to process version properties for urn: {}", changeMCP.getUrn());
return Stream.empty();
}

Urn versionSetUrn = versionProperties.getVersionSet();
Aspect versionSetPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestAspectObject(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
if (versionSetPropertiesAspect == null) {
return createVersionSet(versionProperties, changeMCP, retrieverContext);
}

// Version set exists -- only update if there is a new latest
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(VersionSetProperties.class, versionSetPropertiesAspect.data());
Urn prevLatest = versionSetProperties.getLatest();
if (prevLatest.equals(entityUrn)) {
return Stream.empty();
}

VersionProperties prevLatestVersionProperties = null;
Aspect prevLatestVersionPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestAspectObject(prevLatest, VERSION_PROPERTIES_ASPECT_NAME);
if (prevLatestVersionPropertiesAspect != null) {
prevLatestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, prevLatestVersionPropertiesAspect.data());
if (versionProperties.getSortId().compareTo(prevLatestVersionProperties.getSortId()) <= 0) {
return Stream.empty();
}
}

// New version properties is the new latest
return updateVersionSetLatest(
versionProperties, prevLatestVersionProperties, prevLatest, changeMCP, retrieverContext);
}

private static Stream<ChangeMCP> createVersionSet(
@Nonnull VersionProperties versionProperties,
ChangeMCP changeMCP,
@Nonnull RetrieverContext retrieverContext) {
versionProperties.setIsLatest(true);

Urn entityUrn = changeMCP.getUrn();
Urn versionSetUrn = versionProperties.getVersionSet();

AspectSpec keyAspectSpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(VERSION_SET_ENTITY_NAME)
.getKeyAspectSpec();
RecordTemplate versionSetKey =
EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec);
ChangeMCP createVersionSetKey =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_KEY_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetKey)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

VersionSetProperties versionSetPropertiesWithNewLatest =
new VersionSetProperties()
.setVersioningScheme(versionProperties.getVersioningScheme())
.setLatest(entityUrn);
ChangeMCP createVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetPropertiesWithNewLatest)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

return Stream.of(createVersionSetKey, createVersionSetProperties);
}

private static Stream<ChangeMCP> updateVersionSetLatest(
@Nonnull VersionProperties versionProperties,
@Nullable VersionProperties prevLatestVersionProperties,
@Nonnull Urn prevLatest,
ChangeMCP changeMCP,
@Nonnull RetrieverContext retrieverContext) {
versionProperties.setIsLatest(true);

Urn entityUrn = changeMCP.getUrn();
Urn versionSetUrn = versionProperties.getVersionSet();

VersionSetProperties versionSetPropertiesWithNewLatest =
new VersionSetProperties()
.setVersioningScheme(versionProperties.getVersioningScheme())
.setLatest(entityUrn);
ChangeMCP updateVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetPropertiesWithNewLatest)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

if (prevLatestVersionProperties == null) {
return Stream.of(updateVersionSetProperties);
}

EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(prevLatest.getEntityType());
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.ADD.getValue());
patchOp.setPath("/isLatest");
patchOp.setValue(false);
ChangeMCP updateOldLatestVersionProperties =
PatchItemImpl.builder()
.urn(prevLatest)
.entitySpec(entitySpec)
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
.patch(GenericJsonPatch.builder().patch(List.of(patchOp)).build().getJsonPatch())
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
.applyPatch(prevLatestVersionProperties, retrieverContext.getAspectRetriever());

return Stream.of(updateVersionSetProperties, updateOldLatestVersionProperties);
}
}
Loading

0 comments on commit 5576b3c

Please sign in to comment.