Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(versioning): Support entity versioning ingestion #12755

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,9 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// lock)

// Initial database state from database
Map<String, Map<String, SystemAspect>> batchAspects =
final Map<String, Map<String, SystemAspect>> batchAspects =
aspectDao.getLatestAspects(opContext, urnAspects, true);
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;

// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
Expand All @@ -989,7 +990,6 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// These items are new items from side effects
Map<String, Set<String>> sideEffects = updatedItems.getFirst();

final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
final Map<String, Map<String, Long>> updatedNextVersions;

Map<String, Map<String, SystemAspect>> newLatestAspects =
Expand Down Expand Up @@ -1024,6 +1024,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
.collect(Collectors.toList());
} else {
changeMCPs = updatedItems.getSecond();
updatedLatestAspects = batchAspects;
}

// No changes, return
Expand Down Expand Up @@ -1080,7 +1081,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
Latest aspect after possible in-memory mutation
*/
final SystemAspect latestAspect =
batchAspects
updatedLatestAspects
.getOrDefault(writeItem.getUrn().toString(), Map.of())
.get(writeItem.getAspectName());

Expand Down Expand Up @@ -1145,8 +1146,9 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
updatedLatestAspects.containsKey(
result.getUrn().toString())
&& updatedLatestAspects
.get(result.getUrn().toString())
.containsKey(
result.getRequest().getAspectName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ public List<IngestResult> linkLatestVersion(
Urn versionSet,
Urn newLatestVersion,
VersionPropertiesInput inputProperties) {
List<IngestResult> ingestResults = new ArrayList<>();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
String sortId;
Long versionSetConstraint;
Long versionPropertiesConstraint = -1L;
VersionSetKey versionSetKey =
(VersionSetKey)
Expand All @@ -93,36 +91,26 @@ public List<IngestResult> linkLatestVersion(
+ newLatestVersion.getEntityType());
}
if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) {
MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal();
versionSetKeyProposal.setEntityUrn(versionSet);
versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME);
versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey));
versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY);
ingestResults.add(
entityService.ingestProposal(
opContext, versionSetKeyProposal, opContext.getAuditStamp(), false));

sortId = INITIAL_VERSION_SORT_ID;
versionSetConstraint = -1L;
} else {
SystemAspect versionSetPropertiesAspect =
aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
versionSetConstraint =
versionSetPropertiesAspect
.getSystemMetadataVersion()
.orElse(versionSetPropertiesAspect.getVersion());

if (versionSetProperties.getVersioningScheme()
!= VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) {
throw new IllegalArgumentException(
"Only versioning scheme supported is ALPHANUMERIC_GENERATED_BY_DATAHUB");
}

SystemAspect latestVersion =
aspectRetriever.getLatestSystemAspect(
versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME);
VersionProperties latestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, latestVersion.getRecordTemplate().data());
// When more impls for versioning scheme are set up, this will need to be resolved to the
// correct scheme generation strategy
sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId());
}

Expand Down Expand Up @@ -154,9 +142,9 @@ public List<IngestResult> linkLatestVersion(
.setComment(inputProperties.getComment(), SetMode.IGNORE_NULL)
.setVersion(versionTag)
.setMetadataCreatedTimestamp(opContext.getAuditStamp())
.setSortId(sortId);
.setSortId(sortId)
.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB);
if (inputProperties.getSourceCreationTimestamp() != null) {

AuditStamp sourceCreatedAuditStamp =
new AuditStamp().setTime(inputProperties.getSourceCreationTimestamp());
Urn actor = null;
Expand All @@ -182,36 +170,11 @@ public List<IngestResult> linkLatestVersion(
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString());
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
versionPropertiesProposal.setHeaders(headerMap);
ingestResults.add(
entityService.ingestProposal(
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false));

// Might want to refactor this to a Patch w/ Create if not exists logic if more properties get
// added
// to Version Set Properties
VersionSetProperties versionSetProperties =
new VersionSetProperties()
.setVersioningScheme(
VersioningScheme
.ALPHANUMERIC_GENERATED_BY_DATAHUB) // Only one available, will need to add to
// input properties once more are added.
.setLatest(newLatestVersion);
MetadataChangeProposal versionSetPropertiesProposal = new MetadataChangeProposal();
versionSetPropertiesProposal.setEntityUrn(versionSet);
versionSetPropertiesProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetPropertiesProposal.setAspectName(VERSION_SET_PROPERTIES_ASPECT_NAME);
versionSetPropertiesProposal.setAspect(
GenericRecordUtils.serializeAspect(versionSetProperties));
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap versionSetHeaderMap = new StringMap();
versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString());
versionSetPropertiesProposal.setHeaders(versionSetHeaderMap);
versionSetPropertiesProposal.setSystemMetadata(systemMetadata);
ingestResults.add(
IngestResult result =
entityService.ingestProposal(
opContext, versionSetPropertiesProposal, opContext.getAuditStamp(), false));

return ingestResults;
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false);
return result != null ? List.of(result) : List.of();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package com.linkedin.metadata.entity.versioning.sideeffects;

import static com.linkedin.metadata.Constants.*;

import com.datahub.util.RecordUtils;
import com.linkedin.common.VersionProperties;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.versionset.VersionSetProperties;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
* Side effect that updates the isLatest property for the referenced versioned entity's Version
* Properties aspect.
*/
@Slf4j
@Getter
@Setter
@Accessors(chain = true)
public class VersionPropertiesSideEffect extends MCPSideEffect {
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<ChangeMCP> applyMCPSideEffect(
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().flatMap(item -> upsertVersionSet(item, retrieverContext));
}

@Override
protected Stream<MCPItem> postMCPSideEffect(
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
return Stream.of();
}

private static Stream<ChangeMCP> upsertVersionSet(
ChangeMCP changeMCP, @Nonnull RetrieverContext retrieverContext) {
Urn entityUrn = changeMCP.getUrn();

if (!VERSION_PROPERTIES_ASPECT_NAME.equals(changeMCP.getAspectName())) {
return Stream.empty();
}

VersionProperties versionProperties = changeMCP.getAspect(VersionProperties.class);
if (versionProperties == null) {
log.error("Unable to process version properties for urn: {}", changeMCP.getUrn());
return Stream.empty();
}

VersionSetProperties newVersionSetProperties =
new VersionSetProperties()
.setVersioningScheme(versionProperties.getVersioningScheme())
.setLatest(entityUrn);

Urn versionSetUrn = versionProperties.getVersionSet();
SystemAspect versionSetPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestSystemAspect(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
if (versionSetPropertiesAspect != null) {
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
Urn prevLatest = versionSetProperties.getLatest();
if (prevLatest.equals(entityUrn)) {
return Stream.empty();
}

SystemAspect prevLatestVersionPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestSystemAspect(prevLatest, VERSION_PROPERTIES_ASPECT_NAME);
if (prevLatestVersionPropertiesAspect == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused on this, if the previous latest doesn't exist we don't update? Is the idea here that the VersionSetProperties got set to something that hasn't been processed yet and because that comes later than this it is considered more latest even though it could have been deleted or something else?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm your'e right I should update here

return Stream.empty();
}

VersionProperties prevLatestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class,
prevLatestVersionPropertiesAspect.getRecordTemplate().data());
if (versionProperties.getSortId().compareTo(prevLatestVersionProperties.getSortId()) <= 0) {
return Stream.empty();
}

// New version properties aspect is the latest
ChangeMCP updateVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(newVersionSetProperties)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(prevLatest.getEntityType());
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.ADD.getValue());
patchOp.setPath("/isLatest");
patchOp.setValue(false);
ChangeMCP updateOldLatestVersionProperties =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the reason you're protecting against empty values and returning Stream.empty, I think it makes more sense to make this part optional to execute, i.e.

Suggested change
ChangeMCP updateOldLatestVersionProperties =
if (prevLatestVersionPropertiesAspect != null) {
prevLatestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class,
prevLatestVersionPropertiesAspect.getRecordTemplate().data());
if (prevLatestVersionProperties != null) {
ChangeMCP updateOldLatestVersionProperties =

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No just an oversight lol

PatchItemImpl.builder()
.urn(prevLatest)
.entitySpec(entitySpec)
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
.patch(GenericJsonPatch.builder().patch(List.of(patchOp)).build().getJsonPatch())
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
.applyPatch(prevLatestVersionProperties, retrieverContext.getAspectRetriever());

versionProperties.setIsLatest(true);
return Stream.of(updateVersionSetProperties, updateOldLatestVersionProperties);
}

// Version Set does not exist
final AspectSpec keyAspectSpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(VERSION_SET_ENTITY_NAME)
.getKeyAspectSpec();
RecordTemplate versionSetKey =
EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec);

ChangeMCP createVersionSetKey =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_KEY_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetKey)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

ChangeMCP createVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(newVersionSetProperties)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

versionProperties.setIsLatest(true);
return Stream.of(createVersionSetKey, createVersionSetProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,26 @@ private static Stream<ChangeMCP> updateLatest(
.getEntitySpec(newLatest.getEntityType());
VersionProperties newLatestProperties =
RecordUtils.toRecordTemplate(VersionProperties.class, newLatestEntity.data());
GenericJsonPatch.PatchOp currentPatch = new GenericJsonPatch.PatchOp();
currentPatch.setOp(PatchOperationType.ADD.getValue());
currentPatch.setPath("/isLatest");
currentPatch.setValue(true);
mcpItems.add(
PatchItemImpl.builder()
.urn(newLatest)
.entitySpec(entitySpec)
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
.patch(GenericJsonPatch.builder().patch(List.of(currentPatch)).build().getJsonPatch())
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
.applyPatch(newLatestProperties, retrieverContext.getAspectRetriever()));

if (Boolean.FALSE.equals(newLatestProperties.isIsLatest())) {
GenericJsonPatch.PatchOp currentPatch = new GenericJsonPatch.PatchOp();
currentPatch.setOp(PatchOperationType.ADD.getValue());
currentPatch.setPath("/isLatest");
currentPatch.setValue(true);
mcpItems.add(
PatchItemImpl.builder()
.urn(newLatest)
.entitySpec(entitySpec)
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
.patch(
GenericJsonPatch.builder().patch(List.of(currentPatch)).build().getJsonPatch())
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
.applyPatch(newLatestProperties, retrieverContext.getAspectRetriever()));
}

return mcpItems.stream();
}
return Stream.empty();
Expand Down
Loading
Loading