Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(versioning): Support entity versioning ingestion #12755

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datahub-web-react/src/app/ingest/secret/SecretsList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export const SecretsList = () => {
);
setTimeout(() => {
refetch();
}, 2000);
}, 3000);
})
.catch((e) => {
message.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,9 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// lock)

// Initial database state from database
Map<String, Map<String, SystemAspect>> batchAspects =
final Map<String, Map<String, SystemAspect>> batchAspects =
aspectDao.getLatestAspects(opContext, urnAspects, true);
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;

// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
Expand All @@ -989,7 +990,6 @@ private IngestAspectsResult ingestAspectsToLocalDB(
// These items are new items from side effects
Map<String, Set<String>> sideEffects = updatedItems.getFirst();

final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
final Map<String, Map<String, Long>> updatedNextVersions;

Map<String, Map<String, SystemAspect>> newLatestAspects =
Expand Down Expand Up @@ -1024,6 +1024,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
.collect(Collectors.toList());
} else {
changeMCPs = updatedItems.getSecond();
updatedLatestAspects = batchAspects;
}

// No changes, return
Expand Down Expand Up @@ -1080,7 +1081,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
Latest aspect after possible in-memory mutation
*/
final SystemAspect latestAspect =
batchAspects
updatedLatestAspects
.getOrDefault(writeItem.getUrn().toString(), Map.of())
.get(writeItem.getAspectName());

Expand Down Expand Up @@ -1145,8 +1146,9 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
// Only consider retention when there was a previous version
.filter(
result ->
batchAspects.containsKey(result.getUrn().toString())
&& batchAspects
updatedLatestAspects.containsKey(
result.getUrn().toString())
&& updatedLatestAspects
.get(result.getUrn().toString())
.containsKey(
result.getRequest().getAspectName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ public List<IngestResult> linkLatestVersion(
Urn versionSet,
Urn newLatestVersion,
VersionPropertiesInput inputProperties) {
List<IngestResult> ingestResults = new ArrayList<>();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
String sortId;
Long versionSetConstraint;
Long versionPropertiesConstraint = -1L;
VersionSetKey versionSetKey =
(VersionSetKey)
Expand All @@ -93,36 +91,26 @@ public List<IngestResult> linkLatestVersion(
+ newLatestVersion.getEntityType());
}
if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) {
MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal();
versionSetKeyProposal.setEntityUrn(versionSet);
versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME);
versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey));
versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY);
ingestResults.add(
entityService.ingestProposal(
opContext, versionSetKeyProposal, opContext.getAuditStamp(), false));

sortId = INITIAL_VERSION_SORT_ID;
versionSetConstraint = -1L;
} else {
SystemAspect versionSetPropertiesAspect =
aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
versionSetConstraint =
versionSetPropertiesAspect
.getSystemMetadataVersion()
.orElse(versionSetPropertiesAspect.getVersion());

if (versionSetProperties.getVersioningScheme()
!= VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) {
throw new IllegalArgumentException(
"Only versioning scheme supported is ALPHANUMERIC_GENERATED_BY_DATAHUB");
}

SystemAspect latestVersion =
aspectRetriever.getLatestSystemAspect(
versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME);
VersionProperties latestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, latestVersion.getRecordTemplate().data());
// When more impls for versioning scheme are set up, this will need to be resolved to the
// correct scheme generation strategy
sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId());
}

Expand Down Expand Up @@ -154,9 +142,9 @@ public List<IngestResult> linkLatestVersion(
.setComment(inputProperties.getComment(), SetMode.IGNORE_NULL)
.setVersion(versionTag)
.setMetadataCreatedTimestamp(opContext.getAuditStamp())
.setSortId(sortId);
.setSortId(sortId)
.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB);
if (inputProperties.getSourceCreationTimestamp() != null) {

AuditStamp sourceCreatedAuditStamp =
new AuditStamp().setTime(inputProperties.getSourceCreationTimestamp());
Urn actor = null;
Expand All @@ -182,36 +170,11 @@ public List<IngestResult> linkLatestVersion(
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString());
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
versionPropertiesProposal.setHeaders(headerMap);
ingestResults.add(
entityService.ingestProposal(
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false));

// Might want to refactor this to a Patch w/ Create if not exists logic if more properties get
// added
// to Version Set Properties
VersionSetProperties versionSetProperties =
new VersionSetProperties()
.setVersioningScheme(
VersioningScheme
.ALPHANUMERIC_GENERATED_BY_DATAHUB) // Only one available, will need to add to
// input properties once more are added.
.setLatest(newLatestVersion);
MetadataChangeProposal versionSetPropertiesProposal = new MetadataChangeProposal();
versionSetPropertiesProposal.setEntityUrn(versionSet);
versionSetPropertiesProposal.setEntityType(VERSION_SET_ENTITY_NAME);
versionSetPropertiesProposal.setAspectName(VERSION_SET_PROPERTIES_ASPECT_NAME);
versionSetPropertiesProposal.setAspect(
GenericRecordUtils.serializeAspect(versionSetProperties));
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
StringMap versionSetHeaderMap = new StringMap();
versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString());
versionSetPropertiesProposal.setHeaders(versionSetHeaderMap);
versionSetPropertiesProposal.setSystemMetadata(systemMetadata);
ingestResults.add(
IngestResult result =
entityService.ingestProposal(
opContext, versionSetPropertiesProposal, opContext.getAuditStamp(), false));

return ingestResults;
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false);
return result != null ? List.of(result) : List.of();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package com.linkedin.metadata.entity.versioning.sideeffects;

import static com.linkedin.metadata.Constants.*;

import com.datahub.util.RecordUtils;
import com.linkedin.common.VersionProperties;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.versionset.VersionSetProperties;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
* Side effect that updates the isLatest property for the referenced versioned entity's Version
* Properties aspect.
*/
@Slf4j
@Getter
@Setter
@Accessors(chain = true)
public class VersionPropertiesSideEffect extends MCPSideEffect {
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<ChangeMCP> applyMCPSideEffect(
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().flatMap(item -> processMCP(item, retrieverContext));
}

@Override
protected Stream<MCPItem> postMCPSideEffect(
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
return Stream.of();
}

private static Stream<ChangeMCP> processMCP(
ChangeMCP changeMCP, @Nonnull RetrieverContext retrieverContext) {
Urn entityUrn = changeMCP.getUrn();

if (!VERSION_PROPERTIES_ASPECT_NAME.equals(changeMCP.getAspectName())) {
return Stream.empty();
}

VersionProperties versionProperties = changeMCP.getAspect(VersionProperties.class);
if (versionProperties == null) {
log.error("Unable to process version properties for urn: {}", changeMCP.getUrn());
return Stream.empty();
}

Urn versionSetUrn = versionProperties.getVersionSet();
Aspect versionSetPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestAspectObject(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
if (versionSetPropertiesAspect == null) {
return createVersionSet(versionProperties, changeMCP, retrieverContext);
}

// Version set exists -- only update if there is a new latest
VersionSetProperties versionSetProperties =
RecordUtils.toRecordTemplate(VersionSetProperties.class, versionSetPropertiesAspect.data());
Urn prevLatest = versionSetProperties.getLatest();
if (prevLatest.equals(entityUrn)) {
return Stream.empty();
}

VersionProperties prevLatestVersionProperties = null;
Aspect prevLatestVersionPropertiesAspect =
retrieverContext
.getAspectRetriever()
.getLatestAspectObject(prevLatest, VERSION_PROPERTIES_ASPECT_NAME);
if (prevLatestVersionPropertiesAspect != null) {
prevLatestVersionProperties =
RecordUtils.toRecordTemplate(
VersionProperties.class, prevLatestVersionPropertiesAspect.data());
if (versionProperties.getSortId().compareTo(prevLatestVersionProperties.getSortId()) <= 0) {
return Stream.empty();
}
}

// New version properties is the new latest
return updateVersionSetLatest(
versionProperties, prevLatestVersionProperties, prevLatest, changeMCP, retrieverContext);
}

private static Stream<ChangeMCP> createVersionSet(
@Nonnull VersionProperties versionProperties,
ChangeMCP changeMCP,
@Nonnull RetrieverContext retrieverContext) {
versionProperties.setIsLatest(true);

Urn entityUrn = changeMCP.getUrn();
Urn versionSetUrn = versionProperties.getVersionSet();

AspectSpec keyAspectSpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(VERSION_SET_ENTITY_NAME)
.getKeyAspectSpec();
RecordTemplate versionSetKey =
EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec);
ChangeMCP createVersionSetKey =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_KEY_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetKey)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

VersionSetProperties versionSetPropertiesWithNewLatest =
new VersionSetProperties()
.setVersioningScheme(versionProperties.getVersioningScheme())
.setLatest(entityUrn);
ChangeMCP createVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetPropertiesWithNewLatest)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

return Stream.of(createVersionSetKey, createVersionSetProperties);
}

private static Stream<ChangeMCP> updateVersionSetLatest(
@Nonnull VersionProperties versionProperties,
@Nullable VersionProperties prevLatestVersionProperties,
@Nonnull Urn prevLatest,
ChangeMCP changeMCP,
@Nonnull RetrieverContext retrieverContext) {
versionProperties.setIsLatest(true);

Urn entityUrn = changeMCP.getUrn();
Urn versionSetUrn = versionProperties.getVersionSet();

VersionSetProperties versionSetPropertiesWithNewLatest =
new VersionSetProperties()
.setVersioningScheme(versionProperties.getVersioningScheme())
.setLatest(entityUrn);
ChangeMCP updateVersionSetProperties =
ChangeItemImpl.builder()
.urn(versionSetUrn)
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.recordTemplate(versionSetPropertiesWithNewLatest)
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever());

if (prevLatestVersionProperties == null) {
return Stream.of(updateVersionSetProperties);
}

EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(prevLatest.getEntityType());
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.ADD.getValue());
patchOp.setPath("/isLatest");
patchOp.setValue(false);
ChangeMCP updateOldLatestVersionProperties =
PatchItemImpl.builder()
.urn(prevLatest)
.entitySpec(entitySpec)
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
.patch(GenericJsonPatch.builder().patch(List.of(patchOp)).build().getJsonPatch())
.auditStamp(changeMCP.getAuditStamp())
.systemMetadata(changeMCP.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
.applyPatch(prevLatestVersionProperties, retrieverContext.getAspectRetriever());

return Stream.of(updateVersionSetProperties, updateOldLatestVersionProperties);
}
}
Loading
Loading